将任务添加到 Databricks 资产捆绑包中的作业

本文提供了一些可以在 Databricks Asset Bundles 中向 Azure Databricks 作业添加的不同类型任务的示例。 请参阅什么是 Databricks 资产捆绑包?

大多数作业任务类型在其支持的设置中都具有特定于任务的参数,但你也可以定义传递给任务的作业参数。 作业参数支持动态值引用,从而支持在任务之间传递特定于作业运行的值。 请参阅什么是动态值引用?

注意

可以替代作业任务设置。 请参阅替代 Databricks 资产捆绑包中的作业任务设置

提示

若要使用 Databricks CLI 快速生成现有作业的资源配置,可以使用 bundle generate job 命令。 请参阅捆绑命令

笔记本任务

使用此任务运行笔记本。

以下示例将笔记本任务添加到作业,并设置名为 my_job_run_id 的作业参数。 要部署的笔记本的路径相对于声明了此任务的配置文件。 该任务从其在 Azure Databricks 工作区中的部署位置获取笔记本。

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

有关为此任务设置的额外映射,请参见以 YAML 格式表示的 tasks > notebook_task,该内容在 REST API 参考中 POST /api/2.1/jobs/create 的创建作业操作请求负载中定义。 请参阅用于作业的笔记本任务

If/else 条件任务

通过 condition_task,可以将具有 if/else 条件逻辑的任务添加到作业。 该任务评估可用于控制其他任务的执行的条件。 条件任务不需要群集执行,并且不支持重试或通知。 有关 if/else 任务的详细信息,请参阅使用 If/else 任务向作业添加分支逻辑

以下示例包含条件任务和笔记本任务,其中笔记本任务仅在作业修复数小于 5 时执行。

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > condition_task,以 YAML 格式表示。

对于每个任务

使用 for_each_task,可以向作业中添加一个带有 for each 循环的任务。 每当提供输入时,该任务会执行一个嵌套任务。 如需获取有关 for_each_task的详细信息,请参阅 。使用 For each 任务在循环中运行另一个任务

以下示例将 for_each_task 添加到作业,它会在该作业中循环访问并处理另一个任务的值。

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > for_each_task,以 YAML 格式表示。

Python 脚本任务

使用此任务运行 Python 文件。

以下示例向作业添加 Python 脚本任务。 要部署的 Python 文件的路径是相对于声明此任务的配置文件的。 该任务从其在 Azure Databricks 工作区中的部署位置获取 Python 文件。

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

有关可以为此任务设置的附加映射信息,请参见创建任务操作请求负载中的 tasks > spark_python_task(如 REST API 参考中的 POST /api/2.1/jobs/create 所定义),并以 YAML 格式表示。 另请参阅用于作业的 Python 脚本任务

Python wheel 任务

使用此任务可运行 Python wheel 文件。

以下示例向作业添加 Python wheel 任务。 要部署的 Python wheel 文件的路径相对于声明了此任务的配置文件。 请参阅 Databricks 资产捆绑包的库依赖项

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > python_wheel_task,以 YAML 格式表示。 另请参阅使用 Databricks 资产捆绑包开发 Python wheel 文件用于作业的 Python wheel 任务

JAR 任务

您使用此任务来运行 JAR 文件。 可以引用本地 JAR 库或工作区、Unity Catalog 卷或外部云存储位置中的 JAR 库。 请参阅 Databricks 资产捆绑包的库依赖项

以下示例向作业添加 JAR 任务。 JAR 的路径指向指定的卷位置。

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

有关可为此任务设置的额外映射,请参见 REST API 参考中定义的创建作业操作请求负载中的 tasks > spark_jar_task,如 POST /api/2.1/jobs/create,以 YAML 格式表示。 请参阅用于作业的 JAR 任务

SQL 文件任务

使用此任务运行位于工作区或远程 Git 存储库中的 SQL 文件。

以下示例向作业添加 SQL 文件任务。 此 SQL 文件任务使用指定的 SQL 仓库来运行指定的 SQL 文件。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概览”选项卡上“名称”字段中仓库名称后面括号中的 ID。

有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > sql_task > file,以 YAML 格式表示。 请参阅用于作业的 SQL 任务

DLT 管道任务

使用此任务运行 DLT 管道。 请参阅 什么是 DLT?

以下示例向作业添加 DLT 管道任务。 此 DLT 管道任务运行指定的管道。

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

可以通过以下方法来获取管道的 ID:在工作区中打开管道,并在管道设置页的“管道详细信息”选项卡上复制“管道 ID”值。

有关您可以为此任务设置的附加映射,请参阅在 REST API 参考文档中,POST /api/2.1/jobs/create 定义的创建作业操作请求负载中的 tasks > pipeline_task,以 YAML 格式表示。 请参阅用于作业的 DLT 管道任务

dbt 任务

使用此任务运行一个或多个 dbt 命令。 请参阅连接到 dbt Cloud

以下示例向作业添加 dbt 任务。 此 dbt 任务使用指定的 SQL 仓库来运行指定的 dbt 命令。

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概览”选项卡上“名称”字段中仓库名称后面括号中的 ID。

关于您可以为此任务设置的额外映射,请参阅 REST API 参考中定义为 POST /api/2.1/jobs/create 的创建作业操作请求负载中的 tasks > dbt_task,以 YAML 格式表示。 请参阅用于作业的 dbt 任务

Databricks 资产捆绑包还包含一个 dbt-sql 项目模板,该模板定义一个具有 dbt 任务的作业,以及已部署的 dbt 作业的 dbt 配置文件。 有关 Databricks 资产捆绑包模板的信息,请参阅 默认捆绑模板

运行作业任务

使用此任务运行另一个作业。

以下示例在第二个作业中包含了运行第一个作业的运行作业任务。

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

此示例使用替换来检索要运行的作业的 ID。 若要从 UI 获取作业的 ID,请在工作区中打开该作业,并从作业设置页的“作业详细信息”选项卡中的“作业 ID”值复制 ID。

有关可为此任务设置的额外映射,请参见在 REST API 参考中的 POST /api/2.1/jobs/create 中定义的创建作业操作的请求负载 tasks > run_job_task,以 YAML 格式表示。