DLT Python nyelvi referencia
Ez a cikk a DLT Python programozási felületének részleteit tartalmazza.
Az SQL API-val kapcsolatos információkért lásd a DLT SQL nyelvi referencia.
Az automatikus betöltő konfigurálásával kapcsolatos részletekért lásd Mi az automatikus betöltő?.
Kezdés előtt
A folyamatok DLT Python-felülettel történő implementálása során az alábbiakat érdemes figyelembe venni:
- Mivel a Folyamatfrissítés tervezése és futtatása során a Python
table()
ésview()
függvények többször is meghívásra kerülnek, ne tartalmazzon olyan kódot ezen függvények egyikében, amelyek mellékhatásai lehetnek (például az adatokat módosító vagy e-mailt küldő kód). A váratlan viselkedés elkerülése érdekében az adathalmazokat meghatározó Python-függvényeknek csak a tábla vagy nézet definiálásához szükséges kódot kell tartalmazniuk. - Ha olyan műveleteket szeretne végrehajtani, mint az e-mailek küldése vagy egy külső figyelési szolgáltatással való integráció, különösen az adathalmazokat meghatározó funkciókban, használja eseményhookokat. Ha ezeket a műveleteket az adathalmazokat meghatározó függvényekben implementálja, az váratlan viselkedést fog okozni.
- A Python
table
ésview
függvénynek dataFrame-et kell visszaadnia. A DataFrame-en működő egyes függvények nem adnak vissza DataFrame-eket, ezért nem használhatók. Ilyen műveletek például acollect()
,count()
,toPandas()
,save()
éssaveAsTable()
. Mivel a DataFrame-átalakítások végrehajtása a teljes adatfolyam-gráf feloldása után történik, az ilyen műveletek használata nem várt mellékhatásokat eredményezhet.
A dlt
Python-modul importálása
A DLT Python-függvények a dlt
modulban vannak definiálva. A Python API-val implementált folyamatoknak importálnia kell ezt a modult:
import dlt
DLT materializált nézet vagy streamelési tábla létrehozása
A Pythonban a DLT határozza meg, hogy egy adathalmazt materializált nézetként vagy streamelési táblaként kell-e frissíteni a definiáló lekérdezés alapján. A @table
dekoratőr használható a materializált nézetek és a streamelési táblák definiálására is.
Ha materializált nézetet szeretne definiálni a Pythonban, alkalmazzon @table
egy olyan lekérdezésre, amely statikus olvasást végez egy adatforráson. Streamelési tábla definiálásához alkalmazzon @table
egy olyan lekérdezésre, amely streamelési olvasást végez egy adatforráson, vagy használja a create_streaming_table() függvényt. Mindkét adathalmaztípus szintaxis-specifikációja megegyezik az alábbiakkal:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
DLT-nézet létrehozása
A nézet Pythonban való definiálásához alkalmazza a @view
dekoratőrt. A @table
dekoratőrhöz hasonlóan a DLT nézetei statikus vagy streamelt adathalmazokhoz is használhatók. A Nézetek Pythonnal való definiálása a következő szintaxist tartalmazza:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Példa: Táblák és nézetek definiálása
Táblázat vagy nézet Pythonban való definiálásához alkalmazza a @dlt.view
vagy @dlt.table
dekorátort egy függvényre. A tábla vagy a nézetnév hozzárendeléséhez használhatja a függvény nevét vagy a name
paramétert. Az alábbi példa két különböző adatkészletet határoz meg: egy taxi_raw
nevű nézetet, amely egy JSON-fájlt fogad bemeneti forrásként, és egy filtered_data
nevű táblát, amely bemenetként veszi fel a taxi_raw
nézetet:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Példa: Hozzáférés az ugyanabban a folyamatban definiált adatkészlethez
Jegyzet
Bár a dlt.read()
és dlt.read_stream()
függvények továbbra is elérhetők és teljes mértékben támogatottak a DLT Python-felületen, a Databricks a spark.read.table()
és spark.readStream.table()
függvények használatát javasolja az alábbiak miatt:
- A
spark
függvények támogatják a belső és külső adatkészletek olvasását, beleértve a külső tárolóban lévő vagy más folyamatokban definiált adathalmazokat is. Adlt
függvények csak belső adathalmazok olvasását támogatják. - A
spark
függvények támogatják az opciók meghatározását az olvasási műveletekhez, példáulskipChangeCommits
. A beállítások megadását adlt
függvények nem támogatják.
Az ugyanabban a folyamatban definiált adathalmaz eléréséhez használja a spark.read.table()
vagy spark.readStream.table()
függvényeket:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Jegyzet
A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a customers
tábla a folyamathoz konfigurált alapértelmezett katalógusból és sémából van megírva és beolvasva.
Példa: Olvasás metaadattárban regisztrált táblából
Ha a Hive metaadattárban regisztrált táblából szeretne adatokat olvasni, a függvényargumentumban a tábla nevét az adatbázis nevével minősítheti:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Egy Unity Catalog-táblából való olvasásra példa: adatok betöltése Unity Catalog-folyamatba.
Példa: Adatkészlet elérése spark.sql
használatával
Az adathalmazokat egy lekérdezési függvényben lévő spark.sql
kifejezéssel is visszaadhatja. Ha belső adatkészletből szeretne olvasni, a nevet nem minősítetten használhatja az alapértelmezett katalógus és séma használatára, vagy elő is használhatja őket:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Rekordok végleges törlése materializált nézetből vagy streamelési táblából
Ha véglegesen törölni szeretné a rekordokat egy materializált nézetből vagy streamelőtáblából, és engedélyezve van a törlési vektorok használata, például a GDPR-megfelelőség érdekében, további műveleteket kell végrehajtani az objektum alapjául szolgáló Delta-táblákon. A rekordok materializált nézetből való törlésének biztosításához lásd: Rekordok végleges törlése materializált nézetben, engedélyezett törlési vektorokkal. A rekordok streamelési táblából való törlésének biztosításához lásd: Rekordok végleges törlése streamelési táblából.
Írás külső eseménystreamelési szolgáltatásokba vagy Delta-táblákba a DLT sink
API-val
Fontos
A DLT sink
API nyilvános előzetes verzióban van.
Jegyzet
- A teljes frissítés futtatása nem törli az adatokat a tárolóhelyekről. Az újrafeldolgozott adatok hozzá lesznek fűzve a kimeneti tárolóhoz, és a meglévő adatok nem módosulnak.
- A DLT-elvárások nem támogatottak a
sink
API-val.
Ha olyan eseménystreamelési szolgáltatásba szeretne írni, mint az Apache Kafka vagy az Azure Event Hubs, vagy egy DLT-folyamatból egy Delta-táblába, használja a create_sink()
Python-modulban található dlt
függvényt. Miután létrehozott egy gyűjtőhelyet a create_sink()
függvénnyel, a gyűjtőhelyet egy hozzáfűzési folyamatban használja az adatok hozzáfűzésére. A hozzáfűzési áramlás az egyetlen áramlástípus, amelyet az create_sink()
függvény támogat. Más folyamattípusok, például apply_changes
, nem támogatottak.
A következő szintaxissal hozhat létre fogadót a create_sink()
függvénnyel:
create_sink(<sink_name>, <format>, <options>)
Érvek |
---|
name Típus: str Egy karakterlánc, amely azonosítja a csatornát, és a csatorna hivatkozására és kezeléséhez szolgál. A kimenetneveknek egyedinek kell lenniük a csővezetékhez, beleértve az összes forráskódot, mint például a jegyzetfüzeteket vagy a csővezeték részét képező modulokat. Ez a paraméter kötelező. |
format Típus: str A kimeneti formátumot definiáló sztring, kafka vagy delta .Ez a paraméter kötelező. |
options Típus: dict A kimeneti lehetőségek választható listája, {"key": "value"} formátumban, ahol a kulcs és az érték egyaránt szöveg. Minden, a Kafka és a Delta fogadók által támogatott Databricks futtatókörnyezeti beállítás támogatott. A Kafka beállításaiért lásd: A Kafka strukturált streamelési írójának konfigurálása. A Delta beállításaiért lásd: Delta-tábla fogadóként. |
Példa: Egy Kafka csatlakozó létrehozása a create_sink()
függvénnyel
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
Példa: Delta adattároló létrehozása a create_sink()
függvénnyel és a fájlrendszer elérési útjával
Az alábbi példa bemutatja, hogyan lehet egy adatforrást létrehozni, amely egy Delta-táblába ír a fájlrendszer elérési útjának megadásával.
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
Példa: Delta adatfogadó létrehozása a create_sink()
függvénnyel és egy Unity Catalog táblanévvel
Jegyzet
A Delta sink támogatja a Unity Catalog külső és felügyelt tábláit, valamint a Hive metaadattár által felügyelt táblákat. A táblaneveknek teljes mértékben minősítettnek kell lenniük. A Unity Catalog tábláinak például háromszintű azonosítót kell használniuk: <catalog>.<schema>.<table>
. A Hive metaadattártábláinak <schema>.<table>
kell használniuk.
Az alábbi példa egy adatfogadót hoz létre, amely egy Delta-táblába ír úgy, hogy átadja egy tábla nevét a Unity Catalogban.
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Példa: Hozzáfűzési művelet használata a Delta-fogadóba való íráshoz
Az alábbi példa létrehoz egy olyan fogadót, amely egy Delta-táblába ír, majd létrehoz egy hozzáfűzési folyamatot a fogadóba való íráshoz:
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Példa: Egy hozzáfűzési folyamat használata egy Kafka-csatornába való íráshoz
Az alábbi példa létrehoz egy kimenetet, amely egy Kafka témába ír, majd létrehoz egy hozzáfűzési adatfolyamot a kimenetbe való íráshoz.
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
A Kafkába írt DataFrame sémájának tartalmaznia kell azokat az oszlopokat, amelyeket a A Kafka strukturált streamelési írójának konfigurálása meg van adva.
Streamelési műveletek céltáblájának létrehozása
A create_streaming_table()
függvénnyel létrehozhat egy céltáblát a streamelési műveletek által előállított rekordokhoz, beleértve a apply_changes(), apply_changes_from_snapshot()és @append_flow kimeneti rekordokat.
Jegyzet
A create_target_table()
és create_streaming_live_table()
függvények elavultak. A Databricks a meglévő kód frissítését javasolja a create_streaming_table()
függvény használatához.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Érvek |
---|
name Típus: str A tábla neve. Ez a paraméter kötelező. |
comment Típus: str A táblázat opcionális leírása. |
spark_conf Típus: dict A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája. |
table_properties Típus: dict A tábla táblatulajdonságainak választható listája. |
partition_cols Típus: array A tábla particionálásához használandó egy vagy több oszlop választható listája. |
cluster_by Típus: array Opcióként engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat. Lásd A Delta-táblákfolyékony fürtözésének használata című témakört. |
path Típus: str A táblaadatok opcionális tárolási helye. Ha nincs beállítva, akkor a rendszer alapértelmezettként a tárolócsővezeték helyét használja. |
schema Típus: str vagy StructType A tábla számára választható sémadefiníció. A sémák definiálhatók SQL DDL-sztringként vagy Pythonnal StructType . |
expect_all expect_all_or_drop expect_all_or_fail Típus: dict A tábla opcionális adatminőségi korlátozásai. Lásd: több elvárás. |
row_filter (nyilvános előzetes verzió)Típus: str A tábla opcionális sorszűrő záradéka. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal. |
A táblák materializálásának szabályozása
A táblák a materializációjuk további szabályozását is lehetővé teszi:
- Adja meg, hogyan lehet a táblákat klaszterezni
cluster_by
felhasználásával. A lekérdezések felgyorsításához folyékony fürtözést használhat. Lásd A Delta-táblákfolyékony fürtözésének használata című témakört. - Adja meg, hogyan vannak a táblák particionálva a
partition_cols
használatával. - Nézet vagy tábla definiálásakor táblatulajdonságokat állíthat be. Lásd a DLT tábla tulajdonságait.
- A táblaadatok tárolási helyének beállítása a
path
beállítással. A táblaadatok alapértelmezés szerint a folyamat tárolási helyén lesznek tárolva, hapath
nincs beállítva. - A sémád definíciójában használhat létrehozott oszlopokat. Lásd: Példa: Séma és klaszter oszlopok megadása.
Megjegyzés
Az 1 TB-nál kisebb méretű táblák esetében a Databricks azt javasolja, hogy a DLT vezérelje az adatszervezést. Nem szükséges megadni partícióoszlopokat, hacsak nem számít arra, hogy a táblája meghaladja egy terabájtot.
Példa: Séma és fürtoszlopok megadása
A táblázatsémát választhatóan megadhatja Python StructType
vagy SQL DDL karakterlánc használatával. Ha DDL-sztringgel van megadva, a definíció tartalmazhat létrehozott oszlopokat.
Az alábbi példa létrehoz egy sales
nevű táblát egy Python-StructType
használatával megadott sémával:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Az alábbi példa egy tábla sémáját adja meg egy DDL karakterlánc használatával, meghatároz egy generált oszlopot, és klaszterezési oszlopokat határoz meg.
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Alapértelmezés szerint a DLT a sémát a table
definíciójából következteti, ha nem ad meg sémát.
Példa: Partícióoszlopok megadása
Az alábbi példa egy tábla sémáját határozza meg egy DDL-sztring használatával, definiál egy generált oszlopot, és definiál egy partícióoszlopot:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Példa: Táblakorlátozások definiálása
Fontos
A táblakorlátozások a nyilvános előzetes változatban vannak.
Séma megadásakor megadhatja az elsődleges és az idegen kulcsokat. A korlátozások tájékoztató jellegűek, és nincsenek kényszerítve. Tekintse meg a CONSTRAINT záradékot az SQL nyelvi hivatkozásában.
Az alábbi példa egy elsődleges és idegenkulcs-korlátozással rendelkező táblát határoz meg:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Példa: Sorszűrő és oszlopmaszk definiálása
Fontos
A sorszűrők és az oszlopmaszkok nyilvános előzetes verziójú.
Ha materializált nézetet vagy streamelési táblát szeretne létrehozni sorszűrővel és oszlopmaszkkal, használja a ROW FILTER záradékot és a MASZK záradékot. Az alábbi példa bemutatja, hogyan definiálhat materializált nézetet és streamelési táblát sorszűrővel és oszlopmaszkkal is:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
További információ a sorszűrőkről és az oszlopmaszkokról: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.
Stream-tábla konfigurálása a forrásstreamelési tábla módosításainak figyelmen kívül hagyásához
Jegyzet
- A
skipChangeCommits
jelző csak akkor működik, ha aspark.readStream
aoption()
függvényt használja. Ezt a jelölőt nem használhatjadlt.read_stream()
függvényben. - Nem használhatja a
skipChangeCommits
jelzőt, ha a forrásstreamelési tábla egy apply_changes() függvény céljaként van definiálva.
Alapértelmezés szerint a streamelési táblák csak hozzáfűző forrásokat igényelnek. Ha egy streamelő tábla egy másik streamelési táblát használ forrásként, és a forrásstreamelési tábla frissítéseket vagy törléseket igényel, például a GDPR "az elfelejtettséghez való jog" feldolgozását, a skipChangeCommits
jelző a forrásstreamelési tábla olvasásakor beállítható, hogy figyelmen kívül hagyja ezeket a módosításokat. A jelölőről további információt a Frissítések és törlések mellőzésecímű témakörben talál.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Python DLT-tulajdonságok
Az alábbi táblázatok ismertetik a DLT-vel rendelkező táblák és nézetek definiálása során megadható beállításokat és tulajdonságokat:
@table vagy @view |
---|
name Típus: str A tábla vagy nézet opcionális elnevezése. Ha nincs megadva, a függvény neve tábla- vagy nézetnévként lesz használva. |
comment Típus: str A táblázat opcionális leírása. |
spark_conf Típus: dict A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája. |
table_properties Típus: dict A táblatulajdonságok választható listája a tábla számára. |
path Típus: str A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárhely helyét használja. |
partition_cols Típus: a collection of str Választható gyűjtemény, például egy vagy több oszlopból álló list a tábla particionálásához. |
cluster_by Típus: array Opcióként engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat. Lásd A Delta-táblákfolyékony fürtözésének használata című témakört. |
schema Típus: str vagy StructType Egy választható sémadefiníció a táblához. A sémák definiálhatók SQL DDL karakterláncként vagy Python StructType -ként. |
temporary Típus: bool Hozzon létre egy táblát, de ne tegye közzé a tábla metaadatait. A temporary kulcsszó arra utasítja a DLT-t, hogy hozzon létre egy táblát, amely elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A feldolgozási idő csökkentése érdekében egy ideiglenes tábla megmarad az azt létrehozó folyamat teljes élettartama alatt, és nem csak egyetlen frissítéssel.Az alapértelmezett érték a "False". |
row_filter (nyilvános előzetes verzió)Típus: str A tábla választható sorszűrő feltétele. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal. |
Tábla- vagy nézetdefiníció |
---|
def <function-name>() Az adathalmazt meghatározó Python-függvény. Ha a name paraméter nincs beállítva, akkor a <function-name> lesz a céladatkészlet neve. |
query Spark SQL-utasítás, amely Spark-adatkészletet vagy Koalas DataFrame-et ad vissza. Az dlt.read() vagy a spark.read.table() használatával teljes olvasást végezhet az ugyanabban a folyamatban definiált adatkészletből. Külső adatkészlet olvasásához használja a spark.read.table() függvényt. Külső adatkészletek olvasásához nem használhat dlt.read() . Mivel spark.read.table() használható belső adathalmazok, az aktuális folyamaton kívül definiált adathalmazok olvasására, és lehetővé teszi az adatok olvasási lehetőségeinek megadását, a Databricks a dlt.read() függvény helyett azt javasolja.Amikor egy folyamat adatkészletét definiálja, alapértelmezés szerint a folyamatkonfigurációban definiált katalógust és sémát fogja használni. A spark.read.table() függvénnyel a csővezetékben meghatározott adathalmazból tud olvasni, megkötés nélkül. Például egy customers nevű adatkészletből való olvasáshoz:spark.read.table("customers") A spark.read.table() függvénnyel a metaadattárban regisztrált táblákból is olvashat, ha a tábla nevét opcionálisan az adatbázis nevével minősíti:spark.read.table("sales.customers") Az dlt.read_stream() vagy a spark.readStream.table() használatával streamelési olvasást hajthat végre az ugyanabban a folyamatban definiált adatkészletből. Ha egy külső adatkészletből szeretne streamelési olvasást végezni, használja aspark.readStream.table() függvény. Mivel spark.readStream.table() használható belső adathalmazok, az aktuális folyamaton kívül definiált adathalmazok olvasására, és lehetővé teszi az adatok olvasási lehetőségeinek megadását, a Databricks a dlt.read_stream() függvény helyett azt javasolja.Ha sql-szintaxissal szeretne lekérdezést definiálni egy DLT table függvényben, használja a spark.sql függvényt. Lásd: Példa: Adatkészlet elérése spark.sql használatával. Ha egy DLT table függvényben szeretne lekérdezést definiálni a Python használatával, használja PySpark szintaxist. |
Elvárások |
---|
@expect("description", "constraint") Adatminőségi korlátozás deklarálása description . Ha egy sor megsérti a várakozást, vegye fel a sort a céladatkészletbe. |
@expect_or_drop("description", "constraint") Adatminőségi korlátozás deklarálása description . Ha egy sor megsérti a várakozást, ejtse el a sort a céladatkészletből. |
@expect_or_fail("description", "constraint") Adatminőségi korlátozás deklarálása description . Ha egy sor nem felel meg az elvárásoknak, azonnal állítsa le a végrehajtást. |
@expect_all(expectations) Deklarálhat egy vagy több adatminőségi korlátozást. expectations egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megsérti, a céladatkészletbe foglalja bele a sort. |
@expect_all_or_drop(expectations) Deklarálhat egy vagy több adatminőségi korlátozást. expectations egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megszegi, a céladatkészletből ejtse ki a sort. |
@expect_all_or_fail(expectations) Deklarálhat egy vagy több adatminőségi korlátozást. expectations egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármilyen elvárást megsért, azonnal le kell állítani a végrehajtást. |
Változáscsatorna adatrögzítésének módosítása a Pythonnal a DLT-ben
Használja a Python API apply_changes()
függvényét a DLT-változási adatrögzítési (CDC) funkcióval a forrásadatok változásadat-adatcsatornából (CDF) való feldolgozásához.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A apply_changes()
céltábla sémájának megadásakor a __START_AT
és az __END_AT
oszlopokat a sequence_by
mezőkkel megegyező adattípussal kell tartalmaznia.
A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a DLT Python felületén.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Jegyzet
A APPLY CHANGES
feldolgozásánál a INSERT
és UPDATE
események alapértelmezett viselkedése a következő: CDC-eseményeket szúr be vagy frissít a forrásból származó adatok alapján. Frissíti a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy új sort szúr be, ha a céltáblában nincs egyező rekord. A DELETE
események kezelése a APPLY AS DELETE WHEN
feltétellel adható meg.
Ha többet szeretne megtudni a CDC változáscsatornával történő feldolgozásáról, olvassa el A VÁLTOZÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése a DLT-vel. A apply_changes()
függvény használatára példa: Példa: SCD 1. és 2. típusú SCD-feldolgozás CDF-forrásadatokkal.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A apply_changes
céltáblaséma megadásakor a __START_AT
és __END_AT
oszlopokat is meg kell adnia, és az adattípusa megegyezik a sequence_by
mezővel.
Lásd: A MÓDOSÍTÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése A DLThasználatával.
Érvek |
---|
target Típus: str A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel létrehozhatja a céltáblát a apply_changes() függvény végrehajtása előtt.Ez a paraméter kötelező. |
source Típus: str A CDC-rekordokat tartalmazó adatforrás. Ez a paraméter kötelező. |
keys Típus: list Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat:
A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId) , de nem használhatja col(source.userId) .Ez a paraméter kötelező. |
sequence_by Típus: str vagy col() Az oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A DLT ezzel a szekvenálással kezeli a sorrenden kívül érkező változási eseményeket. Megadhatja az alábbiakat:
A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId) , de nem használhatja col(source.userId) .A megadott oszlopnak rendezhető adattípusnak kell lennie. Ez a paraméter kötelező. |
ignore_null_updates Típus: bool A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény egyezik egy meglévő sorral és ignore_null_updates True , az null oszlopok megtartják a meglévő értékeiket a célban. Ez a null értékkel rendelkező beágyazott oszlopokra is vonatkozik. Ha ignore_null_updates False , a meglévő értékek felülíródnak null értékekkel.Ez a paraméter nem kötelező. Az alapértelmezett érték a False . |
apply_as_deletes Típus: str vagy expr() Azt határozza meg, hogy a CDC-eseményeket mikor kell DELETE ként kezelni, nem pedig upsertként. A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz konfigurálható a következővel:A pipelines.cdc.tombstoneGCThresholdInSeconds táblatulajdonság.Megadhatja az alábbiakat:
Ez a paraméter nem kötelező. |
apply_as_truncates Típus: str vagy expr() Azt jelzi, hogy a CDC-eseményeket mikor kell teljes táblaként kezelni TRUNCATE . Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.A apply_as_truncates paraméter csak az 1. SCD-típus esetén támogatott. A 2. SCD-típus nem támogatja a csonkolási műveleteket.Megadhatja az alábbiakat:
Ez a paraméter nem kötelező. |
column_list except_column_list Típus: list A céltáblában szerepeltetni kívánt oszlopok egy részhalmaza. A column_list használatával adja meg a belefoglalandó oszlopok teljes listáját. Az except_column_list használatával adja meg a kizárni kívánt oszlopokat. Deklarálhatja az értéket sztringek listájaként vagy Spark SQL-col() függvényként:
A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId) , de nem használhatja col(source.userId) .Ez a paraméter nem kötelező. Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha a függvénynek nem ad át column_list vagy except_column_list argumentumot. |
stored_as_scd_type Típus: str vagy int A rekordok tárolása 1. vagy 2. SCD-típusként. Állítsa 1 -ra az SCD 1-es típushoz vagy 2 -re az SCD 2-es típushoz.Ez a záradék nem kötelező. Az alapértelmezett scd típus 1. |
track_history_column_list track_history_except_column_list Típus: list A célzott táblázat előzményeinek nyomon követésére szolgáló kimeneti oszlopok egy részhalmaza. A track_history_column_list használatával adja meg a követendő oszlopok teljes listáját. Használjatrack_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Deklarálhatja az értéket sztringek listájaként vagy Spark SQL-col() függvényként:
A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId) , de nem használhatja col(source.userId) .Ez a paraméter nem kötelező. Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nincs track_history_column_list vagytrack_history_except_column_list argumentumot a függvény továbbítja. |
Adatbázis-pillanatképek adatrögzítésének módosítása a Pythonnal a DLT-ben
Fontos
A APPLY CHANGES FROM SNAPSHOT
API a nyilvános előzetes verzióban van.
Használja a Python API apply_changes_from_snapshot()
függvényét a DLT-változási adatrögzítési (CDC) funkcióval a forrásadatok adatbázis-pillanatképekből való feldolgozásához.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A apply_changes_from_snapshot()
céltábla sémájának megadásakor a __START_AT
és __END_AT
oszlopokat is meg kell adnia, és az adattípusa megegyezik a sequence_by
mezővel.
A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a DLT Python felületén.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Jegyzet
A APPLY CHANGES FROM SNAPSHOT
feldolgozásához az alapértelmezett viselkedés egy új sor beszúrása, ha egy azonos kulccsal (kulcsokkal) rendelkező rekord nem található a célban. Ha létezik egyező rekord, az csak akkor frissül, ha a sorban szereplő értékek bármelyike módosult. A célban található, de a forrásban már nem szereplő kulcsokat tartalmazó sorok törlődnek.
A CDC pillanatképekkel való feldolgozásával kapcsolatos további információkért lásd AZ VÁLTOZÁSOK ALKALMAZÁSA APIs: A változtatási adatok rögzítésének egyszerűsítése DLT-vel. A apply_changes_from_snapshot()
függvény használatára vonatkozó példákért tekintse meg a rendszeres pillanatkép-betöltési és előzmény-pillanatkép-betöltési példákat.
Érvek |
---|
target Típus: str A frissíteni kívánt tábla neve. A függvény futtatása előtt a apply_changes() függvénnyel hozhatja létre a céltáblát.Ez a paraméter kötelező. |
source Típus: str vagy lambda function Egy táblázat vagy nézet neve, amely rendszeres időközönként pillanatképet hoz létre, vagy egy Python lambda függvény, amely visszaadja a feldolgozandó pillanatkép dataFrame-et és a pillanatkép verzióját. Lásd: A source argumentum implementálása.Ez a paraméter kötelező. |
keys Típus: list Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat:
A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId) , de nem használhatja col(source.userId) .Ez a paraméter kötelező. |
stored_as_scd_type Típus: str vagy int A rekordok tárolása 1. vagy 2. SCD-típusként. Állítsa 1 -ra az SCD 1-es típushoz vagy 2 -re az SCD 2-es típushoz.Ez a záradék nem kötelező. Az alapértelmezett scd típus 1. |
track_history_column_list track_history_except_column_list Típus: list A célzott táblázat előzményeinek nyomon követésére szolgáló kimeneti oszlopok egy részhalmaza. A track_history_column_list használatával adja meg a követendő oszlopok teljes listáját. Használtrack_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Deklarálhatja az értéket sztringek listájaként vagy Spark SQL-col() függvényként:
A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId) , de nem használhatja col(source.userId) .Ez a paraméter nem kötelező. Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nincs track_history_column_list vagytrack_history_except_column_list argumentumot a függvény továbbítja. |
A source
argumentum implementálása
A apply_changes_from_snapshot()
függvény tartalmazza a source
argumentumot. Az előzmény-pillanatképek feldolgozásához a source
argumentum várhatóan egy Python lambda függvény lesz, amely két értéket ad vissza a apply_changes_from_snapshot()
függvénynek: a feldolgozandó pillanatképadatokat tartalmazó Python DataFrame-et és egy pillanatkép-verziót.
A lambda függvény aláírása a következő:
lambda Any => Optional[(DataFrame, Any)]
- A lambda függvény argumentuma a legutóbb feldolgozott pillanatkép-verzió.
- A lambda függvény visszatérési értéke
None
vagy két értékből álló rekord: A rekord első értéke a feldolgozandó pillanatképet tartalmazó DataFrame. A tuple második értéke a pillanatkép verziója, amely a pillanatkép logikai sorrendjét képviseli.
Példa a lambda függvény implementálására és meghívására:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
A DLT-futtatókörnyezet a következő lépéseket hajtja végre minden alkalommal, amikor a apply_changes_from_snapshot()
függvényt tartalmazó folyamat aktiválódik:
- A
next_snapshot_and_version
függvény futtatásával betölti a következő pillanatkép-adatkeretet és a megfelelő pillanatkép-verziót. - Ha a DataFrame nem ad vissza, a futtatás leáll, és a folyamatfrissítés befejezettként van megjelölve.
- Észleli az új pillanatkép módosításait, és fokozatosan alkalmazza őket a cél táblára.
- Visszatér az 1. lépéshez, hogy betöltse a következő pillanatképet és annak verzióját.
korlátozások
A DLT Python-felületre a következő korlátozások vonatkoznak:
A pivot()
függvény nem támogatott. A Spark pivot
művelete megköveteli a bemeneti adatok lelkes betöltését a kimeneti séma kiszámításához. A DLT nem támogatja ezt a képességet.