Поделиться через


Запуск конвейера DLT в рабочем процессе

Конвейер DLT можно запустить в рамках рабочего процесса обработки данных с заданиями Databricks, Apache Airflow или Фабрикой данных Azure.

Вакансии

Вы можете оркестрировать несколько задач в задании Databricks для реализации рабочего процесса обработки данных. Чтобы включить конвейер DLT в задание, используйте задачу Pipeline при создании задания. См. задачу конвейера DLT для заданий.

Apache Airflow

Apache Airflow — это решение с открытым исходным кодом для управления рабочими процессами данных и планирования. Airflow представляет рабочие процессы в виде ациклических графов (DAG) операций. Вы определяете рабочий процесс в файле Python, а Airflow управляет планированием и выполнением. Для получения информации об установке и использовании Airflow с Azure Databricks см. раздел Orchestrate Azure Databricks jobs with Apache Airflow.

Чтобы запустить конвейер DLT в рамках рабочего процесса Airflow, используйте DatabricksSubmitRunOperator.

Требования

Для использования поддержки Airflow для DLT требуется следующее:

  • Airflow версии 2.1.0 или более поздней.
  • Версия 2.1.0 или более поздняя пакета поставщика Databricks .

Пример

В следующем примере создается DAG Airflow, который активирует обновление конвейера DLT с идентификатором 8279d543-063c-4d63-9926-dae38e35ce8b.

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Замените CONNECTION_ID на идентификатор подключения Airflow к вашей рабочей области.

Сохраните этот пример в каталоге airflow/dags и используйте пользовательский интерфейс Airflow для представления и активации DAG. Используйте пользовательский интерфейс DLT для просмотра сведений об обновлении конвейера.

Фабрика данных Azure

Заметка

DLT и Фабрика данных Azure включают параметры настройки количества повторных попыток при возникновении сбоя. Если значения повторных попыток настроены в вашем конвейере DLT и, а также в действии фабрики данных Azure, которое вызывает конвейер, то число повторных попыток определяется как значение повторных попыток в Фабрике данных Azure, умноженное на значение повторных попыток в DLT.

Например, если обновление конвейера завершается ошибкой, DLT пытается выполнить обновление повторно до пяти раз по умолчанию. Если для фабрики данных Azure задано значение три, а конвейер DLT использует по умолчанию пять повторных попыток, ваш конвейер DLT может быть повторно запущен до пятнадцати раз. Чтобы избежать чрезмерной попытки повторных попыток при сбое обновлений конвейера, Databricks рекомендует ограничить количество повторных попыток при настройке конвейера DLT или действия Фабрики данных Azure, вызывающего конвейер.

Чтобы изменить конфигурацию повторных попыток для конвейера DLT, используйте параметр pipelines.numUpdateRetryAttempts при настройке конвейера.

Фабрика данных Azure — это облачная служба ETL, которая позволяет управлять рабочими процессами интеграции и преобразования данных. Фабрика данных Azure напрямую поддерживает выполнение задач Azure Databricks в рабочем процессе, включая записные книжки, задачи JAR и скрипты Python. Вы также можете включить конвейер в рабочий процесс, вызвав API DLT из веб-действий фабрики данных Azure. Например, чтобы запустить обновление конвейера из Azure Data Factory:

  1. создание фабрики данных или открытие существующей фабрики данных.

  2. По завершении создания откройте страницу фабрики данных и щелкните плитку Open Azure Data Factory Studio. Появится пользовательский интерфейс Фабрики данных Azure.

  3. Создайте новый конвейер Azure Data Factory, выбрав Pipeline в раскрывающемся меню New в интерфейсе пользователя Azure Data Factory Studio.

  4. На панели действий разверните Общие и перетащите действие Веб на холст конвейера. Перейдите на вкладку "Параметры" и введите следующие значения:

    Заметка

    В качестве рекомендации по обеспечению безопасности при проверке подлинности с помощью автоматизированных средств, систем, сценариев и приложений Databricks рекомендуется использовать личные маркеры доступа, принадлежащие субъекты-службы, вместо пользователей рабочей области. Сведения о создании токенов для служебных принципалов см. в разделе Управление токенами для служебного принципала.

    • URL-адрес: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Замените <get-workspace-instance>.

      Замените <pipeline-id> идентификатором конвейера.

    • Метод : выберите POST в раскрывающемся меню.

    • Заголовки: нажмите + Создать. В текстовом поле "Имя" введите Authorization. В текстовом поле значение введите Bearer <personal-access-token>.

      Замените <personal-access-token> личным токеном доступа Azure Databricks .

    • Тело: Чтобы передать дополнительные параметры запроса, введите JSON-документ, содержащий параметры. Например, чтобы запустить обновление и повторно обработать все данные для конвейера: {"full_refresh": "true"}. Если нет дополнительных параметров запроса, введите пустые фигурные скобки ({}).

Чтобы проверить веб-активность, щелкните Отладка на панели инструментов конвейера в пользовательском интерфейсе Data Factory. Выходные данные и состояние выполнения, включая ошибки, отображаются на вкладке выходных данных конвейера Фабрики данных Azure. Используйте пользовательский интерфейс DLT для просмотра сведений об обновлении конвейера.

Совет

Обычное требование рабочего процесса — запустить задачу после завершения предыдущей задачи. Так как запрос DLT updates является асинхронным — запрос возвращается после запуска обновления, но до завершения обновления — задачи в конвейере фабрики данных Azure с зависимостью от обновления DLT должны ждать завершения обновления. Вариантом ожидания завершения обновления является добавление действия Until после веб-действия, которое запускает обновление DLT. В действии "До":

  1. Добавьте активность ожидания, чтобы дождаться завершения обновления в течение настроенного количества секунд.
  2. Добавьте веб-действие после действия ожидания, которое использует запрос сведений об обновлении DLT для получения состояния обновления. Поле state в ответе возвращает текущее состояние обновления, в том числе, если оно завершено.
  3. Используйте значение поля state, чтобы установить условие завершения для действия "Until". Можно также использовать действие установки переменной для добавления переменной конвейера на основе значения state и использования этой переменной как условия завершения.