Megosztás a következőn keresztül:


DLT-folyamat futtatása munkafolyamatban

Egy DLT-folyamatot egy adatfeldolgozási munkafolyamat részeként futtathat Databricks-feladatokkal, Apache Airflow-nal vagy Azure Data Factoryvel.

Munkahelyek

Egy Databricks-feladatban több feladatot is vezényelhet egy adatfeldolgozási munkafolyamat implementálásához. Ha DLT-folyamatot szeretne belefoglalni egy feladatba, használja a folyamat feladatot egy feladat létrehozásakor. Lásd a(z) DLT csővezeték feladatot a(z)feladatokhoz.

Apache Airflow

Apache Airflow egy nyílt forráskódú megoldás az adat-munkafolyamatok kezelésére és ütemezésére. Az Airflow a munkafolyamatokat a műveletek irányított aciklikus gráfjaiként (DAG-k) jelöli. Definiálhat egy munkafolyamatot egy Python-fájlban, és az Airflow kezeli az ütemezést és a végrehajtást. Az Airflow és az Azure Databricks telepítésével és használatával kapcsolatos információkért lásd Azure Databricks-feladatok vezénylése az Apache Airflow.

DLT-folyamatot az Airflow munkafolyamat részeként futtathatja a DatabricksSubmitRunOperatorhasználatával.

Követelmények

A DLT Airflow-támogatásának használatához a következők szükségesek:

Példa

Az alábbi példa létrehoz egy Airflow DAG-t, amely elindítja a DLT-folyamat frissítését a 8279d543-063c-4d63-9926-dae38e35ce8bazonosítóval:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Cserélje le a CONNECTION_ID azonosítót a munkaterülethez tartozó Airflow-kapcsolat elemére.

Mentse ezt a példát a airflow/dags könyvtárban, és használja az Airflow felhasználói felületét a DAG megtekintéséhez és aktiválásához. A folyamatfrissítés részleteit a DLT felhasználói felületén tekintheti meg.

Azure Data Factory

Jegyzet

A DLT és az Azure Data Factory mindegyike lehetőséget tartalmaz az újrapróbálkozások számának konfigurálására meghibásodás esetén. Ha az újrapróbálkozási értékek konfigurálva vannak a DLT-folyamatban és, valamint a folyamatot meghívó Azure Data Factory tevékenységen, az újrapróbálkozások száma az Azure Data Factory újrapróbálkozási értékének és a DLT újrapróbálkozási értékének szorzata lesz.

Ha például egy folyamat frissítése sikertelen, a DLT alapértelmezés szerint legfeljebb ötször próbálkozik újra a frissítéssel. Ha az Azure Data Factory újrapróbálkozása háromra van állítva, és a DLT-folyamat öt újrapróbálkozás alapértelmezett értékét használja, a sikertelen DLT-folyamat akár tizenötször is újrapróbálkozhat. A folyamatfrissítések sikertelensége esetén a túlzott újrapróbálkozási kísérletek elkerülése érdekében a Databricks javasolja az újrapróbálkozások számának korlátozását a DLT-folyamat vagy a folyamatot meghívó Azure Data Factory-tevékenység konfigurálásakor.

A DLT-folyamat újrapróbálkozási konfigurációjának módosításához használja a pipelines.numUpdateRetryAttempts beállítást a folyamat konfigurálásakor.

Az Azure Data Factory egy felhőalapú ETL-szolgáltatás, amely lehetővé teszi az adatintegrációs és átalakítási munkafolyamatok vezénylét. Az Azure Data Factory közvetlenül támogatja az Azure Databricks-feladatok munkafolyamatban való futtatását, beleértve jegyzetfüzeteket, JAR-feladatokat és Python-szkripteket. A munkafolyamatba egy csővezetéket is beilleszthet, ha meghívja a DLT API-t egy Azure Data Factory webes tevékenységsorán. Például folyamatfrissítés aktiválása az Azure Data Factoryből:

  1. Adat-előállító létrehozása vagy meglévő adat-előállító megnyitása.

  2. Amikor a létrehozás befejeződött, nyissa meg az adatgyár lapját, és kattintson az Azure Data Factory Studio csempére. Megjelenik az Azure Data Factory felhasználói felülete.

  3. Hozzon létre egy új Azure Data Factory-folyamatot az Azure Data Factory Studio felhasználói felületén található Új legördülő menü Pipeline kiválasztásával.

  4. A Tevékenységek eszközkészletben bontsa ki Általános, és húzza a webes tevékenységet a folyamatvászonra. Kattintson a Beállítások fülre, és adja meg a következő értékeket:

    Jegyzet

    Biztonsági ajánlott eljárásként, amikor automatizált eszközökkel, rendszerekkel, szkriptekkel és alkalmazásokkal hitelesít, a Databricks azt javasolja, hogy munkaterület-felhasználók helyett szolgáltatásnevekhez tartozó személyes hozzáférési jogkivonatokat használjon. A szolgáltatásfő tokenek létrehozásához lásd: Szolgáltatásfő tokenek kezelése.

    • URL-cím: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Cserélje le <get-workspace-instance>.

      Cserélje le a <pipeline-id>-t a csővezeték azonosítójára.

    • metódus: Válassza POST a legördülő menüből.

    • Fejlécek: Kattintson a + Újgombra. A Név szövegmezőbe írja be a Authorization. Az Érték szövegmezőbe írja be a Bearer <personal-access-token>.

      Cserélje le a <personal-access-token> egy Azure Databricks személyes hozzáférési jogkivonatra.

    • törzs: További kérelemparaméterek megadásához adjon meg egy JSON-dokumentumot, amely tartalmazza a paramétereket. Például egy frissítés indításához és a folyamat összes adatának újrafeldolgozásához: {"full_refresh": "true"}. Ha nincsenek további kérésparaméterek, adjon meg üres zárójeleket ({}).

A webes tevékenység teszteléséhez kattintson Hibakeresési elemre a Data Factory felhasználói felületén található folyamat eszköztárán. A futtatás kimenete és állapota, beleértve a hibákat is, az Azure Data Factory-folyamat Kimeneti lapján jelenik meg. A folyamatfrissítés részleteit a DLT felhasználói felületén tekintheti meg.

Borravaló

Gyakori munkafolyamat-követelmény, hogy egy feladatot egy korábbi tevékenység befejezése után kezdjen el. Mivel a DLT updates kérés aszinkron – a kérés a frissítés elindítása után, de a frissítés befejezése előtt tér vissza –, a DLT-frissítéstől függő Azure Data Factory-folyamatban lévő feladatoknak várniuk kell a frissítés befejezésére. A frissítés befejezésére való várakozás egyik lehetősége egy Until tevékenység hozzáadása a DLT frissítést kiváltó webes tevékenység után. A Until tevékenységben:

  1. Adjon hozzá egy várakozási tevékenységet, hogy konfigurált számú másodpercet várjon a frissítés befejezéséhez.
  2. Adjon hozzá egy webes tevékenységet a várakozási tevékenység után, amely a DLT frissítés részleteire vonatkozó kérést használja a frissítés állapotának lekéréséhez. A válasz state mezője a frissítés aktuális állapotát adja vissza, beleértve azt is, hogy befejeződött-e.
  3. Használja a state mező értékét a Until tevékenység befejezési feltételének beállításához. A Változótevékenység beállítása használatával is hozzáadhat egy folyamatváltozót a state érték alapján, és használhatja ezt a változót a megszüntetési feltételhez.