Udostępnij za pośrednictwem


Dodawanie zadań do prac w pakietach zasobów Databricks

Ten artykuł zawiera przykłady różnych typów zadań, które można dodać do zadań usługi Azure Databricks w pakietach zasobów usługi Databricks. Zobacz Co to są pakiety zasobów usługi Databricks?.

Większość typów zadań ma parametry specyficzne dla zadania wśród obsługiwanych ustawień, ale można również zdefiniować parametry zadania, które są przekazywane do zadań. Odwołania do wartości dynamicznych są obsługiwane dla parametrów zadania, co umożliwia przekazywanie wartości specyficznych dla przebiegu zadania między operacjami. Zobacz Co to jest odwołanie do wartości dynamicznej?.

Uwaga

Możesz nadpisać ustawienia zadania roboczego. Zobacz Nadpisywanie ustawień zadań w pakietach zasobów Databricks.

Napiwek

Aby szybko wygenerować konfigurację zasobu dla istniejącego zadania przy użyciu Databricks CLI, możesz użyć polecenia bundle generate job. Zobacz polecenia pakietów.

Zadanie notatnika

To zadanie służy do uruchamiania notatnika.

Poniższy przykład dodaje zadanie notatnika do pracy i ustawia parametr pracy o nazwie my_job_run_id. Ścieżka do notebooka jest relatywna do pliku konfiguracji, w którym określono to zadanie. Zadanie pobiera notatnik z jego wdrożonej lokalizacji w obszarze roboczym usługi Azure Databricks.

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, zobacz tasks > notebook_task w ładunku żądania operacji tworzenia zadania, jak zdefiniowano w POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonym w formacie YAML. Zobacz Zadanie notatnika dotyczące prac.

Zadanie z warunkiem if/else

condition_task umożliwia dodanie zadania z logiką warunkową if/else do zadania. Zadanie ocenia warunek, który może służyć do kontrolowania wykonywania innych zadań. Zadanie warunkowe nie wymaga klastra do wykonania i nie obsługuje ponownych prób ani powiadomień. Aby uzyskać więcej informacji na temat zadania if/else, zobacz Dodawanie logiki rozgałęziania do zadania za pomocą zadania If/else.

Poniższy przykład zawiera zadanie warunkowe i zadanie notatnika, które jest wykonywane tylko wtedy, gdy liczba napraw zadań jest mniejsza niż 5.

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, zobacz tasks > condition_task w treści żądania operacji utworzenia zadania zdefiniowanej w POST /api/2.1/jobs/create w dokumentacji API REST w formacie YAML.

Dla każdego zadania

for_each_task umożliwia ci dodanie zadania za pomocą pętli 'for each' do projektu. Zadanie wykonuje zagnieżdżone zadanie dla każdych dostarczonych danych wejściowych. Aby uzyskać więcej informacji na temat for_each_task, zobacz Używanie zadania For each do uruchamiania innego zadania w pętli.

Poniższy przykład dodaje for_each_task do zadania, gdzie iteruje po wartościach innego zadania i je przetwarza.

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > for_each_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML.

Zadanie skryptu języka Python

To zadanie służy do uruchamiania pliku w języku Python.

Poniższy przykład dodaje zadanie skryptu języka Python do zadania. Ścieżka pliku języka Python do wdrożenia jest względna względem pliku konfiguracji, w którym to zadanie jest zadeklarowane. Zadanie pobiera plik języka Python z wdrożonej lokalizacji w obszarze roboczym usługi Azure Databricks.

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > spark_python_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML. Zobacz również zadanie skryptu Pythona dla procesów.

Zadanie koła języka Python

To zadanie służy do uruchamiania pliku wheel języka Python.

W poniższym przykładzie do zadania dodano polecenie tworzenia pakietu Wheel dla języka Python. Ścieżka pliku koła Python do wdrożenia jest względna wobec pliku konfiguracyjnego, w którym to zadanie jest zadeklarowane. Zobacz zależności biblioteki Databricks Asset Bundles.

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > python_wheel_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML. Zobacz również Rozwijanie pliku koła Python za pomocą pakietów zasobów Databricks oraz Zadanie koła Python dla zleceń.

Zadanie JAR

To zadanie służy do uruchamiania pliku JAR. Możesz korzystać z lokalnych bibliotek JAR lub tych w przestrzeni roboczej, woluminie katalogu Unity lub w zewnętrznej lokalizacji w magazynie chmurowym. Zobacz zależności biblioteki Databricks Asset Bundles.

Poniższy przykład dodaje zadanie JAR do zadania. Ścieżka do pliku JAR prowadzi do określonej lokalizacji woluminu.

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > spark_jar_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML. Zobacz Zadanie JAR dla prac.

Zadanie pliku SQL

To zadanie służy do uruchamiania pliku SQL znajdującego się w obszarze roboczym lub zdalnym repozytorium Git.

Poniższy przykład dodaje zadanie pliku SQL do zadania. To zadanie dotyczące pliku SQL korzysta z określonego magazynu SQL Warehouse do uruchomienia tego pliku SQL.

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

Aby uzyskać identyfikator usługi SQL Warehouse, otwórz stronę ustawień usługi SQL Warehouse, a następnie skopiuj identyfikator znaleziony w nawiasach po nazwie magazynu w polu Nazwa na karcie Przegląd .

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, zobacz tasks > sql_task > file w ładunku żądania operacji tworzenia zadania zdefiniowanego jako POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonym w formacie YAML. Zobacz Zadanie SQL dla procesów.

Zadanie rurociągu DLT

Używasz tego zadania, aby uruchomić potok DLT. Zobacz Co to jest DLT?.

W poniższym przykładzie do zadania dodano zadanie potoku DLT. To zadanie uruchamia określony potok w ramach procesu DLT.

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

Identyfikator potoku można uzyskać, otwierając potok w wewnętrznym środowisku pracy i kopiując wartość Identyfikator potoku na zakładce Szczegóły potoku na stronie ustawień potoku.

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > pipeline_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML. Zobacz zadanie potoku DLT dotyczące prac.

dbt - zadanie

Używasz tego zadania do uruchamiania jednego lub więcej poleceń dbt. Zobacz Nawiązywanie połączenia z usługą dbt Cloud.

Poniższy przykład dodaje zadanie dbt do zadania. To zadanie dbt używa określonego magazynu SQL w celu uruchomienia określonych poleceń dbt.

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

Aby uzyskać identyfikator usługi SQL Warehouse, otwórz stronę ustawień usługi SQL Warehouse, a następnie skopiuj identyfikator znaleziony w nawiasach po nazwie magazynu w polu Nazwa na karcie Przegląd .

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > dbt_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML. Zobacz dbt task for jobs.

Pakiety zasobów usługi Databricks zawierają również dbt-sql szablon projektu, który definiuje zadanie dbt, a także profile dbt dla wdrożonych zadań dbt. Aby uzyskać informacje na temat szablonów pakietów zasobów usługi Databricks, zobacz szablony pakietów domyślnych .

Uruchomienie zadania roboczego

To zadanie służy do uruchamiania innego zadania.

Poniższy przykład zawiera zadanie uruchamiania zadania w drugim zadaniu, które uruchamia pierwsze zadanie.

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

W tym przykładzie użyto podstawienia, aby pobrać identyfikator zadania do uruchomienia. Aby uzyskać identyfikator zadania z interfejsu użytkownika, otwórz zadanie w obszarze roboczym i skopiuj identyfikator z wartości Identyfikator zadania na karcie Szczegóły zadania na stronie ustawień zadania.

Aby uzyskać dodatkowe mapowania, które można ustawić dla tego zadania, odwołaj się do tasks > run_job_task w ładunku żądania operacji tworzenia zadania zgodnie z definicją w punkcie POST /api/2.1/jobs/create w dokumentacji referencyjnej API REST, wyrażonego w formacie YAML.