DLT-folyamat futtatása munkafolyamatban
Egy DLT-folyamatot egy adatfeldolgozási munkafolyamat részeként futtathat Databricks-feladatokkal, Apache Airflow-nal vagy Azure Data Factoryvel.
Munkahelyek
Egy Databricks-feladatban több feladatot is vezényelhet egy adatfeldolgozási munkafolyamat implementálásához. Ha DLT-folyamatot szeretne belefoglalni egy feladatba, használja a folyamat feladatot egy feladat létrehozásakor. Lásd a(z) DLT csővezeték feladatot a(z)feladatokhoz.
Apache Airflow
Apache Airflow egy nyílt forráskódú megoldás az adat-munkafolyamatok kezelésére és ütemezésére. Az Airflow a munkafolyamatokat a műveletek irányított aciklikus gráfjaiként (DAG-k) jelöli. Definiálhat egy munkafolyamatot egy Python-fájlban, és az Airflow kezeli az ütemezést és a végrehajtást. Az Airflow és az Azure Databricks telepítésével és használatával kapcsolatos információkért lásd Azure Databricks-feladatok vezénylése az Apache Airflow.
DLT-folyamatot az Airflow munkafolyamat részeként futtathatja a DatabricksSubmitRunOperatorhasználatával.
Követelmények
A DLT Airflow-támogatásának használatához a következők szükségesek:
- Airflow 2.1.0-s vagy újabb verzió.
- A Databricks szolgáltató csomag 2.1.0 vagy újabb verziója.
Példa
Az alábbi példa létrehoz egy Airflow DAG-t, amely elindítja a DLT-folyamat frissítését a 8279d543-063c-4d63-9926-dae38e35ce8b
azonosítóval:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Cserélje le a CONNECTION_ID
azonosítót a munkaterülethez tartozó Airflow-kapcsolat elemére.
Mentse ezt a példát a airflow/dags
könyvtárban, és használja az Airflow felhasználói felületét a DAG megtekintéséhez és aktiválásához. A folyamatfrissítés részleteit a DLT felhasználói felületén tekintheti meg.
Azure Data Factory
Jegyzet
A DLT és az Azure Data Factory mindegyike lehetőséget tartalmaz az újrapróbálkozások számának konfigurálására meghibásodás esetén. Ha az újrapróbálkozási értékek konfigurálva vannak a DLT-folyamatban és, valamint a folyamatot meghívó Azure Data Factory tevékenységen, az újrapróbálkozások száma az Azure Data Factory újrapróbálkozási értékének és a DLT újrapróbálkozási értékének szorzata lesz.
Ha például egy folyamat frissítése sikertelen, a DLT alapértelmezés szerint legfeljebb ötször próbálkozik újra a frissítéssel. Ha az Azure Data Factory újrapróbálkozása háromra van állítva, és a DLT-folyamat öt újrapróbálkozás alapértelmezett értékét használja, a sikertelen DLT-folyamat akár tizenötször is újrapróbálkozhat. A folyamatfrissítések sikertelensége esetén a túlzott újrapróbálkozási kísérletek elkerülése érdekében a Databricks javasolja az újrapróbálkozások számának korlátozását a DLT-folyamat vagy a folyamatot meghívó Azure Data Factory-tevékenység konfigurálásakor.
A DLT-folyamat újrapróbálkozási konfigurációjának módosításához használja a pipelines.numUpdateRetryAttempts
beállítást a folyamat konfigurálásakor.
Az Azure Data Factory egy felhőalapú ETL-szolgáltatás, amely lehetővé teszi az adatintegrációs és átalakítási munkafolyamatok vezénylét. Az Azure Data Factory közvetlenül támogatja az Azure Databricks-feladatok munkafolyamatban való futtatását, beleértve jegyzetfüzeteket, JAR-feladatokat és Python-szkripteket. A munkafolyamatba egy csővezetéket is beilleszthet, ha meghívja a DLT API-t egy Azure Data Factory webes tevékenységsorán. Például folyamatfrissítés aktiválása az Azure Data Factoryből:
Adat-előállító létrehozása vagy meglévő adat-előállító megnyitása.
Amikor a létrehozás befejeződött, nyissa meg az adatgyár lapját, és kattintson az Azure Data Factory Studio csempére. Megjelenik az Azure Data Factory felhasználói felülete.
Hozzon létre egy új Azure Data Factory-folyamatot az Azure Data Factory Studio felhasználói felületén található Új legördülő menü Pipeline kiválasztásával.
A Tevékenységek eszközkészletben bontsa ki Általános, és húzza a webes tevékenységet a folyamatvászonra. Kattintson a Beállítások fülre, és adja meg a következő értékeket:
Jegyzet
Biztonsági ajánlott eljárásként, amikor automatizált eszközökkel, rendszerekkel, szkriptekkel és alkalmazásokkal hitelesít, a Databricks azt javasolja, hogy munkaterület-felhasználók helyett szolgáltatásnevekhez tartozó személyes hozzáférési jogkivonatokat használjon. A szolgáltatásfő tokenek létrehozásához lásd: Szolgáltatásfő tokenek kezelése.
URL-cím:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Cserélje le
<get-workspace-instance>
.Cserélje le a
<pipeline-id>
-t a csővezeték azonosítójára.metódus: Válassza POST a legördülő menüből.
Fejlécek: Kattintson a + Újgombra. A Név szövegmezőbe írja be a
Authorization
. Az Érték szövegmezőbe írja be aBearer <personal-access-token>
.Cserélje le a
<personal-access-token>
egy Azure Databricks személyes hozzáférési jogkivonatra.törzs: További kérelemparaméterek megadásához adjon meg egy JSON-dokumentumot, amely tartalmazza a paramétereket. Például egy frissítés indításához és a folyamat összes adatának újrafeldolgozásához:
{"full_refresh": "true"}
. Ha nincsenek további kérésparaméterek, adjon meg üres zárójeleket ({}
).
A webes tevékenység teszteléséhez kattintson Hibakeresési elemre a Data Factory felhasználói felületén található folyamat eszköztárán. A futtatás kimenete és állapota, beleértve a hibákat is, az Azure Data Factory-folyamat Kimeneti lapján jelenik meg. A folyamatfrissítés részleteit a DLT felhasználói felületén tekintheti meg.
Borravaló
Gyakori munkafolyamat-követelmény, hogy egy feladatot egy korábbi tevékenység befejezése után kezdjen el. Mivel a DLT updates
kérés aszinkron – a kérés a frissítés elindítása után, de a frissítés befejezése előtt tér vissza –, a DLT-frissítéstől függő Azure Data Factory-folyamatban lévő feladatoknak várniuk kell a frissítés befejezésére. A frissítés befejezésére való várakozás egyik lehetősége egy Until tevékenység hozzáadása a DLT frissítést kiváltó webes tevékenység után. A Until tevékenységben:
- Adjon hozzá egy várakozási tevékenységet, hogy konfigurált számú másodpercet várjon a frissítés befejezéséhez.
- Adjon hozzá egy webes tevékenységet a várakozási tevékenység után, amely a DLT frissítés részleteire vonatkozó kérést használja a frissítés állapotának lekéréséhez. A válasz
state
mezője a frissítés aktuális állapotát adja vissza, beleértve azt is, hogy befejeződött-e. - Használja a
state
mező értékét a Until tevékenység befejezési feltételének beállításához. A Változótevékenység beállítása használatával is hozzáadhat egy folyamatváltozót astate
érték alapján, és használhatja ezt a változót a megszüntetési feltételhez.