Uruchomić potok DLT w przepływie pracy
Pipeline DLT można uruchomić w ramach przepływu pracy przetwarzania danych za pomocą zadań Databricks, systemu Apache Airflow lub usługi Azure Data Factory.
Zadania
W zadaniu usługi Databricks można organizować wiele zadań w celu zaimplementowania przepływu pracy przetwarzania danych. Aby uwzględnić potok DLT w zadaniu, użyj zadania Pipeline podczas tworzenia zadania. Zobacz zadanie potoku DLT dla zadań.
Apache Airflow
Apache Airflow to rozwiązanie typu open source do zarządzania przepływami pracy danych i planowania ich. Airflow reprezentuje przepływy pracy jako skierowane grafy acykliczne operacji (DAGs). Przepływ pracy definiuje się w pliku w języku Python, a funkcja Airflow zarządza planowaniem i wykonywaniem. Aby uzyskać informacje na temat instalowania i używania rozwiązania Airflow z usługą Azure Databricks, zobacz Orchestrate Azure Databricks jobs with Apache Airflow.
Aby uruchomić potok DLT w ramach przepływu pracy Airflow, użyj DatabricksSubmitRunOperator.
Wymagania
Do korzystania z obsługi Airflow dla DLT wymagane są następujące elementy:
- Airflow w wersji 2.1.0 lub nowszej.
- Moduł Dostawcy Databricks w wersji 2.1.0 lub nowszej.
Przykład
Poniższy przykład tworzy DAG systemu Airflow, który wyzwala aktualizację potoku DLT o identyfikatorze 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Zastąp CONNECTION_ID
identyfikatorem połączenia Airflow do twojego obszaru roboczego.
Zapisz ten przykład w katalogu airflow/dags
i użyj interfejsu użytkownika Airflow, aby przeglądać i uruchamiać DAG. Użyj UI DLT, aby wyświetlić szczegóły aktualizacji przepływu danych.
Azure Data Factory
Notatka
Zarówno DLT, jak i Azure Data Factory zawierają opcje do skonfigurowania liczby ponownych prób w przypadku wystąpienia awarii. Jeśli wartości ponawiania prób są skonfigurowane w potoku DLT i oraz w działaniu usługi Azure Data Factory, które wywołuje ten potok, liczba ponownych prób to iloczyn wartości ponawiania prób w usłudze Azure Data Factory i wartości ponawiania prób w DLT.
Jeśli na przykład aktualizacja potoku zakończy się niepowodzeniem, usługa DLT ponawia próbę aktualizacji domyślnie do pięciu razy. Jeśli ponawianie prób w usłudze Azure Data Factory jest ustawione na trzy, a potok DLT używa wartości domyślnej pięciu ponownych prób, nieudany potok DLT może być ponawiany do piętnastu razy. Aby uniknąć nadmiernej liczby ponownych prób w przypadku niepowodzenia aktualizacji potoku, usługa Databricks zaleca ograniczenie liczby ponownych prób podczas konfigurowania potoku DLT lub działania usługi Azure Data Factory, które wywołuje potok.
Aby zmienić konfigurację ponawiania prób dla potoku DLT, użyj ustawienia pipelines.numUpdateRetryAttempts
podczas konfigurowania tego potoku.
Azure Data Factory to oparta na chmurze usługa ETL, która umożliwia organizowanie przepływów pracy integracji i przekształcania danych. Usługa Azure Data Factory bezpośrednio obsługuje uruchamianie zadań usługi Azure Databricks w przepływie pracy, w tym notesów , zadań JAR i skryptów języka Python. Potok można również uwzględnić w przepływie pracy, wywołując interfejs API DLT z poziomu działania internetowego usługi Azure Data Factory . Aby wyzwolić na przykład aktualizację potoku w usłudze Azure Data Factory:
Utwórz fabrykę danych lub otwórz istniejącą fabrykę danych.
Po zakończeniu tworzenia, otwórz stronę usługi Data Factory i kliknij kafelek Otwórz Azure Data Factory Studio. Zostanie wyświetlony interfejs użytkownika usługi Azure Data Factory.
Utwórz nowy potok usługi Azure Data Factory, wybierając pozycję Potok z listy rozwijanej Nowy w interfejsie użytkownika Azure Data Factory Studio.
W przyborniku Aktywności rozwiń Ogólne i przeciągnij aktywność Web do obszaru potoku. Kliknij kartę Ustawienia i wprowadź następujące wartości:
Notatka
Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie osobistych tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby utworzyć tokeny dla jednostek usługi, zobacz Zarządzanie tokenami dla jednostki usługi.
adres URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Zastąp
<get-workspace-instance>
.Zastąp
<pipeline-id>
identyfikatorem rury.Metoda: wybierz pozycję POST z menu rozwijanego.
Nagłówki: kliknij + Nowy. W polu tekstowym Nazwa wprowadź
Authorization
. W polu tekstowym wartość wprowadźBearer <personal-access-token>
.Zastąp
<personal-access-token>
osobistym tokenem dostępu usługi Azure Databricks.Treść: aby przekazać dodatkowe parametry żądania, wprowadź dokument JSON zawierający parametry. Aby na przykład uruchomić aktualizację i ponownie przetworzyć wszystkie dane w ramach rurociągu:
{"full_refresh": "true"}
. Jeśli nie ma żadnych dodatkowych parametrów żądania, wprowadź puste nawiasy klamrowe ({}
).
Aby przetestować aktywność sieci Web, kliknij Debug na pasku narzędzi potoku w interfejsie użytkownika Data Factory. Dane wyjściowe i stan przebiegu procesu, w tym błędy, są wyświetlane na karcie Wynik w potoku usługi Azure Data Factory. Użyj interfejsu użytkownika DLT, aby wyświetlić szczegóły aktualizacji potoku.
Napiwek
Typowym wymaganiem dotyczącym przepływu pracy jest uruchomienie zadania po zakończeniu poprzedniego zadania. Ponieważ żądanie updates
DLT jest asynchroniczne — żądanie jest zwracane po rozpoczęciu aktualizacji, ale przed ukończeniem aktualizacji — zadania w potoku usługi Azure Data Factory posiadające zależność od aktualizacji DLT muszą czekać na ukończenie aktualizacji. Opcją oczekiwania na ukończenie aktualizacji jest dodanie działania 'Until' po działaniu sieci Web, które wyzwala aktualizację DLT. W działaniu Until:
- Dodaj działanie Wait, aby poczekać skonfigurowaną liczbę sekund na ukończenie aktualizacji.
- Dodaj działanie sieciowe po działaniu Wait, które korzysta z żądania szczegółów aktualizacji DLT w celu uzyskania stanu aktualizacji. Pole
state
w odpowiedzi zwraca bieżący stan aktualizacji, w tym informację o tym, czy została zakończona. - Użyj wartości pola
state
, aby ustawić warunek zakończenia działania Until. Możesz również użyć działania Ustaw Zmienną, aby dodać zmienną potoku w oparciu o wartośćstate
i użyć tej zmiennej jako warunku zakończenia.