다음을 통해 공유


워크플로에서 DLT 파이프라인 실행

Databricks 작업, Apache Airflow 또는 Azure Data Factory를 사용하여 데이터 처리 워크플로의 일부로 DLT 파이프라인을 실행할 수 있습니다.

작업

Databricks 작업에서 여러 작업을 오케스트레이션하여 데이터 처리 워크플로를 구현할 수 있습니다. 작업에 DLT 파이프라인을 포함하려면 작업을 만들 때 Pipeline 작업을 사용합니다. DLT 파이프라인 작업을 참조하여 작업을 확인하세요.

Apache Airflow

Apache Airflow 데이터 워크플로를 관리하고 예약하기 위한 오픈 소스 솔루션입니다. Airflow는 워크플로를 작업의 방향성 비순환 그래프(DAG)로 나타냅니다. Python 파일에서 워크플로를 정의하고 Airflow는 예약 및 실행을 관리합니다. Azure Databricks에서 Airflow를 설치하고 사용하는 방법에 대한 자세한 내용은 Apache Airflow 사용하여 Azure Databricks 작업 오케스트레이션참조하세요.

Airflow 워크플로의 일부로 DLT 파이프라인을 실행하려면 DatabricksSubmitRunOperator사용합니다.

요구 사항

DLT에 대한 Airflow 지원을 사용하려면 다음이 필요합니다.

  • Airflow 버전 2.1.0 이상.
  • Databricks 공급자 패키지는 버전 2.1.0 이상이어야 합니다.

본보기

다음 예제에서는 식별자 8279d543-063c-4d63-9926-dae38e35ce8b인 DLT 파이프라인에 대한 업데이트를 트리거하는 Airflow DAG(데이터 워크플로우 다이어그램)를 만듭니다.

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_ID을 작업 영역에 대한 Airflow 연결 식별자로 바꾸십시오.

이 예제를 airflow/dags 디렉터리에 저장하고 Airflow UI를 사용하여 DAG를 보고 트리거하는. DLT UI를 사용하여 파이프라인 업데이트의 세부 정보를 볼 수 있습니다.

Azure Data Factory

메모

DLT 및 Azure Data Factory에는 각각 오류가 발생할 때 재시도 횟수를 구성하는 옵션이 포함됩니다. DLT 파이프라인 및 해당 파이프라인을 호출하는 Azure Data Factory 작업에 재시도 값이 구성된 경우, 재시도 횟수는 Azure Data Factory의 재시도 값에 DLT의 재시도 값을 곱한 값입니다.

예를 들어 파이프라인 업데이트가 실패하면 DLT는 기본적으로 업데이트를 최대 5번 다시 시도합니다. Azure Data Factory 재시도를 3으로 설정하고 DLT 파이프라인이 기본값인 5회 재시도를 사용하는 경우 실패한 DLT 파이프라인은 최대 15번 다시 시도될 수 있습니다. 파이프라인 업데이트가 실패할 때 과도한 재시도 시도를 방지하기 위해 Databricks는 DLT 파이프라인 또는 파이프라인을 호출하는 Azure Data Factory 작업을 구성할 때 재시도 횟수를 제한하는 것이 좋습니다.

DLT 파이프라인에 대한 재시도 구성을 변경하려면 파이프라인을 구성할 때 pipelines.numUpdateRetryAttempts 설정을 사용합니다.

Azure Data Factory는 데이터 통합 및 변환 워크플로를 오케스트레이션할 수 있는 클라우드 기반 ETL 서비스입니다. Azure Data Factory는 Notebook, JAR 작업 및 Python 스크립트를 포함하여 워크플로에서 Azure Databricks 작업 실행을 직접 지원합니다. Azure Data Factory 웹 작업에서 DLT API 를 호출하여 워크플로에 파이프라인을 포함시킬 수 있습니다. 예를 들어 Azure Data Factory에서 파이프라인 업데이트를 트리거하려면 다음을 수행합니다.

  1. 데이터 팩터리 만들거나 기존 데이터 팩터리를 엽니다.

  2. 만들기가 완료되면 데이터 팩토리 페이지를 열고 Azure Data Factory Studio 타일을 클릭하여 엽니다. Azure Data Factory 사용자 인터페이스가 나타납니다.

  3. Azure Data Factory Studio 사용자 인터페이스의 드롭다운 메뉴에서 파이프라인 선택하여 새 Azure Data Factory 파이프라인을 만듭니다.

  4. 액티비티 툴박스에서 일반을 확장한 후 액티비티를 파이프라인 캔버스로 드래그합니다. 설정 탭을 클릭하고 다음 값을 입력합니다.

    메모

    보안 모범 사례로, 자동화된 도구, 시스템, 스크립트 및 앱을 사용하여 인증하는 경우 Databricks는 작업 영역 사용자 대신 서비스 주체 속한 개인용 액세스 토큰을 사용하는 것이 좋습니다. 서비스 주체에 대한 토큰을 만들려면 서비스 주체 대한토큰 관리를 참조하세요.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      <get-workspace-instance>대체합니다.

      <pipeline-id> 파이프라인 식별자로 바꿉다.

    • 메서드: 드롭다운 메뉴에서 POST 선택합니다.

    • 헤더: + 새클릭합니다. 이름 텍스트 상자에 Authorization입력합니다. 텍스트 상자에 Bearer <personal-access-token>입력합니다.

      <personal-access-token>을 Azure Databricks 개인용 액세스 토큰로 대체하십시오.

    • 본문: 추가 요청 매개 변수를 전달하려면 매개 변수가 포함된 JSON 문서를 입력합니다. 예를 들어 업데이트를 시작하고 파이프라인에 대한 모든 데이터를 다시 처리하려면 {"full_refresh": "true"}. 추가 요청 매개 변수가 없는 경우 빈 중괄호({})를 입력합니다.

웹 작업을 테스트하려면 Data Factory UI의 파이프라인 도구 모음에서 디버그 클릭합니다. 오류를 포함한 실행의 출력 및 상태는 Azure Data Factory 파이프라인의 출력 탭에 표시됩니다. DLT UI를 사용하여 파이프라인 업데이트의 세부 정보를 볼 수 있습니다.

일반적인 워크플로 요구 사항은 이전 작업을 완료한 후 작업을 시작하는 것입니다. DLT updates 요청은 비동기이므로 업데이트를 시작한 후 업데이트가 완료되기 전에 요청이 반환되므로 DLT 업데이트에 대한 종속성이 있는 Azure Data Factory 파이프라인의 작업은 업데이트가 완료되기를 기다려야 합니다. 업데이트 완료를 기다리는 옵션은 DLT 업데이트를 트리거하는 웹 작업 다음에 Until 작업 추가하는 것입니다. Until 작업에서:

  1. 대기 작업 추가하여 업데이트 완료를 위해 구성된 시간(초)을 기다립니다.
  2. DLT 업데이트 세부 정보 요청을 사용하여 업데이트 상태를 가져오는 대기 작업 다음에 웹 활동을 추가합니다. 응답의 state 필드는 완료된 경우를 포함하여 업데이트의 현재 상태를 반환합니다.
  3. state 필드의 값을 사용하여 Until 작업에 대한 종료 조건을 설정합니다. 변수 설정 작업 사용하여 state 값을 기반으로 파이프라인 변수를 추가하고 종료 조건에 이 변수를 사용할 수도 있습니다.