Criar um pipeline do Unity Catalog através do clonamento de um pipeline de metastore do Hive
Este artigo descreve a solicitação de clone a pipeline
na API REST do Databricks e como você pode usá-la para copiar um pipeline existente que publica no metastore do Hive para um novo pipeline que publica no Unity Catalog. Quando acionas o pedido clone a pipeline
, ele:
- Copia o código-fonte e a configuração do pipeline existente para um novo, aplicando quaisquer substituições de configuração especificadas.
- Atualiza as definições e referências da tabela de exibição materializada e Streaming com as alterações necessárias para que esses objetos sejam gerenciados pelo Unity Catalog.
- Inicia uma atualização do pipeline para migrar os dados e metadados existentes, como pontos de controlo, para quaisquer tabelas de transmissão no pipeline. Isso permite que essas tabelas de Streaming retomem o processamento no mesmo ponto do pipeline original.
Após a conclusão da operação de clonagem, as tubagens original e nova podem ser executadas de forma independente.
Este artigo inclui exemplos de como chamar a solicitação de API diretamente e por meio de um script Python de um bloco de anotações Databricks.
Antes de começar
Os seguintes requisitos devem ser cumpridos antes de clonar um pipeline:
Para clonar um pipeline de metastore do Hive, as tabelas e exibições definidas no pipeline devem publicar tabelas em um esquema de destino. Para saber como adicionar um esquema de destino a uma linha de processamento, consulte Configurar uma linha de processamento para publicar no metastore do Hive.
As referências a tabelas ou exibições gerenciadas pelo metastore do Hive no pipeline para clonagem devem ser totalmente qualificadas com o catálogo (
hive_metastore
), esquema e nome da tabela. Por exemplo, no código a seguir criando um conjunto de dadoscustomers
, o argumento nome da tabela deve ser atualizado parahive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
Não edite o código-fonte do pipeline de metastore do Hive de origem enquanto uma operação de clone estiver em andamento, incluindo os notebooks configurados como parte do pipeline e quaisquer módulos armazenados em pastas Git ou arquivos de ambiente de trabalho.
O pipeline de metastore do Hive de origem não deve estar em execução quando se inicia a operação de clonagem. Se uma atualização estiver em execução, interrompa-a ou aguarde até que seja concluída.
A seguir estão outras considerações importantes antes de clonar um pipeline:
- Se as tabelas no pipeline de metastore do Hive especificarem um local de armazenamento usando o argumento
path
em Python ouLOCATION
em SQL, passe a configuração"pipelines.migration.ignoreExplicitPath": "true"
para a solicitação de clone. A definição desta configuração está incluída nas instruções abaixo. - Se o pipeline de metastore do Hive incluir uma fonte de carregador automático que especifique um valor para a opção
cloudFiles.schemaLocation
e o pipeline de metastore do Hive permanecerá operacional após a criação do clone do Unity Catalog, você deverá definir a opçãomergeSchema
paratrue
no pipeline de metastore do Hive e no pipeline clonado do Unity Catalog. Adicionar essa opção ao pipeline de metastore do Hive antes da clonagem copiará a opção para o novo pipeline.
Clonar um pipeline utilizando a API REST do Databricks
O exemplo a seguir usa o comando curl
para chamar a solicitação clone a pipeline
na API REST do Databricks:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Substituir:
-
<personal-access-token>
com um token de acesso pessoal Databricks . -
<databricks-instance>
com o nome da instância do espaço de trabalho do Azure Databricks , por exemplo,adb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
com o identificador exclusivo do pipeline de metastore do Hive para clonar. Você pode encontrar o ID do pipeline na interface do usuário DLT .
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Substituir:
-
<target-catalog-name>
com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve publicar. Este deve ser um catálogo existente. -
<target-schema-name>
com o nome de um esquema no Unity Catalog no qual o novo pipeline deve publicar, caso seja diferente do nome do esquema atual. Esse parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado. -
<new-pipeline-name>
com um nome opcional para o novo fluxo de dados. Se não estiver especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com[UC]
anexado.
clone_mode
especifica o modo a ser usado para a operação de clone.
MIGRATE_TO_UC
é a única opção suportada.
Use o campo configuration
para especificar configurações no novo pipeline. Os valores definidos aqui substituem as configurações no pipeline original.
A resposta da solicitação de API REST do clone
é a ID do novo pipeline do Catálogo Unity.
Clone um pipeline do notebook Databricks
O exemplo a seguir invoca o pedido create a pipeline
de um script Python. Você pode usar um bloco de anotações Databricks para executar este script:
- Crie um novo bloco de anotações para o script. Consulte Criar um bloco de notas.
- Copie o seguinte script Python para a primeira célula do bloco de anotações.
- Atualize os valores dos espaços reservados no script, substituindo-os por:
-
<databricks-instance>
com o nome da instância do espaço de trabalho do Azure Databricks , por exemploadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
com o identificador exclusivo para clonar o pipeline do metastore do Hive. Você pode encontrar o ID do pipeline na interface de utilizador DLT . -
<target-catalog-name>
com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve publicar. Este deve ser um catálogo existente. -
<target-schema-name>
com o nome de um esquema no Unity Catalog no qual o novo pipeline deve publicar, caso seja diferente do nome do esquema atual. Esse parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado. -
<new-pipeline-name>
com um nome opcional para o novo pipeline. Se não estiver especificado, o novo pipeline será nomeado com o nome do pipeline de origem seguido de[UC]
.
-
- Corra o script. Consulte Executar blocos de anotações Databricks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Limitações
A seguir estão as limitações da solicitação de API do DLT clone a pipeline
:
- Somente a clonagem de um pipeline configurado para usar o metastore do Hive para um pipeline do Unity Catalog é suportada.
- Você pode criar um clone somente no mesmo espaço de trabalho do Azure Databricks que o pipeline do qual está clonando.
- O pipeline que você está clonando pode incluir apenas as seguintes fontes de streaming:
- Fontes Delta
- Auto Loader, incluindo quaisquer fontes de dados suportadas pelo Auto Loader. Consulte Carregar ficheiros do armazenamento de objetosna nuvem.
- Apache Kafka com Streaming Estruturado. No entanto, a fonte Kafka não pode ser configurada para usar a opção
kafka.group.id
. Consulte Processamento de fluxo com o Apache Kafka e o Azure Databricks. - Amazon Kinesis com streaming estruturado. No entanto, a origem do Kinesis não pode ser configurada para definir
consumerMode
comoefo
.
- Se o pipeline de metastore do Hive que você está clonando usar o modo de notificação de arquivo do Auto Loader, o Databricks recomenda não executar o pipeline de metastore do Hive após a clonagem. Isso deve-se ao facto de a execução do pipeline de metastore do Hive resultar na eliminação de alguns eventos de notificação de ficheiros do clone do Unity Catalog. Se o pipeline de metastore do Hive de origem for executado após a conclusão da operação de clonagem, você poderá preencher os arquivos ausentes usando o Auto Loader com a opção
cloudFiles.backfillInterval
. Para saber mais sobre o modo de notificação de arquivo do Auto Loader, consulte O que é o modo de notificação de arquivo do Auto Loader?. Para saber mais sobre o preenchimento de arquivos com o Auto Loader, consulte Acionar backfills regulares usando cloudFiles.backfillInterval e opções comuns do Auto Loader. - As tarefas de manutenção das pipelines são pausadas automaticamente para ambas enquanto a clonagem está em progresso.
- O seguinte aplica-se a consultas relacionadas a viagem no tempo nas tabelas no pipeline clonado do Catálogo Unity.
- Se uma versão de tabela foi originalmente gravada em um objeto gerenciado pelo metastore do Hive, as consultas de viagem no tempo usando uma cláusula
timestamp_expression
serão indefinidas ao consultar o objeto Unity Catalog clonado. - No entanto, se a versão da tabela foi gravada no objeto Unity Catalog clonado, as consultas de viagem no tempo usando uma cláusula
timestamp_expression
funcionam corretamente. - Consultas de viagem no tempo usando uma cláusula
version
funcionam corretamente ao consultar um objeto clonado do Unity Catalog, mesmo quando a versão foi originalmente escrita no objeto gerido pelo metastore do Hive.
- Se uma versão de tabela foi originalmente gravada em um objeto gerenciado pelo metastore do Hive, as consultas de viagem no tempo usando uma cláusula
- Para obter outras limitações ao usar DLT com o Unity Catalog, consulte Unity Catalog pipeline limitations.
- Para conhecer as limitações do Catálogo Unity, consulte Limitações do Catálogo Unity.