Folyamatkód fejlesztése a Pythonnal
A DLT számos új Python-kódszerkezetet vezet be a folyamat materializált nézeteinek és streamtábláinak meghatározásához. A csővezetékek fejlesztésének Python-támogatása a PySpark DataFrame és a strukturált adatfolyam API-k alapjaira épül.
A Pythonnal és DataFrame-ekkel nem ismert felhasználók számára a Databricks az SQL-felület használatát javasolja. Lásd: Folyamatkód fejlesztése AZ SQLhasználatával.
A DLT Python szintaxisának teljes referenciáját DLT Python nyelvi referenciacímű témakörben talál.
A Python alapjai folyamatfejlesztéshez
A DLT-adathalmazokat létrehozó Python-kódnak DataFrame-eket kell visszaadnia.
Az összes DLT Python API a dlt
modulban van implementálva. A Pythonnal implementált DLT-folyamatkódnak explicit módon importálnia kell a dlt
modult a Python-jegyzetfüzetek és -fájlok tetején.
A folyamatkonfiguráció során megadott katalógusba és sémába beolvassa és beírja az alapértelmezett értéket. Lásd: Célkatalógus és sémabeállítása.
A DLT-specifikus Python-kód egy kritikus módon különbözik a Többi Python-kódtípustól: a Python-folyamatkód nem hívja meg közvetlenül az adatbetöltést és átalakítást végző függvényeket DLT-adathalmazok létrehozásához. Ahelyett, a DLT értelmezi a dlt
modul dekorátorfüggvényeit az összes forráskódfájlban, amelyek folyamatba vannak konfigurálva, és egy adatfolyam gráfot hoz létre.
Fontos
A folyamat futtatásakor nem várt viselkedés elkerülése érdekében ne tartalmazzon olyan kódot, amely az adathalmazokat definiáló függvényekben esetlegesen mellékhatásokat okoz. A részletekért lásd a Python-referenciát.
Materializált nézet vagy streamelési tábla létrehozása a Pythonnal
A @dlt.table
dekoratőr arra utasítja a DLT-t, hogy a függvény által visszaadott eredmények alapján hozzon létre materializált nézetet vagy streamelési táblát. A kötegelt olvasás eredményei materializált nézetet hoznak létre, míg a folyamatos olvasás eredményei folyamatos táblázatot hoznak létre.
Alapértelmezés szerint a materializált nézetek és a streamelő táblák neveit a függvénynevekből következtetik. Az alábbi példakód a materializált nézet és a streamelési tábla létrehozásának alapszintaxisát mutatja be:
Jegyzet
Mindkét függvény ugyanarra a táblára hivatkozik a samples
katalógusban, és ugyanazt a dekorátor függvényt használja. Ezek a példák kiemelik, hogy a materializált nézetek és streamelési táblák alapszintaxisának egyetlen különbsége a spark.read
és a spark.readStream
használata.
Nem minden adatforrás támogatja a streamelési olvasásokat. Egyes adatforrásokat mindig streamelési szemantikával kell feldolgozni.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Igény szerint megadhatja a tábla nevét a @dlt.table
dekorátor name
argumentumával. Az alábbi példa ezt a mintát mutatja be egy materializált nézethez és streamelési táblához:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Adatok betöltése objektumtárolóból
A DLT az Azure Databricks által támogatott összes formátumból támogatja az adatok betöltését. Lásd: Adatformátum beállításai.
Jegyzet
Ezek a példák a munkaterületre automatikusan csatlakoztatott /databricks-datasets
alatt elérhető adatokat használják. A Databricks kötetútvonalak vagy felhőalapú URI-k használatát javasolja a felhőobjektum-tárolóban tárolt adatokra való hivatkozáshoz. Lásd Mik azok a Unity Catalog-kötetek?.
A Databricks az automatikus betöltő és a streamelő táblák használatát javasolja, amikor növekményes betöltési számítási feladatokat konfigurál a felhőobjektum-tárolóban tárolt adatokhoz. Lásd Mi az automatikus betöltő?.
Az alábbi példa létrehoz egy streamelési táblát JSON-fájlokból az Automatikus betöltő használatával:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Az alábbi példa kötegelt szemantika alkalmazásával olvas be egy JSON-könyvtárat és hoz létre egy materializált nézetet.
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Adatok validálása az elvárások alapján
Az elvárásokkal adatminőségi korlátozásokat állíthat be és kényszeríthet ki. Lásd: Az adatminőség kezelése a folyamatok elvárásaival.
Az alábbi kód @dlt.expect_or_drop
használ egy valid_data
nevű elvárás definiálásához, amely az adatbetöltés során null értékű rekordokat ad vissza:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
A folyamat során definiált materializált nézetek és streamelési táblák lekérdezése
Az alábbi példa négy adatkészletet határoz meg:
- Egy JSON-adatokat betöltő
orders
nevű streamelési tábla. - A
customers
nevű materializált nézet, amely betölti a CSV-adatokat. - A
orders
éscustomers
adathalmazok rekordjait összekapcsoló,customer_orders
nevű materializált nézet dátumra veti a rendelési időbélyeget, és kiválasztja acustomer_id
,order_number
,state
ésorder_date
mezőket. - Egy
daily_orders_by_state
nevű materializált nézet, amely összesíti az egyes államok napi rendeléseinek számát.
Jegyzet
A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a orders
, customers
és customer_orders
táblák a folyamathoz konfigurált alapértelmezett katalógusból és sémából vannak megírva és olvasva.
Az örökölt közzétételi mód a LIVE
sémával kérdezi le a folyamatban definiált egyéb materializált nézeteket és streamtáblákat. Az új folyamatokban a LIVE
sémaszintaxisa csendben figyelmen kívül lesz hagyva. Lásd: LIVE séma (örökölt).
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Táblák létrehozása for
ciklusban
A Python for
ciklusokkal programozott módon hozhat létre több táblát. Ez akkor lehet hasznos, ha számos adatforrással vagy céladatkészletkel rendelkezik, amelyek csak néhány paramétertől függően változnak, így kevesebb teljes kóddal kell karbantartani és kevesebb kódredundanciát eredményezni.
A for
ciklus soros sorrendben értékeli ki a logikát, de ha az adathalmazok tervezése befejeződött, a folyamat párhuzamosan futtatja a logikát.
Fontos
Ha ezt a mintát használja adathalmazok definiálásához, győződjön meg arról, hogy a for
ciklusnak átadott értékek listája mindig additív. Ha egy folyamatban lévő munkafolyamatban korábban meghatározott adathalmazt kihagy egy későbbi folyamatfuttatásból, akkor az adathalmazt automatikusan eltávolítják a célsémából.
Az alábbi példa öt táblát hoz létre, amelyek régiónként szűrik az ügyfélrendeléseket. Itt a régiónév a cél materializált nézeteinek nevének beállítására és a forrásadatok szűrésére szolgál. Az ideiglenes nézetek a végleges materializált nézetek létrehozásához használt forrástáblákból származó illesztések definiálására szolgálnak.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Az alábbiakban egy példa látható a folyamat adatfolyam-grafikonjára:
Hibaelhárítás: for
hurok számos táblát hoz létre ugyanazokkal az értékekkel
A Python-kód kiértékelésére használt lusta végrehajtási modell megköveteli, hogy a logika közvetlenül hivatkozik az egyes értékekre a @dlt.table()
által dekorált függvény meghívásakor.
Az alábbi példa két helyes módszert mutat be a táblák for
hurokkal való definiálásához. Mindkét példában a tables
listából származó összes táblanévre kifejezetten hivatkozik a @dlt.table()
által dekorált függvény.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Az alábbi példa nem helyesen hivatkozik a értékekre. Ez a példa különböző nevű táblákat hoz létre, de az összes tábla betölti az adatokat a for
ciklus utolsó értékéből:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)