Megosztás a következőn keresztül:


Paraméterek használata DLT csatornákkal

Ez a cikk bemutatja, hogyan használhatja a DLT-folyamatkonfigurációkat a folyamatkód paraméterezéséhez.

Referenciaparaméterek

A frissítések során a folyamat forráskódja szintaxissal érheti el a folyamatparamétereket a Spark-konfigurációk értékeinek lekéréséhez.

A kulcs használatával hivatkozhat a folyamatparaméterekre. Az érték sztringként lesz beszúrva a forráskódba, mielőtt a forráskód logikája kiértékelné.

Az alábbi példaszintaxis egy paramétert használ, amelynek kulcsa source_catalog és értéke dev_catalog a materializált nézet adatforrásának megadásához.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Piton

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Paraméterek beállítása

Adja át a paramétereket a folyamatoknak úgy, hogy tetszőleges kulcs-érték párokat ad át a folyamat konfigurációjaként. A folyamatkonfigurációk definiálása vagy szerkesztése során paramétereket állíthat be a munkaterület felhasználói felületén vagy JSON-jának használatával. Lásd: Egy DLT-folyamat konfigurálása.

A folyamatparaméterkulcsok csak _ - . vagy alfanumerikus karaktereket tartalmazhatnak. A paraméterértékek sztringekként vannak beállítva.

A folyamatparaméterek nem támogatják a dinamikus értékeket. Frissítenie kell a folyamatkonfiguráció egyik kulcsához társított értéket.

Fontos

Ne használjon olyan kulcsszavakat, amelyek ütköznek a foglalva lévő pipeline vagy az Apache Spark konfigurációs értékeivel.

Adathalmaz deklarációinak paraméterezése Pythonban vagy SQL-ben

Az adathalmazokat meghatározó Python- és SQL-kód paraméterezhető a folyamat beállításai alapján. A paraméterezés a következő használati eseteket teszi lehetővé:

  • Hosszú útvonalak és más változók elválasztása a kódtól.
  • A tesztelés felgyorsítása érdekében csökkentse a fejlesztési vagy előkészítési környezetekben feldolgozott adatok mennyiségét.
  • Ugyanazt az átalakítási logikát használja újra több adatforrásból történő feldolgozáshoz.

Az alábbi példa a startDate konfigurációs értékkel korlátozza a fejlesztési folyamatot a bemeneti adatok egy részhalmazára:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Adatforrások vezérlése paraméterekkel

A folyamatparaméterekkel különböző adatforrásokat adhat meg ugyanazon folyamat különböző konfigurációiban.

Megadhat például különböző elérési utakat egy folyamat fejlesztési, tesztelési és éles konfigurációiban a data_source_path változó használatával, majd hivatkozhat rá a következő kóddal:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Piton

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Ez a minta hasznos annak teszteléséhez, hogy a betöltési logika hogyan kezelheti a sémát vagy a hibásan formázott adatokat a kezdeti betöltés során. Az azonos kódot a teljes folyamat során használhatja minden környezetben az adathalmazok kiváltása közben.