将任务添加到 Databricks 资产捆绑包中的作业
本文提供了一些可以在 Databricks Asset Bundles 中向 Azure Databricks 作业添加的不同类型任务的示例。 请参阅什么是 Databricks 资产捆绑包?。
大多数作业任务类型在其支持的设置中都具有特定于任务的参数,但你也可以定义传递给任务的作业参数。 作业参数支持动态值引用,从而支持在任务之间传递特定于作业运行的值。 请参阅什么是动态值引用?。
注意
可以替代作业任务设置。 请参阅替代 Databricks 资产捆绑包中的作业任务设置。
提示
若要使用 Databricks CLI 快速生成现有作业的资源配置,可以使用 bundle generate job
命令。 请参阅捆绑命令。
笔记本任务
使用此任务运行笔记本。
以下示例将笔记本任务添加到作业,并设置名为 my_job_run_id
的作业参数。 要部署的笔记本的路径相对于声明了此任务的配置文件。 该任务从其在 Azure Databricks 工作区中的部署位置获取笔记本。
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: '{{job.run_id}}'
有关为此任务设置的额外映射,请参见以 YAML 格式表示的 tasks > notebook_task
,该内容在 REST API 参考中 POST /api/2.1/jobs/create 的创建作业操作请求负载中定义。 请参阅用于作业的笔记本任务。
If/else 条件任务
通过 condition_task
,可以将具有 if/else 条件逻辑的任务添加到作业。 该任务评估可用于控制其他任务的执行的条件。 条件任务不需要群集执行,并且不支持重试或通知。 有关 if/else 任务的详细信息,请参阅使用 If/else 任务向作业添加分支逻辑。
以下示例包含条件任务和笔记本任务,其中笔记本任务仅在作业修复数小于 5 时执行。
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: '{{job.repair_count}}'
right: '5'
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: 'true'
notebook_task:
notebook_path: ../src/notebook.ipynb
有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > condition_task
,以 YAML 格式表示。
对于每个任务
使用 for_each_task
,可以向作业中添加一个带有 for each 循环的任务。 每当提供输入时,该任务会执行一个嵌套任务。 如需获取有关 for_each_task
的详细信息,请参阅 。使用 For each
任务在循环中运行另一个任务。
以下示例将 for_each_task
添加到作业,它会在该作业中循环访问并处理另一个任务的值。
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: '{{tasks.generate_countries_list.values.countries}}'
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > for_each_task
,以 YAML 格式表示。
Python 脚本任务
使用此任务运行 Python 文件。
以下示例向作业添加 Python 脚本任务。 要部署的 Python 文件的路径是相对于声明此任务的配置文件的。 该任务从其在 Azure Databricks 工作区中的部署位置获取 Python 文件。
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
有关可以为此任务设置的附加映射信息,请参见创建任务操作请求负载中的 tasks > spark_python_task
(如 REST API 参考中的 POST /api/2.1/jobs/create 所定义),并以 YAML 格式表示。 另请参阅用于作业的 Python 脚本任务。
Python wheel 任务
使用此任务可运行 Python wheel 文件。
以下示例向作业添加 Python wheel 任务。 要部署的 Python wheel 文件的路径相对于声明了此任务的配置文件。 请参阅 Databricks 资产捆绑包的库依赖项。
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > python_wheel_task
,以 YAML 格式表示。 另请参阅使用 Databricks 资产捆绑包开发 Python wheel 文件和用于作业的 Python wheel 任务。
JAR 任务
您使用此任务来运行 JAR 文件。 可以引用本地 JAR 库或工作区、Unity Catalog 卷或外部云存储位置中的 JAR 库。 请参阅 Databricks 资产捆绑包的库依赖项。
以下示例向作业添加 JAR 任务。 JAR 的路径指向指定的卷位置。
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
有关可为此任务设置的额外映射,请参见 REST API 参考中定义的创建作业操作请求负载中的 tasks > spark_jar_task
,如 POST /api/2.1/jobs/create,以 YAML 格式表示。 请参阅用于作业的 JAR 任务。
SQL 文件任务
使用此任务运行位于工作区或远程 Git 存储库中的 SQL 文件。
以下示例向作业添加 SQL 文件任务。 此 SQL 文件任务使用指定的 SQL 仓库来运行指定的 SQL 文件。
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概览”选项卡上“名称”字段中仓库名称后面括号中的 ID。。
有关可为每个任务设置的额外映射,请参见创建作业操作请求有效负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > sql_task > file
,以 YAML 格式表示。 请参阅用于作业的 SQL 任务。
DLT 管道任务
使用此任务运行 DLT 管道。 请参阅 什么是 DLT?。
以下示例向作业添加 DLT 管道任务。 此 DLT 管道任务运行指定的管道。
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
可以通过以下方法来获取管道的 ID:在工作区中打开管道,并在管道设置页的“管道详细信息”选项卡上复制“管道 ID”值。
有关您可以为此任务设置的附加映射,请参阅在 REST API 参考文档中,POST /api/2.1/jobs/create 定义的创建作业操作请求负载中的 tasks > pipeline_task
,以 YAML 格式表示。 请参阅用于作业的 DLT 管道任务。
dbt 任务
使用此任务运行一个或多个 dbt 命令。 请参阅连接到 dbt Cloud。
以下示例向作业添加 dbt 任务。 此 dbt 任务使用指定的 SQL 仓库来运行指定的 dbt 命令。
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- 'dbt deps'
- 'dbt seed'
- 'dbt run'
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: 'dbt-databricks>=1.0.0,<2.0.0'
若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概览”选项卡上“名称”字段中仓库名称后面括号中的 ID。。
关于您可以为此任务设置的额外映射,请参阅 REST API 参考中定义为 POST /api/2.1/jobs/create 的创建作业操作请求负载中的 tasks > dbt_task
,以 YAML 格式表示。 请参阅用于作业的 dbt 任务。
Databricks 资产捆绑包还包含一个 dbt-sql
项目模板,该模板定义一个具有 dbt 任务的作业,以及已部署的 dbt 作业的 dbt 配置文件。 有关 Databricks 资产捆绑包模板的信息,请参阅 默认捆绑模板。
运行作业任务
使用此任务运行另一个作业。
以下示例在第二个作业中包含了运行第一个作业的运行作业任务。
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: '13.3.x-scala2.12'
node_type_id: 'i3.xlarge'
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
此示例使用替换来检索要运行的作业的 ID。 若要从 UI 获取作业的 ID,请在工作区中打开该作业,并从作业设置页的“作业详细信息”选项卡中的“作业 ID”值复制 ID。
有关可为此任务设置的额外映射,请参见在 REST API 参考中的 POST /api/2.1/jobs/create 中定义的创建作业操作的请求负载 tasks > run_job_task
,以 YAML 格式表示。