Paraméterek használata DLT csatornákkal
Ez a cikk bemutatja, hogyan használhatja a DLT-folyamatkonfigurációkat a folyamatkód paraméterezéséhez.
Referenciaparaméterek
A frissítések során a folyamat forráskódja szintaxissal érheti el a folyamatparamétereket a Spark-konfigurációk értékeinek lekéréséhez.
A kulcs használatával hivatkozhat a folyamatparaméterekre. Az érték sztringként lesz beszúrva a forráskódba, mielőtt a forráskód logikája kiértékelné.
Az alábbi példaszintaxis egy paramétert használ, amelynek kulcsa source_catalog
és értéke dev_catalog
a materializált nézet adatforrásának megadásához.
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Piton
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Paraméterek beállítása
Adja át a paramétereket a folyamatoknak úgy, hogy tetszőleges kulcs-érték párokat ad át a folyamat konfigurációjaként. A folyamatkonfigurációk definiálása vagy szerkesztése során paramétereket állíthat be a munkaterület felhasználói felületén vagy JSON-jának használatával. Lásd: Egy DLT-folyamat konfigurálása.
A folyamatparaméterkulcsok csak _ - .
vagy alfanumerikus karaktereket tartalmazhatnak. A paraméterértékek sztringekként vannak beállítva.
A folyamatparaméterek nem támogatják a dinamikus értékeket. Frissítenie kell a folyamatkonfiguráció egyik kulcsához társított értéket.
Fontos
Ne használjon olyan kulcsszavakat, amelyek ütköznek a foglalva lévő pipeline vagy az Apache Spark konfigurációs értékeivel.
Adathalmaz deklarációinak paraméterezése Pythonban vagy SQL-ben
Az adathalmazokat meghatározó Python- és SQL-kód paraméterezhető a folyamat beállításai alapján. A paraméterezés a következő használati eseteket teszi lehetővé:
- Hosszú útvonalak és más változók elválasztása a kódtól.
- A tesztelés felgyorsítása érdekében csökkentse a fejlesztési vagy előkészítési környezetekben feldolgozott adatok mennyiségét.
- Ugyanazt az átalakítási logikát használja újra több adatforrásból történő feldolgozáshoz.
Az alábbi példa a startDate
konfigurációs értékkel korlátozza a fejlesztési folyamatot a bemeneti adatok egy részhalmazára:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Adatforrások vezérlése paraméterekkel
A folyamatparaméterekkel különböző adatforrásokat adhat meg ugyanazon folyamat különböző konfigurációiban.
Megadhat például különböző elérési utakat egy folyamat fejlesztési, tesztelési és éles konfigurációiban a data_source_path
változó használatával, majd hivatkozhat rá a következő kóddal:
SQL
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)
Piton
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Ez a minta hasznos annak teszteléséhez, hogy a betöltési logika hogyan kezelheti a sémát vagy a hibásan formázott adatokat a kezdeti betöltés során. Az azonos kódot a teljes folyamat során használhatja minden környezetben az adathalmazok kiváltása közben.