Поделиться через


Создание конвейера каталога Unity путем клонирования конвейера хранилища метаданных Hive

В этой статье описывается запрос clone a pipeline в REST API Databricks и его использование для копирования существующего конвейера, который публикуется в хранилище метаданных Hive в новый конвейер, который публикуется в каталоге Unity. При вызове запроса clone a pipeline он:

  • Копирует исходный код и конфигурацию из существующего конвейера в новый, применяя все указанные вами переопределения конфигурации.
  • Обновляет определения материализованных представлений и потоковых таблиц, внося необходимые изменения, чтобы эти объекты управлись каталогом Unity.
  • Запускает обновление конвейера для переноса существующих данных и метаданных, таких как контрольные точки для всех таблиц потоковой передачи в конвейере. Это позволяет этим таблицам потоковой передачи возобновлять обработку в той же точке, что и исходный конвейер.

После завершения операции клонирования исходные и новые конвейеры могут выполняться независимо.

В этой статье приведены примеры вызова API напрямую и через скрипт на Python из записной книжки Databricks.

Перед началом работы

Перед клонированием конвейера необходимо выполнить следующие действия.

  • Чтобы клонировать конвейер хранилища метаданных Hive, таблицы и представления, определенные в конвейере, должны публиковать таблицы в целевой схеме. Сведения о добавлении целевой схемы в конвейер см. в статье Настройка конвейера для публикации в хранилище метаданных Hive.

  • Ссылки на управляемые таблицы или представления Hive в потоке обработки для клонирования должны быть полностью квалифицированы с указанием каталога (hive_metastore), схемы и имени таблицы. Например, в следующем коде, создав набор данных customers, аргумент имени таблицы необходимо обновить до hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Не изменяйте исходный код для исходного конвейера хранилища метаданных Hive во время выполнения операции клонирования, включая записные книжки, настроенные как часть конвейера, и все модули, хранящиеся в папках Git или файлах рабочей области.

  • Исходный конвейер хранилища метаданных Hive не может работать при запуске операции клонирования. Если обновление выполняется, остановите его или дождитесь завершения.

Ниже приведены другие важные аспекты перед клонированием конвейера.

  • Если таблицы в конвейере хранилища метаданных Hive указывают расположение хранилища с помощью аргумента path в Python или LOCATION в SQL, передайте конфигурацию "pipelines.migration.ignoreExplicitPath": "true" в запрос клонирования. Настройка этой конфигурации включена в приведенные ниже инструкции.
  • Если конвейер хранилища метаданных Hive включает источник автозагрузчика, указывающий значение параметра cloudFiles.schemaLocation, а конвейер хранилища метаданных Hive останется в рабочем состоянии после создания клона каталога Unity, необходимо задать параметр mergeSchema для true как в конвейере хранилища метаданных Hive, так и в клонируемом конвейере каталога Unity. Добавление этого параметра в конвейер хранилища метаданных Hive перед клонированием скопирует параметр в новый конвейер.

Клонирование конвейера с помощью REST API Databricks

В следующем примере команда curl используется для вызова запроса clone a pipeline в REST API Databricks:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Заменить:

  • <personal-access-token> с личным токеном доступа Databricks .
  • <databricks-instance> с именем экземпляра рабочей области Azure Databricks, например adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> с уникальным идентификатором конвейера хранилища метаданных Hive для клонирования. Идентификатор потока можно найти в пользовательском интерфейсе DLT .

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Заменять:

  • <target-catalog-name> с именем каталога в каталоге Unity, в котором должен публиковаться новый конвейер. Это должен быть существующий каталог.
  • <target-schema-name> с именем схемы в каталоге Unity, в которой должен публиковаться новый конвейер, если он отличается от имени текущей схемы. Этот параметр является необязательным и, если он не указан, используется существующее имя схемы.
  • <new-pipeline-name> с необязательным именем для нового конвейера. Если он не указан, новый конвейер называется с помощью имени исходного конвейера с добавленным [UC].

clone_mode указывает режим, используемый для операции клонирования. MIGRATE_TO_UC является единственным поддерживаемым вариантом.

Используйте поле configuration для указания конфигураций в новом конвейере. Значения, заданные здесь, переопределяют конфигурации в исходном конвейере.

Ответ запроса REST API clone — это идентификатор нового конвейера каталога Unity.

Клонирование конвейера из записной книжки Databricks

В следующем примере вызывается запрос create a pipeline из скрипта Python. Для выполнения этого сценария можно использовать записную книжку Databricks:

  1. Создайте записную книжку для скрипта. См. Создание записной книжки.
  2. Скопируйте следующий скрипт Python в первую ячейку записной книжки.
  3. Обновите значения заполнителей в скрипте, заменив:
    • <databricks-instance> c именем экземпляра рабочей области Azure Databricks, например adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> с уникальным идентификатором потока обработки метаданных Hive для клонирования. Идентификатор конвейера можно найти в DLT UI.
    • <target-catalog-name> с именем каталога в каталоге Unity, в котором должен публиковаться новый конвейер. Это должен быть существующий каталог.
    • <target-schema-name> с именем схемы в каталоге Unity, в которую должен размещаться новый конвейер, если она отличается от имени текущей схемы. Этот параметр является необязательным и, если он не указан, используется существующее имя схемы.
    • <new-pipeline-name> с необязательным именем нового конвейера. Если он не указан, новый конвейер называется с помощью имени исходного конвейера с добавленным [UC].
  4. Запустите скрипт. См. статью Запуск записных книжек Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Ограничения

Ниже приведены ограничения запроса API DLT clone a pipeline.

  • Поддерживается только клонирование из конвейера, настроенного для использования хранилища метаданных Hive, в конвейер, использующий каталог Unity.
  • Клонировать можно только в той же рабочей области Azure Databricks, из которой вы клонируете конвейер.
  • Конвейер, который вы клонируете, может включать только следующие источники потоковой передачи:
  • Если конвейер хранилища метаданных Hive, который вы клонируете, использует режим уведомления файлов Auto Loader, Databricks рекомендует воздержаться от его запуска после клонирования. Это связано с тем, что запуск конвейера метахранилища Hive приводит к удалению некоторых событий уведомлений о файлах из клона Unity Catalog. Если исходный конвейер хранилища метаданных Hive выполняется после завершения операции клонирования, можно заполнить отсутствующие файлы, используя Автозагрузчик с параметром cloudFiles.backfillInterval. Дополнительные сведения о режиме уведомлений о файлах автозагрузчика см. в разделе Что такое режим уведомлений о файлах автозагрузчика?. Дополнительные сведения о резервном заполнении файлов с помощью автозагрузчика см. в статье Активация регулярных загрузок с помощью cloudFiles.backfillInterval и распространенных параметров автозагрузчика.
  • Задачи по техническому обслуживанию автоматически приостановлены для обоих трубопроводов во время клонирования.
  • Следующие правила применяются к запросам с использованием временных данных в таблицах клонированного конвейера Unity Catalog:
    • Если версия таблицы была изначально записана в управляемый объект хранилища метаданных Hive, запросы по пути времени с помощью предложения timestamp_expression не определены при запросе клонированного объекта каталога Unity.
    • Однако если версия таблицы была записана в клонированный объект каталога Unity, запросы с использованием функции путешествия во времени с помощью предложения timestamp_expression работают правильно.
    • Запросы на перемещение по времени с помощью предложения version работают правильно при запросе клонированного объекта каталога Unity, даже если версия была первоначально записана в управляемый объект хранилища метаданных Hive.
  • Другие ограничения при использовании DLT с каталогом Unity см. в разделе ограничения конвейера каталога Unity.
  • Ограничения каталога Unity см. в разделе ограничениях каталога Unity.