Dela via


Skapa en Unity Catalog-pipeline genom att klona en Hive-metaarkivpipeline

Den här artikeln beskriver clone a pipeline begäran i Databricks REST API och hur du kan använda den för att kopiera en befintlig pipeline som publicerar till Hive-metaarkivet till en ny pipeline som publicerar till Unity Catalog. När du anropar begäran clone a pipeline så kommer följande att ske:

  • Kopierar källkoden och konfigurationen från den befintliga pipelinen till en ny och tillämpar eventuella konfigurations åsidosättningar som du har angett.
  • Uppdaterar definitioner och referenser för materialiserad vy och strömningstabell med nödvändiga ändringar för de objekt som ska hanteras av Unity Catalog.
  • Startar en pipelineuppdatering för att migrera befintliga data och metadata, till exempel kontrollpunkter, för alla strömningstabeller i pipelinen. Detta gör att dessa strömningstabeller kan återuppta bearbetningen på samma plats som den ursprungliga pipelinen.

När kloningen har slutförts kan både de ursprungliga och nya pipelines köras oberoende av varandra.

Den här artikeln innehåller exempel på hur du anropar API-begäran direkt och via ett Python-skript från en Databricks-notebook-fil.

Innan du börjar

Följande krävs innan du klonar en pipeline:

  • Om du vill klona en Hive metastore-pipeline måste tabellerna och vyerna som definierats i pipelinen publicera tabeller i ett målschema. Information om hur du lägger till ett målschema i en pipeline finns i Konfigurera en pipeline för publicering i Hive-metaarkivet.

  • Referenser till hanterade Hive-metaarkivtabeller eller vyer i pipelinen för att klona måste vara fullständigt kvalificerade med katalogen (hive_metastore), schemat och tabellnamnet. I följande kod som till exempel skapar en customers datauppsättning måste argumentet tabellnamn uppdateras till hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Redigera inte källkoden för hive-metaarkivets källpipeline medan en kloning pågår, inklusive notebook-filer som konfigurerats som en del av pipelinen och moduler som lagras i Git-mappar eller arbetsytefiler.

  • Hive-metaarkivets källpipeline får inte vara igång när du startar kloningsprocessen. Om en uppdatering körs stoppar du den eller väntar tills den har slutförts.

Följande är andra viktiga överväganden innan du klonar en pipeline:

  • Om tabeller i Hive-metaarkivpipelinen anger en lagringsplats med argumentet path i Python eller LOCATION i SQL skickar du "pipelines.migration.ignoreExplicitPath": "true" konfigurationen till klonbegäran. Inställningen av den här konfigurationen ingår i anvisningarna nedan.
  • Om Hive-metaarkivpipelinen innehåller en autoinläsningskälla som anger ett värde för alternativet cloudFiles.schemaLocation och Hive-metaarkivets pipeline fortsätter att fungera när du har skapat Unity Catalog-klonen måste du ange alternativet mergeSchema till true i både Hive-metaarkivpipelinen och den klonade Unity Catalog-pipelinen. Om du lägger till det här alternativet i Hive-metaarkivpipelinen innan kloning kopieras alternativet till den nya pipelinen.

Klona en pipeline genom att använda Databricks REST API

I följande exempel används kommandot curl för att anropa clone a pipeline-begäran i Databricks REST API:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Ersätta:

  • <personal-access-token> med en Personlig åtkomsttoken för Databricks .
  • <databricks-instance> med instansnamnet för Azure Databricks -arbetsytan, till exempel adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> med den unika identifieraren för Hive metastore-pipelinen som ska klonas. Du hittar pipeline-ID:t i DLT-användargränssnittet.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Ersätta:

  • <target-catalog-name> med namnet på en katalog i Unity Catalog som den nya pipelinen ska publicera till. Detta måste vara en befintlig katalog.
  • <target-schema-name> med namnet på ett schema i Unity Catalog som den nya pipelinen ska publicera om den skiljer sig från det aktuella schemanamnet. Den här parametern är valfri och om den inte anges används det befintliga schemanamnet.
  • <new-pipeline-name> med ett valfritt namn för den nya pipelinen. Om den inte anges namnges den nya pipelinen med namnet på källpipelinen med [UC] bifogad.

clone_mode anger läget som ska användas för kloningsåtgärden. MIGRATE_TO_UC är det enda alternativet som stöds.

Använd fältet configuration för att ange konfigurationer för den nya pipelinen. Värdena som anges här åsidosätter konfigurationer i den ursprungliga pipelinen.

Svaret från clone REST API-begäran är pipeline-ID:t för den nya Unity Catalog-pipelinen.

Klona en pipeline från en Databricks-notebook-fil

Följande exempel anropar create a pipeline-anropet från ett Python-skript. Du kan använda en Databricks-notebook-fil för att köra det här skriptet:

  1. Skapa en ny notebook-fil för skriptet. Se Skapa en anteckningsbok.
  2. Kopiera följande Python-skript till den första cellen i notebook-filen.
  3. Uppdatera platshållarvärdena i skriptet genom att ersätta:
    • <databricks-instance> med instansnamnet för Azure Databricks -arbetsytan, till exempel adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> med den unika identifieraren för Hive-metaarkivpipelinen som ska klonas. Du hittar pipeline-ID:t i DLT-användargränssnittet.
    • <target-catalog-name> med namnet på en katalog i Unity Catalog som den nya pipelinen ska publicera till. Detta måste vara en befintlig katalog.
    • <target-schema-name> med namnet på ett schema i Unity Catalog som den nya pipelinen ska publicera om den skiljer sig från det aktuella schemanamnet. Den här parametern är valfri och om den inte anges används det befintliga schemanamnet.
    • <new-pipeline-name> med ett valfritt namn för den nya pipelinen. Om den inte anges namnges den nya pipelinen med namnet på källpipelinen med [UC] bifogad.
  4. Kör skriptet. Se Kör Databricks-anteckningar.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Begränsningar

Följande är begränsningar i DLT-clone a pipeline API-begäran:

  • Endast kloning är möjlig från en pipeline som är konfigurerad för att använda Hive-metastore till en Unity Catalog-pipeline.
  • Du kan bara skapa en klon på samma Azure Databricks-arbetsyta som pipelinen som du klonar från.
  • Den pipeline som du klonar kan bara innehålla följande strömningskällor:
  • Om Hive-metaarkivpipelinen som du klonar använder meddelandeläget för autoinläsningsfiler rekommenderar Databricks att du inte kör Hive-metaarkivpipelinen efter kloning. Detta beror på att när Hive-metaarkivets pipeline körs resulterar det i att vissa filaviseringshändelser tas bort från Unity-katalogens klon. Om Hives källmetakatalogspipeline körs efter att kloningsoperationen har slutförts kan du fylla på saknade filer med Auto Loader med alternativet cloudFiles.backfillInterval. Mer information om meddelandeläget för automatisk inläsning av filer finns i Vad är meddelandeläge för automatisk inläsning av fil?. Mer information om att fylla i filer med automatisk inläsning finns i Utlösa vanliga återfyllnad med hjälp av cloudFiles.backfillInterval och vanliga alternativ för automatisk inläsning.
  • Underhållsaktiviteter för pipeline pausas automatiskt för båda rören under kloningsprocessen.
  • Följande gäller för tidsresefrågor mot tabeller i den klonade Unity Catalog-pipelinen:
    • Om en tabellversion ursprungligen skrevs till ett Hive-metaarkivhanterat objekt, är tidsresefrågor med hjälp av en timestamp_expression-sats odefinierade vid frågning av det klonade Unity Catalog-objektet.
    • Men om tabellversionen skrevs till det klonade Unity Catalog-objektet fungerar tidsresefrågor med hjälp av en timestamp_expression-sats korrekt.
    • Tidsresefrågor med hjälp av en version-sats fungerar korrekt när du kör frågor mot ett klonat Unity Catalog-objekt, även när versionen ursprungligen skrevs till det hanterade Hive-metaarkivobjektet.
  • Andra begränsningar när du använder DLT med Unity Catalog finns i Unity Catalogs pipelinebegränsningar.
  • Informationen om de begränsningar som finns i Unity Catalog finns i Unity Catalog-begränsningar.