共用方式為


在工作流程中執行 DLT 管線

您可以使用 Databricks 作業、Apache Airflow 或 Azure Data Factory,在數據處理工作流程中執行 DLT 管線。

工作

您可以在 Databricks 作業中協調多個工作,以實作數據處理工作流程。 若要在作業中包含 DLT Pipeline,請在建立作業時使用 Pipeline 任務。 如需作業 ,請參閱DLT 管線工作。

Apache Airflow

Apache Airflow 是一個開放原始碼解決方案,可用於管理和排程數據工作流程。 Airflow 將工作流程表示為操作的有向無環圖(DAG)。 您可以在 Python 檔案中定義工作流程,而 Airflow 會管理排程和執行。 如需搭配 Azure Databricks 安裝和使用 Airflow 的資訊,請參閱 使用 Apache Airflow協調 Azure Databricks 作業。

若要在 Airflow 工作流程中執行 DLT 管線,請使用 DatabricksSubmitRunOperator

要求

要使用 Airflow 支援 DLT,您需要下列條件:

下列範例會建立 Airflow DAG,觸發具有識別碼 8279d543-063c-4d63-9926-dae38e35ce8b的 DLT 管線更新:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

請將工作區中的 CONNECTION_ID 取代為 Airflow 連線識別碼。

將此範例儲存在 airflow/dags 目錄中,並使用 Airflow UI 來 檢視並觸發 DAG。 使用 DLT UI 來檢視管線更新的詳細數據。

Azure Data Factory

注意

DLT 和 Azure Data Factory 各包含選項,以在發生失敗時設定重試次數。 如果在您的 DLT 管線上設定了重試值 ,並且在呼叫該管線的 Azure Data Factory 活動上設定了重試值,則重試次數是 Azure Data Factory 的重試值乘以 DLT 的重試值。

例如,如果管線更新失敗,DLT 預設會重試更新最多五次。 如果 Azure Data Factory 重試設定為 3 次,而您的 DLT 管線會使用預設的五次重試,則失敗的 DLT 管線最多可能會重試 15 次。 為了避免在管線更新失敗時發生過多重試嘗試,Databricks 建議在設定 DLT 管線或呼叫管線的 Azure Data Factory 活動時限制重試次數。

若要變更 DLT 管線的重試組態,請在設定管線時使用 pipelines.numUpdateRetryAttempts 設定。

Azure Data Factory 是雲端式 ETL 服務,可讓您協調數據整合和轉換工作流程。 Azure Data Factory 直接支援在工作流程中執行 Azure Databricks 工作,包括 筆記本、JAR 工作和 Python 腳本。 您也可以從 Azure Data Factory Web 活動呼叫 DLT API,來在工作流程中包含管線。 例如,若要從 Azure Data Factory 觸發管線更新:

  1. 建立數據處理站 或開啟現有的數據處理站。

  2. 建立完成時,開啟您的資料工廠頁面,然後按一下 [開啟 Azure Data Factory Studio] 磚片。 Azure Data Factory 使用者介面隨即出現。

  3. 從 Azure Data Factory Studio 使用者介面的 [新增] 下拉功能表中選取 [Pipeline],以建立新的 Azure Data Factory 管線。

  4. 在 [活動] 工具箱中,展開 [一般],然後將 [Web] 活動拖曳至管線畫布。 點選單擊 [設定] 索引標籤,然後輸入下列值:

    注意

    作為安全性最佳做法,當您使用自動化工具、系統、腳本和應用程式進行驗證時,Databricks 建議您使用屬於 服務主體的個人存取令牌, 而不是工作區使用者。 若要建立服務主體的權杖,請參閱 管理服務主體的權杖

    • URLhttps://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates

      取代 <get-workspace-instance>

      <pipeline-id> 取代為管線標識碼。

    • 方法:從下拉功能表中選取 POST

    • 標頭:按一下 + 新增。 在 [名稱] 文字框中,輸入 Authorization。 在 [] 文字框中,輸入 Bearer <personal-access-token>

      <personal-access-token> 取代為 Azure Databricks 個人存取權杖

    • 正文:若要傳遞其他要求參數,請輸入包含參數的 JSON 檔案。 例如,若要啟動更新程序並重新處理管線的所有資料:{"full_refresh": "true"}。 如果沒有額外的要求參數,請輸入空白大括弧 ({})。

若要測試 Web 活動,請在 Data Factory UI 中的管線工具列上按一下 [偵錯]。 執行的輸出和狀態,包括錯誤,會顯示在 Azure Data Factory 管線 [輸出] 標籤中。 使用 DLT UI 來檢視管線更新的詳細數據。

提示

常見的工作流程需求是在上一個工作完成之後啟動工作。 因為 DLT updates 要求是非同步的,要求在開始更新後就會返回,但更新尚未完成,所以您 Azure Data Factory 管線中依賴 DLT 更新的工作必須等待更新完成。 等候更新完成的其中一個選項是在觸發 DLT 更新的 Web 活動之後,新增 Until 活動。 在「直到」活動中:

  1. 新增 等候活動 等候設定的秒數,以完成更新。
  2. 在 Wait 活動之後,新增一個 Web 活動,使用 DLT 更新詳細數據要求來取得更新狀態。 回應中的 state 欄位會傳回更新的目前狀態,包括是否已完成。
  3. 使用 [state] 字段的值來設定 Until 活動的終止條件。 您也可以使用 設定變數活動,根據 state 值新增管線變數,並將此變數用於終止條件。