Udostępnij za pośrednictwem


Uruchomić potok DLT w przepływie pracy

Pipeline DLT można uruchomić w ramach przepływu pracy przetwarzania danych za pomocą zadań Databricks, systemu Apache Airflow lub usługi Azure Data Factory.

Zadania

W zadaniu usługi Databricks można organizować wiele zadań w celu zaimplementowania przepływu pracy przetwarzania danych. Aby uwzględnić potok DLT w zadaniu, użyj zadania Pipeline podczas tworzenia zadania. Zobacz zadanie potoku DLT dla zadań.

Apache Airflow

Apache Airflow to rozwiązanie typu open source do zarządzania przepływami pracy danych i planowania ich. Airflow reprezentuje przepływy pracy jako skierowane grafy acykliczne operacji (DAGs). Przepływ pracy definiuje się w pliku w języku Python, a funkcja Airflow zarządza planowaniem i wykonywaniem. Aby uzyskać informacje na temat instalowania i używania rozwiązania Airflow z usługą Azure Databricks, zobacz Orchestrate Azure Databricks jobs with Apache Airflow.

Aby uruchomić potok DLT w ramach przepływu pracy Airflow, użyj DatabricksSubmitRunOperator.

Wymagania

Do korzystania z obsługi Airflow dla DLT wymagane są następujące elementy:

Przykład

Poniższy przykład tworzy DAG systemu Airflow, który wyzwala aktualizację potoku DLT o identyfikatorze 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Zastąp CONNECTION_ID identyfikatorem połączenia Airflow do twojego obszaru roboczego.

Zapisz ten przykład w katalogu airflow/dags i użyj interfejsu użytkownika Airflow, aby przeglądać i uruchamiać DAG. Użyj UI DLT, aby wyświetlić szczegóły aktualizacji przepływu danych.

Azure Data Factory

Notatka

Zarówno DLT, jak i Azure Data Factory zawierają opcje do skonfigurowania liczby ponownych prób w przypadku wystąpienia awarii. Jeśli wartości ponawiania prób są skonfigurowane w potoku DLT i oraz w działaniu usługi Azure Data Factory, które wywołuje ten potok, liczba ponownych prób to iloczyn wartości ponawiania prób w usłudze Azure Data Factory i wartości ponawiania prób w DLT.

Jeśli na przykład aktualizacja potoku zakończy się niepowodzeniem, usługa DLT ponawia próbę aktualizacji domyślnie do pięciu razy. Jeśli ponawianie prób w usłudze Azure Data Factory jest ustawione na trzy, a potok DLT używa wartości domyślnej pięciu ponownych prób, nieudany potok DLT może być ponawiany do piętnastu razy. Aby uniknąć nadmiernej liczby ponownych prób w przypadku niepowodzenia aktualizacji potoku, usługa Databricks zaleca ograniczenie liczby ponownych prób podczas konfigurowania potoku DLT lub działania usługi Azure Data Factory, które wywołuje potok.

Aby zmienić konfigurację ponawiania prób dla potoku DLT, użyj ustawienia pipelines.numUpdateRetryAttempts podczas konfigurowania tego potoku.

Azure Data Factory to oparta na chmurze usługa ETL, która umożliwia organizowanie przepływów pracy integracji i przekształcania danych. Usługa Azure Data Factory bezpośrednio obsługuje uruchamianie zadań usługi Azure Databricks w przepływie pracy, w tym notesów , zadań JAR i skryptów języka Python. Potok można również uwzględnić w przepływie pracy, wywołując interfejs API DLT z poziomu działania internetowego usługi Azure Data Factory . Aby wyzwolić na przykład aktualizację potoku w usłudze Azure Data Factory:

  1. Utwórz fabrykę danych lub otwórz istniejącą fabrykę danych.

  2. Po zakończeniu tworzenia, otwórz stronę usługi Data Factory i kliknij kafelek Otwórz Azure Data Factory Studio. Zostanie wyświetlony interfejs użytkownika usługi Azure Data Factory.

  3. Utwórz nowy potok usługi Azure Data Factory, wybierając pozycję Potok z listy rozwijanej Nowy w interfejsie użytkownika Azure Data Factory Studio.

  4. W przyborniku Aktywności rozwiń Ogólne i przeciągnij aktywność Web do obszaru potoku. Kliknij kartę Ustawienia i wprowadź następujące wartości:

    Notatka

    Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie osobistych tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby utworzyć tokeny dla jednostek usługi, zobacz Zarządzanie tokenami dla jednostki usługi.

    • adres URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Zastąp <get-workspace-instance>.

      Zastąp <pipeline-id> identyfikatorem rury.

    • Metoda: wybierz pozycję POST z menu rozwijanego.

    • Nagłówki: kliknij + Nowy. W polu tekstowym Nazwa wprowadź Authorization. W polu tekstowym wartość wprowadź Bearer <personal-access-token>.

      Zastąp <personal-access-token>osobistym tokenem dostępu usługi Azure Databricks.

    • Treść: aby przekazać dodatkowe parametry żądania, wprowadź dokument JSON zawierający parametry. Aby na przykład uruchomić aktualizację i ponownie przetworzyć wszystkie dane w ramach rurociągu: {"full_refresh": "true"}. Jeśli nie ma żadnych dodatkowych parametrów żądania, wprowadź puste nawiasy klamrowe ({}).

Aby przetestować aktywność sieci Web, kliknij Debug na pasku narzędzi potoku w interfejsie użytkownika Data Factory. Dane wyjściowe i stan przebiegu procesu, w tym błędy, są wyświetlane na karcie Wynik w potoku usługi Azure Data Factory. Użyj interfejsu użytkownika DLT, aby wyświetlić szczegóły aktualizacji potoku.

Napiwek

Typowym wymaganiem dotyczącym przepływu pracy jest uruchomienie zadania po zakończeniu poprzedniego zadania. Ponieważ żądanie updates DLT jest asynchroniczne — żądanie jest zwracane po rozpoczęciu aktualizacji, ale przed ukończeniem aktualizacji — zadania w potoku usługi Azure Data Factory posiadające zależność od aktualizacji DLT muszą czekać na ukończenie aktualizacji. Opcją oczekiwania na ukończenie aktualizacji jest dodanie działania 'Until' po działaniu sieci Web, które wyzwala aktualizację DLT. W działaniu Until:

  1. Dodaj działanie Wait, aby poczekać skonfigurowaną liczbę sekund na ukończenie aktualizacji.
  2. Dodaj działanie sieciowe po działaniu Wait, które korzysta z żądania szczegółów aktualizacji DLT w celu uzyskania stanu aktualizacji. Pole state w odpowiedzi zwraca bieżący stan aktualizacji, w tym informację o tym, czy została zakończona.
  3. Użyj wartości pola state, aby ustawić warunek zakończenia działania Until. Możesz również użyć działania Ustaw Zmienną, aby dodać zmienną potoku w oparciu o wartość state i użyć tej zmiennej jako warunku zakończenia.