Megosztás a következőn keresztül:


Unity Catalog-folyamat létrehozása Hive-metaadattár-folyamat klónozásával

Ez a cikk ismerteti a Databricks REST API clone a pipeline kérését, és azt, hogy hogyan másolhat egy meglévő folyamatot, amely közzétesz a Hive metaadattárban egy új, a Unity Catalogban közzétett folyamatba. Amikor a clone a pipeline kérést meghívja, akkor a következő történik:

  • Átmásolja a meglévő folyamat forráskódját és konfigurációját egy új folyamatba, alkalmazva az Ön által megadott konfigurációs felülbírálásokat.
  • Frissíti a materializált nézet és a streamelő tábla definícióit és hivatkozásait a megfelelő változtatásokkal, hogy ezeket az objektumokat a Unity Catalog kezelje.
  • Elindít egy pipeline-frissítést a meglévő adatok és metaadatok, például ellenőrzőpontok migrálásához a pipeline bármely streaming táblájában. Ez lehetővé teszi, hogy a streamelési táblák ugyanazon a ponton folytassák a feldolgozást, mint az eredeti folyamat.

A klónozási művelet befejezése után az eredeti és az új folyamatok egymástól függetlenül is futtathatók.

Ez a cikk példákat tartalmaz az API-kérés meghívására közvetlenül és Egy Python-szkripten keresztül egy Databricks-jegyzetfüzetből.

Mielőtt hozzákezdene

A folyamat klónozása előtt a következőkre van szükség:

  • Hive-metaadattár-folyamat klónozásához a folyamatban definiált tábláknak és nézeteknek közzé kell tenni a táblákat egy célsémában. Ha meg szeretné tudni, hogyan adhat hozzá célsémát egy folyamathoz, olvassa el Folyamat konfigurálása a Hive metaadattárban való közzétételhezcímű témakört.

  • A hive metaadattár által felügyelt táblákra vagy nézetekre mutató hivatkozásokat a klónozni kívánt folyamatban teljes mértékben minősíteni kell a katalógus (hive_metastore), séma és táblanév alapján. Például a következő kódban, amely egy customers adatkészletet hoz létre, a táblanév argumentumot frissíteni kell hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Klónozási művelet közben ne szerkessze a forrás Hive metaadattár-folyamat forráskódját, beleértve a folyamat részeként konfigurált jegyzetfüzeteket és a Git-mappákban vagy munkaterületfájlokban tárolt modulokat.

  • A forrás Hive metaadattár-folyamat nem futhat a klónozási művelet indításakor. Ha egy frissítés fut, állítsa le, vagy várja meg, amíg befejeződik.

A csővezeték klónozása előtt az alábbiakat érdemes figyelembe venni:

  • Ha a Hive metaadattár-folyamat táblái egy tárolóhelyet adnak meg a Python path argumentumával vagy az SQL LOCATION használatával, adja át a "pipelines.migration.ignoreExplicitPath": "true" konfigurációt a klónozási kérelemnek. Ennek a konfigurációnak a beállítását az alábbi utasítások tartalmazzák.
  • Ha a Hive metaadattár-folyamat tartalmaz egy automatikus betöltőforrást, amely megadja a cloudFiles.schemaLocation beállítás értékét, és a Hive metaadattár-folyamat a Unity Katalógus klónjának létrehozása után is működőképes marad, a mergeSchema beállítást úgy kell beállítania, hogy true a Hive metaadattár-folyamatban és a klónozott Unity Catalog-folyamatban is. Ha ezt a beállítást a Hive metaadattár-folyamathoz a klónozás előtt hozzáadja, az átmásolja a beállítást az új folyamatba.

Folyamat klónozása a Databricks REST API-val

Az alábbi példa a curl paranccsal hívja meg a clone a pipeline kérést a Databricks REST API-ban:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Helyettesít:

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Helyettesít:

  • <target-catalog-name> egy katalógus nevével a Unity Katalógusban, amelyhez az új folyamatnak közzé kell tennie. Ennek egy meglévő katalógusnak kell lennie.
  • <target-schema-name> egy séma nevével a Unity Catalogban, amelyhez az új folyamatnak közzé kell tennie, ha az eltér az aktuális séma nevétől. Ez a paraméter nem kötelező, és ha nincs megadva, a rendszer a meglévő sémanevet használja.
  • <new-pipeline-name> egy opcionális névvel az új csővezeték számára. Ha nincs megadva, az új folyamatot a forrásfolyamat nevével nevezik el, a [UC] hozzáfűzésével.

clone_mode megadja a klónozási művelethez használni kívánt módot. MIGRATE_TO_UC az egyetlen támogatott lehetőség.

Az új folyamat konfigurációinak megadásához használja a configuration mezőt. Az itt megadott értékek felülbírálják az eredeti folyamat konfigurációit.

A clone REST API-kérés válasza az új Unity Catalog-folyamat folyamatazonosítója.

Folyamat klónozása Databricks-jegyzetfüzetből

Az alábbi példa meghívja a create a pipeline kérést egy Python-szkriptből. Ezt a szkriptet databricks-jegyzetfüzettel futtathatja:

  1. Hozzon létre egy új jegyzetfüzetet a szkripthez. Lásd: Jegyzetfüzet létrehozása.
  2. Másolja a következő Python-szkriptet a jegyzetfüzet első cellájába.
  3. Frissítse a helyőrző értékeket a szkriptben a következő lecserélésével:
    • <databricks-instance> az Azure Databricks -munkaterületpéldány nevével, például adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> a klónozni kívánt Hive metaadattár-adatfolyam egyedi azonosítójával. A folyamatazonosítót a DLT felhasználói felületéntalálhatja meg.
    • <target-catalog-name> egy katalógus nevével a Unity Katalógusban, amelyhez az új folyamatvonalnak közzé kell tennie. Ennek egy meglévő katalógusnak kell lennie.
    • <target-schema-name> egy séma neve a Unity Catalogban, amelyhez az új folyamatot közzé kell tenni, ha eltér az aktuális séma nevétől. Ez a paraméter nem kötelező, és ha nincs megadva, a rendszer a meglévő sémanevet használja.
    • <new-pipeline-name> egy opcionális névvel az új folyamatlánchoz. Ha nincs megadva, az új folyamat neve a forrásfolyamat nevéből származik, a végére [UC]-t hozzáfűzve.
  4. Futtassa a szkriptet. Lásd: Databricks-jegyzetfüzetek futtatása.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Korlátozások

A DLT clone a pipeline API-kérés korlátozásai a következők:

  • Csak a Hive metaadattár használatára konfigurált folyamatból a Unity Catalog-folyamatba való klónozás támogatott.
  • Klónt csak ugyanabban az Azure Databricks-munkaterületen hozhat létre, mint a klónozott folyamat.
  • A klónozott adattovábbítási csatorna csak a következő streamforrásokat tartalmazhatja:
  • Ha a klónozni kívánt Hive metaadattár-folyamat automatikus betöltőfájl-értesítési módot használ, a Databricks azt javasolja, hogy klónozás után ne futtassa a Hive metaadattár-folyamatot. Ennek az az oka, hogy a Hive metaadattár-folyamat futtatásával néhány fájlértesítési eseményt elvet a Unity Catalog klónjáról. Ha a forrás Hive metaadattár-folyamat a klónozási művelet befejeződése után fut, az Automatikus betöltő használatával a cloudFiles.backfillInterval lehetőséggel újra kitöltheti a hiányzó fájlokat. Az automatikus betöltő fájlértesítési módjáról az Mi az automatikus betöltő fájlértesítési mód?. A fájlok utólagos betöltéséről az Auto Loader segítségével további információt a CloudFiles.backfillInterval használatával történő rendszeres utólagos betöltések kiváltása és a gyakori Auto Loader opciókcímű részekben talál.
  • A folyamatkarbantartási feladatok mindkét folyamat esetében automatikusan szünetelnek, miközben a klónozás folyamatban van.
  • A klónozott Unity Catalog-folyamat tábláira vonatkozó időutazási lekérdezésekre az alábbiak vonatkoznak:
    • Ha egy táblázatverziót eredetileg egy Hive metaadattár által felügyelt objektumba írtak, a klónozott Unity Catalog-objektum lekérdezésekor a rendszer nem definiálja a timestamp_expression záradékot használó időutazási lekérdezéseket.
    • Ha azonban a táblaverziót a klónozott Unity Catalog-objektumba írták, az időutazási lekérdezések timestamp_expression záradékkal megfelelően működnek.
    • Az version záradékot használó időutazó lekérdezések megfelelően működnek klónozott Unity Catalog-objektum lekérdezésekor, még akkor is, ha a verzió eredetileg a Hive metaadattár felügyelt objektumába lett írva.
  • Más korlátozásokért a DLT és a Unity Catalog használata során lásd: Unity Catalog-folyamat korlátozásai.
  • A Unity Catalog korlátozásaiért lásd Unity Catalog-korlátozások.