Создание конвейера каталога Unity путем клонирования конвейера хранилища метаданных Hive
В этой статье описывается запрос clone a pipeline
в REST API Databricks и его использование для копирования существующего конвейера, который публикуется в хранилище метаданных Hive в новый конвейер, который публикуется в каталоге Unity. При вызове запроса clone a pipeline
он:
- Копирует исходный код и конфигурацию из существующего конвейера в новый, применяя все указанные вами переопределения конфигурации.
- Обновляет определения материализованных представлений и потоковых таблиц, внося необходимые изменения, чтобы эти объекты управлись каталогом Unity.
- Запускает обновление конвейера для переноса существующих данных и метаданных, таких как контрольные точки для всех таблиц потоковой передачи в конвейере. Это позволяет этим таблицам потоковой передачи возобновлять обработку в той же точке, что и исходный конвейер.
После завершения операции клонирования исходные и новые конвейеры могут выполняться независимо.
В этой статье приведены примеры вызова API напрямую и через скрипт на Python из записной книжки Databricks.
Перед началом работы
Перед клонированием конвейера необходимо выполнить следующие действия.
Чтобы клонировать конвейер хранилища метаданных Hive, таблицы и представления, определенные в конвейере, должны публиковать таблицы в целевой схеме. Сведения о добавлении целевой схемы в конвейер см. в статье Настройка конвейера для публикации в хранилище метаданных Hive.
Ссылки на управляемые таблицы или представления Hive в потоке обработки для клонирования должны быть полностью квалифицированы с указанием каталога (
hive_metastore
), схемы и имени таблицы. Например, в следующем коде, создав набор данныхcustomers
, аргумент имени таблицы необходимо обновить доhive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
Не изменяйте исходный код для исходного конвейера хранилища метаданных Hive во время выполнения операции клонирования, включая записные книжки, настроенные как часть конвейера, и все модули, хранящиеся в папках Git или файлах рабочей области.
Исходный конвейер хранилища метаданных Hive не может работать при запуске операции клонирования. Если обновление выполняется, остановите его или дождитесь завершения.
Ниже приведены другие важные аспекты перед клонированием конвейера.
- Если таблицы в конвейере хранилища метаданных Hive указывают расположение хранилища с помощью аргумента
path
в Python илиLOCATION
в SQL, передайте конфигурацию"pipelines.migration.ignoreExplicitPath": "true"
в запрос клонирования. Настройка этой конфигурации включена в приведенные ниже инструкции. - Если конвейер хранилища метаданных Hive включает источник автозагрузчика, указывающий значение параметра
cloudFiles.schemaLocation
, а конвейер хранилища метаданных Hive останется в рабочем состоянии после создания клона каталога Unity, необходимо задать параметрmergeSchema
дляtrue
как в конвейере хранилища метаданных Hive, так и в клонируемом конвейере каталога Unity. Добавление этого параметра в конвейер хранилища метаданных Hive перед клонированием скопирует параметр в новый конвейер.
Клонирование конвейера с помощью REST API Databricks
В следующем примере команда curl
используется для вызова запроса clone a pipeline
в REST API Databricks:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Заменить:
-
<personal-access-token>
с личным токеном доступа Databricks . -
<databricks-instance>
с именем экземпляра рабочей области Azure Databricks, напримерadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
с уникальным идентификатором конвейера хранилища метаданных Hive для клонирования. Идентификатор потока можно найти в пользовательском интерфейсе DLT .
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Заменять:
-
<target-catalog-name>
с именем каталога в каталоге Unity, в котором должен публиковаться новый конвейер. Это должен быть существующий каталог. -
<target-schema-name>
с именем схемы в каталоге Unity, в которой должен публиковаться новый конвейер, если он отличается от имени текущей схемы. Этот параметр является необязательным и, если он не указан, используется существующее имя схемы. -
<new-pipeline-name>
с необязательным именем для нового конвейера. Если он не указан, новый конвейер называется с помощью имени исходного конвейера с добавленным[UC]
.
clone_mode
указывает режим, используемый для операции клонирования.
MIGRATE_TO_UC
является единственным поддерживаемым вариантом.
Используйте поле configuration
для указания конфигураций в новом конвейере. Значения, заданные здесь, переопределяют конфигурации в исходном конвейере.
Ответ запроса REST API clone
— это идентификатор нового конвейера каталога Unity.
Клонирование конвейера из записной книжки Databricks
В следующем примере вызывается запрос create a pipeline
из скрипта Python. Для выполнения этого сценария можно использовать записную книжку Databricks:
- Создайте записную книжку для скрипта. См. Создание записной книжки.
- Скопируйте следующий скрипт Python в первую ячейку записной книжки.
- Обновите значения заполнителей в скрипте, заменив:
-
<databricks-instance>
c именем экземпляра рабочей области Azure Databricks, напримерadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
с уникальным идентификатором потока обработки метаданных Hive для клонирования. Идентификатор конвейера можно найти в DLT UI. -
<target-catalog-name>
с именем каталога в каталоге Unity, в котором должен публиковаться новый конвейер. Это должен быть существующий каталог. -
<target-schema-name>
с именем схемы в каталоге Unity, в которую должен размещаться новый конвейер, если она отличается от имени текущей схемы. Этот параметр является необязательным и, если он не указан, используется существующее имя схемы. -
<new-pipeline-name>
с необязательным именем нового конвейера. Если он не указан, новый конвейер называется с помощью имени исходного конвейера с добавленным[UC]
.
-
- Запустите скрипт. См. статью Запуск записных книжек Databricks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Ограничения
Ниже приведены ограничения запроса API DLT clone a pipeline
.
- Поддерживается только клонирование из конвейера, настроенного для использования хранилища метаданных Hive, в конвейер, использующий каталог Unity.
- Клонировать можно только в той же рабочей области Azure Databricks, из которой вы клонируете конвейер.
- Конвейер, который вы клонируете, может включать только следующие источники потоковой передачи:
- Источники дельта
- Автозагрузчик, включая все источники данных, поддерживаемые автозагрузчиком. См. раздел загрузить файлы из облачного хранилища объектов.
- Apached Kafka с структурированной потоковой передачей. Однако источник Kafka не может быть настроен для использования параметра
kafka.group.id
. См. потоковую обработку с помощью Apache Kafka и Azure Databricks. - Amazon Kinesis с структурированной потоковой передачей. Однако источник Kinesis не может быть настроен для задания
consumerMode
efo
.
- Если конвейер хранилища метаданных Hive, который вы клонируете, использует режим уведомления файлов Auto Loader, Databricks рекомендует воздержаться от его запуска после клонирования. Это связано с тем, что запуск конвейера метахранилища Hive приводит к удалению некоторых событий уведомлений о файлах из клона Unity Catalog. Если исходный конвейер хранилища метаданных Hive выполняется после завершения операции клонирования, можно заполнить отсутствующие файлы, используя Автозагрузчик с параметром
cloudFiles.backfillInterval
. Дополнительные сведения о режиме уведомлений о файлах автозагрузчика см. в разделе Что такое режим уведомлений о файлах автозагрузчика?. Дополнительные сведения о резервном заполнении файлов с помощью автозагрузчика см. в статье Активация регулярных загрузок с помощью cloudFiles.backfillInterval и распространенных параметров автозагрузчика. - Задачи по техническому обслуживанию автоматически приостановлены для обоих трубопроводов во время клонирования.
- Следующие правила применяются к запросам с использованием временных данных в таблицах клонированного конвейера Unity Catalog:
- Если версия таблицы была изначально записана в управляемый объект хранилища метаданных Hive, запросы по пути времени с помощью предложения
timestamp_expression
не определены при запросе клонированного объекта каталога Unity. - Однако если версия таблицы была записана в клонированный объект каталога Unity, запросы с использованием функции путешествия во времени с помощью предложения
timestamp_expression
работают правильно. - Запросы на перемещение по времени с помощью предложения
version
работают правильно при запросе клонированного объекта каталога Unity, даже если версия была первоначально записана в управляемый объект хранилища метаданных Hive.
- Если версия таблицы была изначально записана в управляемый объект хранилища метаданных Hive, запросы по пути времени с помощью предложения
- Другие ограничения при использовании DLT с каталогом Unity см. в разделе ограничения конвейера каталога Unity.
- Ограничения каталога Unity см. в разделе ограничениях каталога Unity.