Bagikan melalui


Menggunakan parameter dalam alur DLT

Artikel ini menjelaskan cara menggunakan konfigurasi alur DLT untuk membuat parameter kode alur.

Parameter referensi

Selama pembaruan, kode sumber alur Anda dapat mengakses parameter alur menggunakan sintaks untuk mendapatkan nilai untuk konfigurasi Spark.

Anda mereferensikan parameter alur menggunakan kunci . Nilai disuntikkan ke dalam kode sumber Anda sebagai string sebelum logika kode sumber Anda dievaluasi.

Sintaks contoh berikut menggunakan parameter dengan kunci source_catalog dan nilai dev_catalog untuk menentukan sumber data untuk tampilan materialisasi.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Mengatur parameter

Teruskan parameter ke pipeline dengan memasukkan pasangan kunci-nilai sembarang sebagai konfigurasi untuk pipeline. Anda dapat mengatur parameter saat menentukan atau mengedit konfigurasi alur menggunakan UI ruang kerja atau JSON. Lihat Konfigurasikan alur DLT.

Kunci parameter alur hanya dapat berisi karakter _ - . atau alfanumerik. Nilai parameter diatur sebagai string.

Parameter alur tidak mendukung nilai dinamis. Anda harus memperbarui nilai yang terkait dengan kunci dalam konfigurasi alur.

Penting

Jangan gunakan kata kunci yang bertentangan dengan alur yang dipesan atau nilai konfigurasi Apache Spark.

Membuat parameter deklarasi himpunan data di Python atau SQL

Kode Python dan SQL yang menentukan himpunan data Anda dapat diparameterkan oleh pengaturan alur. Parameterisasi memungkinkan kasus penggunaan berikut:

  • Memisahkan jalur panjang dan variabel lain dari kode Anda.
  • Mengurangi jumlah data yang diproses dalam lingkungan pengembangan atau penahapan untuk mempercepat pengujian.
  • Menggunakan kembali logika transformasi yang sama untuk diproses dari beberapa sumber data.

Contoh berikut menggunakan nilai konfigurasi startDate untuk membatasi alur pengembangan ke subset data input:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Mengontrol sumber data dengan parameter

Anda dapat menggunakan parameter alur untuk menentukan sumber data yang berbeda dalam konfigurasi yang berbeda dari alur yang sama.

Misalnya, Anda dapat menentukan jalur yang berbeda dalam konfigurasi pengembangan, pengujian, dan produksi untuk alur menggunakan variabel data_source_path lalu mereferensikannya menggunakan kode berikut:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Pola ini bermanfaat untuk menguji bagaimana logika penyerapan mungkin menangani skema atau data cacat selama penyerapan awal. Anda dapat menggunakan kode yang identik di seluruh alur anda di semua lingkungan sambil mengalihkan himpunan data.