Unity Catalog-folyamat létrehozása Hive-metaadattár-folyamat klónozásával
Ez a cikk ismerteti a Databricks REST API clone a pipeline
kérését, és azt, hogy hogyan másolhat egy meglévő folyamatot, amely közzétesz a Hive metaadattárban egy új, a Unity Catalogban közzétett folyamatba. Amikor a clone a pipeline
kérést meghívja, akkor a következő történik:
- Átmásolja a meglévő folyamat forráskódját és konfigurációját egy új folyamatba, alkalmazva az Ön által megadott konfigurációs felülbírálásokat.
- Frissíti a materializált nézet és a streamelő tábla definícióit és hivatkozásait a megfelelő változtatásokkal, hogy ezeket az objektumokat a Unity Catalog kezelje.
- Elindít egy pipeline-frissítést a meglévő adatok és metaadatok, például ellenőrzőpontok migrálásához a pipeline bármely streaming táblájában. Ez lehetővé teszi, hogy a streamelési táblák ugyanazon a ponton folytassák a feldolgozást, mint az eredeti folyamat.
A klónozási művelet befejezése után az eredeti és az új folyamatok egymástól függetlenül is futtathatók.
Ez a cikk példákat tartalmaz az API-kérés meghívására közvetlenül és Egy Python-szkripten keresztül egy Databricks-jegyzetfüzetből.
Mielőtt hozzákezdene
A folyamat klónozása előtt a következőkre van szükség:
Hive-metaadattár-folyamat klónozásához a folyamatban definiált tábláknak és nézeteknek közzé kell tenni a táblákat egy célsémában. Ha meg szeretné tudni, hogyan adhat hozzá célsémát egy folyamathoz, olvassa el Folyamat konfigurálása a Hive metaadattárban való közzétételhezcímű témakört.
A hive metaadattár által felügyelt táblákra vagy nézetekre mutató hivatkozásokat a klónozni kívánt folyamatban teljes mértékben minősíteni kell a katalógus (
hive_metastore
), séma és táblanév alapján. Például a következő kódban, amely egycustomers
adatkészletet hoz létre, a táblanév argumentumot frissíteni kellhive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
Klónozási művelet közben ne szerkessze a forrás Hive metaadattár-folyamat forráskódját, beleértve a folyamat részeként konfigurált jegyzetfüzeteket és a Git-mappákban vagy munkaterületfájlokban tárolt modulokat.
A forrás Hive metaadattár-folyamat nem futhat a klónozási művelet indításakor. Ha egy frissítés fut, állítsa le, vagy várja meg, amíg befejeződik.
A csővezeték klónozása előtt az alábbiakat érdemes figyelembe venni:
- Ha a Hive metaadattár-folyamat táblái egy tárolóhelyet adnak meg a Python
path
argumentumával vagy az SQLLOCATION
használatával, adja át a"pipelines.migration.ignoreExplicitPath": "true"
konfigurációt a klónozási kérelemnek. Ennek a konfigurációnak a beállítását az alábbi utasítások tartalmazzák. - Ha a Hive metaadattár-folyamat tartalmaz egy automatikus betöltőforrást, amely megadja a
cloudFiles.schemaLocation
beállítás értékét, és a Hive metaadattár-folyamat a Unity Katalógus klónjának létrehozása után is működőképes marad, amergeSchema
beállítást úgy kell beállítania, hogytrue
a Hive metaadattár-folyamatban és a klónozott Unity Catalog-folyamatban is. Ha ezt a beállítást a Hive metaadattár-folyamathoz a klónozás előtt hozzáadja, az átmásolja a beállítást az új folyamatba.
Folyamat klónozása a Databricks REST API-val
Az alábbi példa a curl
paranccsal hívja meg a clone a pipeline
kérést a Databricks REST API-ban:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Helyettesít:
-
<personal-access-token>
a Databricks személyes hozzáférési jogkivonattal. -
<databricks-instance>
az Azure Databricks -munkaterületpéldány nevével, példáuladb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
a klónozni kívánt Hive metaadattár-folyamat egyedi azonosítójával. A folyamatazonosítót a DLT felhasználói felületéntalálhatja meg.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Helyettesít:
-
<target-catalog-name>
egy katalógus nevével a Unity Katalógusban, amelyhez az új folyamatnak közzé kell tennie. Ennek egy meglévő katalógusnak kell lennie. -
<target-schema-name>
egy séma nevével a Unity Catalogban, amelyhez az új folyamatnak közzé kell tennie, ha az eltér az aktuális séma nevétől. Ez a paraméter nem kötelező, és ha nincs megadva, a rendszer a meglévő sémanevet használja. -
<new-pipeline-name>
egy opcionális névvel az új csővezeték számára. Ha nincs megadva, az új folyamatot a forrásfolyamat nevével nevezik el, a[UC]
hozzáfűzésével.
clone_mode
megadja a klónozási művelethez használni kívánt módot.
MIGRATE_TO_UC
az egyetlen támogatott lehetőség.
Az új folyamat konfigurációinak megadásához használja a configuration
mezőt. Az itt megadott értékek felülbírálják az eredeti folyamat konfigurációit.
A clone
REST API-kérés válasza az új Unity Catalog-folyamat folyamatazonosítója.
Folyamat klónozása Databricks-jegyzetfüzetből
Az alábbi példa meghívja a create a pipeline
kérést egy Python-szkriptből. Ezt a szkriptet databricks-jegyzetfüzettel futtathatja:
- Hozzon létre egy új jegyzetfüzetet a szkripthez. Lásd: Jegyzetfüzet létrehozása.
- Másolja a következő Python-szkriptet a jegyzetfüzet első cellájába.
- Frissítse a helyőrző értékeket a szkriptben a következő lecserélésével:
-
<databricks-instance>
az Azure Databricks -munkaterületpéldány nevével, példáuladb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
a klónozni kívánt Hive metaadattár-adatfolyam egyedi azonosítójával. A folyamatazonosítót a DLT felhasználói felületéntalálhatja meg. -
<target-catalog-name>
egy katalógus nevével a Unity Katalógusban, amelyhez az új folyamatvonalnak közzé kell tennie. Ennek egy meglévő katalógusnak kell lennie. -
<target-schema-name>
egy séma neve a Unity Catalogban, amelyhez az új folyamatot közzé kell tenni, ha eltér az aktuális séma nevétől. Ez a paraméter nem kötelező, és ha nincs megadva, a rendszer a meglévő sémanevet használja. -
<new-pipeline-name>
egy opcionális névvel az új folyamatlánchoz. Ha nincs megadva, az új folyamat neve a forrásfolyamat nevéből származik, a végére[UC]
-t hozzáfűzve.
-
- Futtassa a szkriptet. Lásd: Databricks-jegyzetfüzetek futtatása.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Korlátozások
A DLT clone a pipeline
API-kérés korlátozásai a következők:
- Csak a Hive metaadattár használatára konfigurált folyamatból a Unity Catalog-folyamatba való klónozás támogatott.
- Klónt csak ugyanabban az Azure Databricks-munkaterületen hozhat létre, mint a klónozott folyamat.
- A klónozott adattovábbítási csatorna csak a következő streamforrásokat tartalmazhatja:
- Delta-források
- Automatikus betöltő, beleértve az automatikus betöltő által támogatott adatforrásokat is. Lásd: Fájlok betöltése felhőalapú objektumtárolóból.
- Apached Kafka strukturált streameléssel. A Kafka-forrás azonban nem konfigurálható a
kafka.group.id
beállítás használatára. Lásd folyamfeldolgozás az Apache Kafkával és az Azure Databricks. - Amazon Kinesis strukturált streameléssel. A Kinesis-forrás azonban nem konfigurálható úgy, hogy beállítsa
consumerMode
-tefo
-re.
- Ha a klónozni kívánt Hive metaadattár-folyamat automatikus betöltőfájl-értesítési módot használ, a Databricks azt javasolja, hogy klónozás után ne futtassa a Hive metaadattár-folyamatot. Ennek az az oka, hogy a Hive metaadattár-folyamat futtatásával néhány fájlértesítési eseményt elvet a Unity Catalog klónjáról. Ha a forrás Hive metaadattár-folyamat a klónozási művelet befejeződése után fut, az Automatikus betöltő használatával a
cloudFiles.backfillInterval
lehetőséggel újra kitöltheti a hiányzó fájlokat. Az automatikus betöltő fájlértesítési módjáról az Mi az automatikus betöltő fájlértesítési mód?. A fájlok utólagos betöltéséről az Auto Loader segítségével további információt a CloudFiles.backfillInterval használatával történő rendszeres utólagos betöltések kiváltása és a gyakori Auto Loader opciókcímű részekben talál. - A folyamatkarbantartási feladatok mindkét folyamat esetében automatikusan szünetelnek, miközben a klónozás folyamatban van.
- A klónozott Unity Catalog-folyamat tábláira vonatkozó időutazási lekérdezésekre az alábbiak vonatkoznak:
- Ha egy táblázatverziót eredetileg egy Hive metaadattár által felügyelt objektumba írtak, a klónozott Unity Catalog-objektum lekérdezésekor a rendszer nem definiálja a
timestamp_expression
záradékot használó időutazási lekérdezéseket. - Ha azonban a táblaverziót a klónozott Unity Catalog-objektumba írták, az időutazási lekérdezések
timestamp_expression
záradékkal megfelelően működnek. - Az
version
záradékot használó időutazó lekérdezések megfelelően működnek klónozott Unity Catalog-objektum lekérdezésekor, még akkor is, ha a verzió eredetileg a Hive metaadattár felügyelt objektumába lett írva.
- Ha egy táblázatverziót eredetileg egy Hive metaadattár által felügyelt objektumba írtak, a klónozott Unity Catalog-objektum lekérdezésekor a rendszer nem definiálja a
- Más korlátozásokért a DLT és a Unity Catalog használata során lásd: Unity Catalog-folyamat korlátozásai.
- A Unity Catalog korlátozásaiért lásd Unity Catalog-korlátozások.