Megosztás a következőn keresztül:


DLT Python nyelvi referencia

Ez a cikk a DLT Python programozási felületének részleteit tartalmazza.

Az SQL API-val kapcsolatos információkért lásd a DLT SQL nyelvi referencia.

Az automatikus betöltő konfigurálásával kapcsolatos részletekért lásd Mi az automatikus betöltő?.

Kezdés előtt

A folyamatok DLT Python-felülettel történő implementálása során az alábbiakat érdemes figyelembe venni:

  • Mivel a Folyamatfrissítés tervezése és futtatása során a Python table() és view() függvények többször is meghívásra kerülnek, ne tartalmazzon olyan kódot ezen függvények egyikében, amelyek mellékhatásai lehetnek (például az adatokat módosító vagy e-mailt küldő kód). A váratlan viselkedés elkerülése érdekében az adathalmazokat meghatározó Python-függvényeknek csak a tábla vagy nézet definiálásához szükséges kódot kell tartalmazniuk.
  • Ha olyan műveleteket szeretne végrehajtani, mint az e-mailek küldése vagy egy külső figyelési szolgáltatással való integráció, különösen az adathalmazokat meghatározó funkciókban, használja eseményhookokat. Ha ezeket a műveleteket az adathalmazokat meghatározó függvényekben implementálja, az váratlan viselkedést fog okozni.
  • A Python table és view függvénynek dataFrame-et kell visszaadnia. A DataFrame-en működő egyes függvények nem adnak vissza DataFrame-eket, ezért nem használhatók. Ilyen műveletek például a collect(), count(), toPandas(), save()és saveAsTable(). Mivel a DataFrame-átalakítások végrehajtása a teljes adatfolyam-gráf feloldása után történik, az ilyen műveletek használata nem várt mellékhatásokat eredményezhet.

A dlt Python-modul importálása

A DLT Python-függvények a dlt modulban vannak definiálva. A Python API-val implementált folyamatoknak importálnia kell ezt a modult:

import dlt

DLT materializált nézet vagy streamelési tábla létrehozása

A Pythonban a DLT határozza meg, hogy egy adathalmazt materializált nézetként vagy streamelési táblaként kell-e frissíteni a definiáló lekérdezés alapján. A @table dekoratőr használható a materializált nézetek és a streamelési táblák definiálására is.

Ha materializált nézetet szeretne definiálni a Pythonban, alkalmazzon @table egy olyan lekérdezésre, amely statikus olvasást végez egy adatforráson. Streamelési tábla definiálásához alkalmazzon @table egy olyan lekérdezésre, amely streamelési olvasást végez egy adatforráson, vagy használja a create_streaming_table() függvényt. Mindkét adathalmaztípus szintaxis-specifikációja megegyezik az alábbiakkal:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

DLT-nézet létrehozása

A nézet Pythonban való definiálásához alkalmazza a @view dekoratőrt. A @table dekoratőrhöz hasonlóan a DLT nézetei statikus vagy streamelt adathalmazokhoz is használhatók. A Nézetek Pythonnal való definiálása a következő szintaxist tartalmazza:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Példa: Táblák és nézetek definiálása

Táblázat vagy nézet Pythonban való definiálásához alkalmazza a @dlt.view vagy @dlt.table dekorátort egy függvényre. A tábla vagy a nézetnév hozzárendeléséhez használhatja a függvény nevét vagy a name paramétert. Az alábbi példa két különböző adatkészletet határoz meg: egy taxi_raw nevű nézetet, amely egy JSON-fájlt fogad bemeneti forrásként, és egy filtered_data nevű táblát, amely bemenetként veszi fel a taxi_raw nézetet:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Példa: Hozzáférés az ugyanabban a folyamatban definiált adatkészlethez

Jegyzet

Bár a dlt.read() és dlt.read_stream() függvények továbbra is elérhetők és teljes mértékben támogatottak a DLT Python-felületen, a Databricks a spark.read.table() és spark.readStream.table() függvények használatát javasolja az alábbiak miatt:

  • A spark függvények támogatják a belső és külső adatkészletek olvasását, beleértve a külső tárolóban lévő vagy más folyamatokban definiált adathalmazokat is. A dlt függvények csak belső adathalmazok olvasását támogatják.
  • A spark függvények támogatják az opciók meghatározását az olvasási műveletekhez, például skipChangeCommits. A beállítások megadását a dlt függvények nem támogatják.

Az ugyanabban a folyamatban definiált adathalmaz eléréséhez használja a spark.read.table() vagy spark.readStream.table() függvényeket:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Jegyzet

A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a customerstábla a folyamathoz konfigurált alapértelmezett katalógusból és sémából van megírva és beolvasva.

Példa: Olvasás metaadattárban regisztrált táblából

Ha a Hive metaadattárban regisztrált táblából szeretne adatokat olvasni, a függvényargumentumban a tábla nevét az adatbázis nevével minősítheti:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Egy Unity Catalog-táblából való olvasásra példa: adatok betöltése Unity Catalog-folyamatba.

Példa: Adatkészlet elérése spark.sql használatával

Az adathalmazokat egy lekérdezési függvényben lévő spark.sql kifejezéssel is visszaadhatja. Ha belső adatkészletből szeretne olvasni, a nevet nem minősítetten használhatja az alapértelmezett katalógus és séma használatára, vagy elő is használhatja őket:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Rekordok végleges törlése materializált nézetből vagy streamelési táblából

Ha véglegesen törölni szeretné a rekordokat egy materializált nézetből vagy streamelőtáblából, és engedélyezve van a törlési vektorok használata, például a GDPR-megfelelőség érdekében, további műveleteket kell végrehajtani az objektum alapjául szolgáló Delta-táblákon. A rekordok materializált nézetből való törlésének biztosításához lásd: Rekordok végleges törlése materializált nézetben, engedélyezett törlési vektorokkal. A rekordok streamelési táblából való törlésének biztosításához lásd: Rekordok végleges törlése streamelési táblából.

Írás külső eseménystreamelési szolgáltatásokba vagy Delta-táblákba a DLT sink API-val

Fontos

A DLT sink API nyilvános előzetes verzióban van.

Jegyzet

  • A teljes frissítés futtatása nem törli az adatokat a tárolóhelyekről. Az újrafeldolgozott adatok hozzá lesznek fűzve a kimeneti tárolóhoz, és a meglévő adatok nem módosulnak.
  • A DLT-elvárások nem támogatottak a sink API-val.

Ha olyan eseménystreamelési szolgáltatásba szeretne írni, mint az Apache Kafka vagy az Azure Event Hubs, vagy egy DLT-folyamatból egy Delta-táblába, használja a create_sink() Python-modulban található dlt függvényt. Miután létrehozott egy gyűjtőhelyet a create_sink() függvénnyel, a gyűjtőhelyet egy hozzáfűzési folyamatban használja az adatok hozzáfűzésére. A hozzáfűzési áramlás az egyetlen áramlástípus, amelyet az create_sink() függvény támogat. Más folyamattípusok, például apply_changes, nem támogatottak.

A következő szintaxissal hozhat létre fogadót a create_sink() függvénnyel:

create_sink(<sink_name>, <format>, <options>)
Érvek
name
Típus: str
Egy karakterlánc, amely azonosítja a csatornát, és a csatorna hivatkozására és kezeléséhez szolgál. A kimenetneveknek egyedinek kell lenniük a csővezetékhez, beleértve az összes forráskódot, mint például a jegyzetfüzeteket vagy a csővezeték részét képező modulokat.
Ez a paraméter kötelező.
format
Típus: str
A kimeneti formátumot definiáló sztring, kafka vagy delta.
Ez a paraméter kötelező.
options
Típus: dict
A kimeneti lehetőségek választható listája, {"key": "value"} formátumban, ahol a kulcs és az érték egyaránt szöveg. Minden, a Kafka és a Delta fogadók által támogatott Databricks futtatókörnyezeti beállítás támogatott. A Kafka beállításaiért lásd: A Kafka strukturált streamelési írójának konfigurálása. A Delta beállításaiért lásd: Delta-tábla fogadóként.

Példa: Egy Kafka csatlakozó létrehozása a create_sink() függvénnyel

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Példa: Delta adattároló létrehozása a create_sink() függvénnyel és a fájlrendszer elérési útjával

Az alábbi példa bemutatja, hogyan lehet egy adatforrást létrehozni, amely egy Delta-táblába ír a fájlrendszer elérési útjának megadásával.

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

Példa: Delta adatfogadó létrehozása a create_sink() függvénnyel és egy Unity Catalog táblanévvel

Jegyzet

A Delta sink támogatja a Unity Catalog külső és felügyelt tábláit, valamint a Hive metaadattár által felügyelt táblákat. A táblaneveknek teljes mértékben minősítettnek kell lenniük. A Unity Catalog tábláinak például háromszintű azonosítót kell használniuk: <catalog>.<schema>.<table>. A Hive metaadattártábláinak <schema>.<table>kell használniuk.

Az alábbi példa egy adatfogadót hoz létre, amely egy Delta-táblába ír úgy, hogy átadja egy tábla nevét a Unity Catalogban.

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

Példa: Hozzáfűzési művelet használata a Delta-fogadóba való íráshoz

Az alábbi példa létrehoz egy olyan fogadót, amely egy Delta-táblába ír, majd létrehoz egy hozzáfűzési folyamatot a fogadóba való íráshoz:

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

Példa: Egy hozzáfűzési folyamat használata egy Kafka-csatornába való íráshoz

Az alábbi példa létrehoz egy kimenetet, amely egy Kafka témába ír, majd létrehoz egy hozzáfűzési adatfolyamot a kimenetbe való íráshoz.

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

A Kafkába írt DataFrame sémájának tartalmaznia kell azokat az oszlopokat, amelyeket a A Kafka strukturált streamelési írójának konfigurálása meg van adva.

Streamelési műveletek céltáblájának létrehozása

A create_streaming_table() függvénnyel létrehozhat egy céltáblát a streamelési műveletek által előállított rekordokhoz, beleértve a apply_changes(), apply_changes_from_snapshot()és @append_flow kimeneti rekordokat.

Jegyzet

A create_target_table() és create_streaming_live_table() függvények elavultak. A Databricks a meglévő kód frissítését javasolja a create_streaming_table() függvény használatához.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Érvek
name
Típus: str
A tábla neve.
Ez a paraméter kötelező.
comment
Típus: str
A táblázat opcionális leírása.
spark_conf
Típus: dict
A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája.
table_properties
Típus: dict
A tábla táblatulajdonságainak választható listája.
partition_cols
Típus: array
A tábla particionálásához használandó egy vagy több oszlop választható listája.
cluster_by
Típus: array
Opcióként engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat.
Lásd A Delta-táblákfolyékony fürtözésének használata című témakört.
path
Típus: str
A táblaadatok opcionális tárolási helye. Ha nincs beállítva, akkor a rendszer alapértelmezettként a tárolócsővezeték helyét használja.
schema
Típus: str vagy StructType
A tábla számára választható sémadefiníció. A sémák definiálhatók SQL DDL-sztringként vagy Pythonnal
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail
Típus: dict
A tábla opcionális adatminőségi korlátozásai. Lásd: több elvárás.
row_filter (nyilvános előzetes verzió)
Típus: str
A tábla opcionális sorszűrő záradéka. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.

A táblák materializálásának szabályozása

A táblák a materializációjuk további szabályozását is lehetővé teszi:

  • Adja meg, hogyan lehet a táblákat klasztereznicluster_by felhasználásával. A lekérdezések felgyorsításához folyékony fürtözést használhat. Lásd A Delta-táblákfolyékony fürtözésének használata című témakört.
  • Adja meg, hogyan vannak a táblák particionálva a partition_cols használatával.
  • Nézet vagy tábla definiálásakor táblatulajdonságokat állíthat be. Lásd a DLT tábla tulajdonságait.
  • A táblaadatok tárolási helyének beállítása a path beállítással. A táblaadatok alapértelmezés szerint a folyamat tárolási helyén lesznek tárolva, ha path nincs beállítva.
  • A sémád definíciójában használhat létrehozott oszlopokat. Lásd: Példa: Séma és klaszter oszlopok megadása.

Megjegyzés

Az 1 TB-nál kisebb méretű táblák esetében a Databricks azt javasolja, hogy a DLT vezérelje az adatszervezést. Nem szükséges megadni partícióoszlopokat, hacsak nem számít arra, hogy a táblája meghaladja egy terabájtot.

Példa: Séma és fürtoszlopok megadása

A táblázatsémát választhatóan megadhatja Python StructType vagy SQL DDL karakterlánc használatával. Ha DDL-sztringgel van megadva, a definíció tartalmazhat létrehozott oszlopokat.

Az alábbi példa létrehoz egy sales nevű táblát egy Python-StructTypehasználatával megadott sémával:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Az alábbi példa egy tábla sémáját adja meg egy DDL karakterlánc használatával, meghatároz egy generált oszlopot, és klaszterezési oszlopokat határoz meg.

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

Alapértelmezés szerint a DLT a sémát a table definíciójából következteti, ha nem ad meg sémát.

Példa: Partícióoszlopok megadása

Az alábbi példa egy tábla sémáját határozza meg egy DDL-sztring használatával, definiál egy generált oszlopot, és definiál egy partícióoszlopot:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Példa: Táblakorlátozások definiálása

Fontos

A táblakorlátozások a nyilvános előzetes változatban vannak.

Séma megadásakor megadhatja az elsődleges és az idegen kulcsokat. A korlátozások tájékoztató jellegűek, és nincsenek kényszerítve. Tekintse meg a CONSTRAINT záradékot az SQL nyelvi hivatkozásában.

Az alábbi példa egy elsődleges és idegenkulcs-korlátozással rendelkező táblát határoz meg:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Példa: Sorszűrő és oszlopmaszk definiálása

Fontos

A sorszűrők és az oszlopmaszkok nyilvános előzetes verziójú.

Ha materializált nézetet vagy streamelési táblát szeretne létrehozni sorszűrővel és oszlopmaszkkal, használja a ROW FILTER záradékot és a MASZK záradékot. Az alábbi példa bemutatja, hogyan definiálhat materializált nézetet és streamelési táblát sorszűrővel és oszlopmaszkkal is:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

További információ a sorszűrőkről és az oszlopmaszkokról: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.

Stream-tábla konfigurálása a forrásstreamelési tábla módosításainak figyelmen kívül hagyásához

Jegyzet

  • A skipChangeCommits jelző csak akkor működik, ha a spark.readStream a option() függvényt használja. Ezt a jelölőt nem használhatja dlt.read_stream() függvényben.
  • Nem használhatja a skipChangeCommits jelzőt, ha a forrásstreamelési tábla egy apply_changes() függvény céljaként van definiálva.

Alapértelmezés szerint a streamelési táblák csak hozzáfűző forrásokat igényelnek. Ha egy streamelő tábla egy másik streamelési táblát használ forrásként, és a forrásstreamelési tábla frissítéseket vagy törléseket igényel, például a GDPR "az elfelejtettséghez való jog" feldolgozását, a skipChangeCommits jelző a forrásstreamelési tábla olvasásakor beállítható, hogy figyelmen kívül hagyja ezeket a módosításokat. A jelölőről további információt a Frissítések és törlések mellőzésecímű témakörben talál.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Python DLT-tulajdonságok

Az alábbi táblázatok ismertetik a DLT-vel rendelkező táblák és nézetek definiálása során megadható beállításokat és tulajdonságokat:

@table vagy @view
name
Típus: str
A tábla vagy nézet opcionális elnevezése. Ha nincs megadva, a függvény neve tábla- vagy nézetnévként lesz használva.
comment
Típus: str
A táblázat opcionális leírása.
spark_conf
Típus: dict
A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája.
table_properties
Típus: dict
A táblatulajdonságok választható listája a tábla számára.
path
Típus: str
A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárhely helyét használja.
partition_cols
Típus: a collection of str
Választható gyűjtemény, például egy vagy több oszlopból álló list a tábla particionálásához.
cluster_by
Típus: array
Opcióként engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat.
Lásd A Delta-táblákfolyékony fürtözésének használata című témakört.
schema
Típus: str vagy StructType
Egy választható sémadefiníció a táblához. A sémák definiálhatók SQL DDL karakterláncként vagy Python StructType-ként.
temporary
Típus: bool
Hozzon létre egy táblát, de ne tegye közzé a tábla metaadatait. A temporary kulcsszó arra utasítja a DLT-t, hogy hozzon létre egy táblát, amely elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A feldolgozási idő csökkentése érdekében egy ideiglenes tábla megmarad az azt létrehozó folyamat teljes élettartama alatt, és nem csak egyetlen frissítéssel.
Az alapértelmezett érték a "False".
row_filter (nyilvános előzetes verzió)
Típus: str
A tábla választható sorszűrő feltétele. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.
Tábla- vagy nézetdefiníció
def <function-name>()
Az adathalmazt meghatározó Python-függvény. Ha a name paraméter nincs beállítva, akkor a <function-name> lesz a céladatkészlet neve.
query
Spark SQL-utasítás, amely Spark-adatkészletet vagy Koalas DataFrame-et ad vissza.
Az dlt.read() vagy a spark.read.table() használatával teljes olvasást végezhet az ugyanabban a folyamatban definiált adatkészletből. Külső adatkészlet olvasásához használja a spark.read.table() függvényt. Külső adatkészletek olvasásához nem használhat dlt.read(). Mivel spark.read.table() használható belső adathalmazok, az aktuális folyamaton kívül definiált adathalmazok olvasására, és lehetővé teszi az adatok olvasási lehetőségeinek megadását, a Databricks a dlt.read() függvény helyett azt javasolja.
Amikor egy folyamat adatkészletét definiálja, alapértelmezés szerint a folyamatkonfigurációban definiált katalógust és sémát fogja használni. A spark.read.table() függvénnyel a csővezetékben meghatározott adathalmazból tud olvasni, megkötés nélkül. Például egy customersnevű adatkészletből való olvasáshoz:
spark.read.table("customers")
A spark.read.table() függvénnyel a metaadattárban regisztrált táblákból is olvashat, ha a tábla nevét opcionálisan az adatbázis nevével minősíti:
spark.read.table("sales.customers")
Az dlt.read_stream() vagy a spark.readStream.table() használatával streamelési olvasást hajthat végre az ugyanabban a folyamatban definiált adatkészletből. Ha egy külső adatkészletből szeretne streamelési olvasást végezni, használja a
spark.readStream.table() függvény. Mivel spark.readStream.table() használható belső adathalmazok, az aktuális folyamaton kívül definiált adathalmazok olvasására, és lehetővé teszi az adatok olvasási lehetőségeinek megadását, a Databricks a dlt.read_stream() függvény helyett azt javasolja.
Ha sql-szintaxissal szeretne lekérdezést definiálni egy DLT table függvényben, használja a spark.sql függvényt. Lásd: Példa: Adatkészlet elérése spark.sqlhasználatával. Ha egy DLT table függvényben szeretne lekérdezést definiálni a Python használatával, használja PySpark szintaxist.
Elvárások
@expect("description", "constraint")
Adatminőségi korlátozás deklarálása
description. Ha egy sor megsérti a várakozást, vegye fel a sort a céladatkészletbe.
@expect_or_drop("description", "constraint")
Adatminőségi korlátozás deklarálása
description. Ha egy sor megsérti a várakozást, ejtse el a sort a céladatkészletből.
@expect_or_fail("description", "constraint")
Adatminőségi korlátozás deklarálása
description. Ha egy sor nem felel meg az elvárásoknak, azonnal állítsa le a végrehajtást.
@expect_all(expectations)
Deklarálhat egy vagy több adatminőségi korlátozást.
expectations egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megsérti, a céladatkészletbe foglalja bele a sort.
@expect_all_or_drop(expectations)
Deklarálhat egy vagy több adatminőségi korlátozást.
expectations egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megszegi, a céladatkészletből ejtse ki a sort.
@expect_all_or_fail(expectations)
Deklarálhat egy vagy több adatminőségi korlátozást.
expectations egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármilyen elvárást megsért, azonnal le kell állítani a végrehajtást.

Változáscsatorna adatrögzítésének módosítása a Pythonnal a DLT-ben

Használja a Python API apply_changes() függvényét a DLT-változási adatrögzítési (CDC) funkcióval a forrásadatok változásadat-adatcsatornából (CDF) való feldolgozásához.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A apply_changes() céltábla sémájának megadásakor a __START_AT és az __END_AT oszlopokat a sequence_by mezőkkel megegyező adattípussal kell tartalmaznia.

A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a DLT Python felületén.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Jegyzet

A APPLY CHANGES feldolgozásánál a INSERT és UPDATE események alapértelmezett viselkedése a következő: CDC-eseményeket szúr be vagy frissít a forrásból származó adatok alapján. Frissíti a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy új sort szúr be, ha a céltáblában nincs egyező rekord. A DELETE események kezelése a APPLY AS DELETE WHEN feltétellel adható meg.

Ha többet szeretne megtudni a CDC változáscsatornával történő feldolgozásáról, olvassa el A VÁLTOZÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése a DLT-vel. A apply_changes() függvény használatára példa: Példa: SCD 1. és 2. típusú SCD-feldolgozás CDF-forrásadatokkal.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A apply_changes céltáblaséma megadásakor a __START_AT és __END_AT oszlopokat is meg kell adnia, és az adattípusa megegyezik a sequence_by mezővel.

Lásd: A MÓDOSÍTÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése A DLThasználatával.

Érvek
target
Típus: str
A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel létrehozhatja a céltáblát a apply_changes() függvény végrehajtása előtt.
Ez a paraméter kötelező.
source
Típus: str
A CDC-rekordokat tartalmazó adatforrás.
Ez a paraméter kötelező.
keys
Típus: list
Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.
Megadhatja az alábbiakat:
  • Sztringek listája: ["userId", "orderId"]
  • A Spark SQL col() függvényeinek listája: [col("userId"), col("orderId"]

A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId), de nem használhatja col(source.userId).
Ez a paraméter kötelező.
sequence_by
Típus: str vagy col()
Az oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A DLT ezzel a szekvenálással kezeli a sorrenden kívül érkező változási eseményeket.
Megadhatja az alábbiakat:
  • Karakterlánc: "sequenceNum"
  • Spark SQL col() függvény: col("sequenceNum")

A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId), de nem használhatja col(source.userId).
A megadott oszlopnak rendezhető adattípusnak kell lennie.
Ez a paraméter kötelező.
ignore_null_updates
Típus: bool
A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény egyezik egy meglévő sorral és ignore_null_updatesTrue, az null oszlopok megtartják a meglévő értékeiket a célban. Ez a nullértékkel rendelkező beágyazott oszlopokra is vonatkozik. Ha ignore_null_updatesFalse, a meglévő értékek felülíródnak null értékekkel.
Ez a paraméter nem kötelező.
Az alapértelmezett érték a False.
apply_as_deletes
Típus: str vagy expr()
Azt határozza meg, hogy a CDC-eseményeket mikor kell DELETE ként kezelni, nem pedig upsertként. A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz konfigurálható a következővel:
A pipelines.cdc.tombstoneGCThresholdInSeconds táblatulajdonság.
Megadhatja az alábbiakat:
  • Karakterlánc: "Operation = 'DELETE'"
  • Spark SQL expr() függvény: expr("Operation = 'DELETE'")

Ez a paraméter nem kötelező.
apply_as_truncates
Típus: str vagy expr()
Azt jelzi, hogy a CDC-eseményeket mikor kell teljes táblaként kezelni TRUNCATE. Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.
A apply_as_truncates paraméter csak az 1. SCD-típus esetén támogatott. A 2. SCD-típus nem támogatja a csonkolási műveleteket.
Megadhatja az alábbiakat:
  • Sztring: "Operation = 'TRUNCATE'"
  • Spark SQL expr() függvény: expr("Operation = 'TRUNCATE'")

Ez a paraméter nem kötelező.
column_list
except_column_list
Típus: list
A céltáblában szerepeltetni kívánt oszlopok egy részhalmaza. A column_list használatával adja meg a belefoglalandó oszlopok teljes listáját. Az except_column_list használatával adja meg a kizárni kívánt oszlopokat. Deklarálhatja az értéket sztringek listájaként vagy Spark SQL-col() függvényként:
  • column_list = ["userId", "name", "city"].
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId), de nem használhatja col(source.userId).
Ez a paraméter nem kötelező.
Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha a függvénynek nem ad át column_list vagy except_column_list argumentumot.
stored_as_scd_type
Típus: str vagy int
A rekordok tárolása 1. vagy 2. SCD-típusként.
Állítsa 1-ra az SCD 1-es típushoz vagy 2-re az SCD 2-es típushoz.
Ez a záradék nem kötelező.
Az alapértelmezett scd típus 1.
track_history_column_list
track_history_except_column_list
Típus: list
A célzott táblázat előzményeinek nyomon követésére szolgáló kimeneti oszlopok egy részhalmaza. A track_history_column_list használatával adja meg a követendő oszlopok teljes listáját. Használja
track_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Deklarálhatja az értéket sztringek listájaként vagy Spark SQL-col() függvényként:
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId), de nem használhatja col(source.userId).
Ez a paraméter nem kötelező.
Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nincs track_history_column_list vagy
track_history_except_column_list argumentumot a függvény továbbítja.

Adatbázis-pillanatképek adatrögzítésének módosítása a Pythonnal a DLT-ben

Fontos

A APPLY CHANGES FROM SNAPSHOT API a nyilvános előzetes verzióban van.

Használja a Python API apply_changes_from_snapshot() függvényét a DLT-változási adatrögzítési (CDC) funkcióval a forrásadatok adatbázis-pillanatképekből való feldolgozásához.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A apply_changes_from_snapshot() céltábla sémájának megadásakor a __START_AT és __END_AT oszlopokat is meg kell adnia, és az adattípusa megegyezik a sequence_by mezővel.

A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a DLT Python felületén.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Jegyzet

A APPLY CHANGES FROM SNAPSHOT feldolgozásához az alapértelmezett viselkedés egy új sor beszúrása, ha egy azonos kulccsal (kulcsokkal) rendelkező rekord nem található a célban. Ha létezik egyező rekord, az csak akkor frissül, ha a sorban szereplő értékek bármelyike módosult. A célban található, de a forrásban már nem szereplő kulcsokat tartalmazó sorok törlődnek.

A CDC pillanatképekkel való feldolgozásával kapcsolatos további információkért lásd AZ VÁLTOZÁSOK ALKALMAZÁSA APIs: A változtatási adatok rögzítésének egyszerűsítése DLT-vel. A apply_changes_from_snapshot() függvény használatára vonatkozó példákért tekintse meg a rendszeres pillanatkép-betöltési és előzmény-pillanatkép-betöltési példákat.

Érvek
target
Típus: str
A frissíteni kívánt tábla neve. A függvény futtatása előtt a apply_changes() függvénnyel hozhatja létre a céltáblát.
Ez a paraméter kötelező.
source
Típus: str vagy lambda function
Egy táblázat vagy nézet neve, amely rendszeres időközönként pillanatképet hoz létre, vagy egy Python lambda függvény, amely visszaadja a feldolgozandó pillanatkép dataFrame-et és a pillanatkép verzióját. Lásd: A source argumentum implementálása.
Ez a paraméter kötelező.
keys
Típus: list
Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.
Megadhatja az alábbiakat:
  • Sztringek listája: ["userId", "orderId"]
  • A Spark SQL col() függvényeinek listája: [col("userId"), col("orderId"]

A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId), de nem használhatja col(source.userId).
Ez a paraméter kötelező.
stored_as_scd_type
Típus: str vagy int
A rekordok tárolása 1. vagy 2. SCD-típusként.
Állítsa 1-ra az SCD 1-es típushoz vagy 2-re az SCD 2-es típushoz.
Ez a záradék nem kötelező.
Az alapértelmezett scd típus 1.
track_history_column_list
track_history_except_column_list
Típus: list
A célzott táblázat előzményeinek nyomon követésére szolgáló kimeneti oszlopok egy részhalmaza. A track_history_column_list használatával adja meg a követendő oszlopok teljes listáját. Használ
track_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Deklarálhatja az értéket sztringek listájaként vagy Spark SQL-col() függvényként:
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

A col() függvény argumentumai nem tartalmazhatnak minősítőket. Használhatja például a col(userId), de nem használhatja col(source.userId).
Ez a paraméter nem kötelező.
Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nincs track_history_column_list vagy
track_history_except_column_list argumentumot a függvény továbbítja.

A source argumentum implementálása

A apply_changes_from_snapshot() függvény tartalmazza a source argumentumot. Az előzmény-pillanatképek feldolgozásához a source argumentum várhatóan egy Python lambda függvény lesz, amely két értéket ad vissza a apply_changes_from_snapshot() függvénynek: a feldolgozandó pillanatképadatokat tartalmazó Python DataFrame-et és egy pillanatkép-verziót.

A lambda függvény aláírása a következő:

lambda Any => Optional[(DataFrame, Any)]
  • A lambda függvény argumentuma a legutóbb feldolgozott pillanatkép-verzió.
  • A lambda függvény visszatérési értéke None vagy két értékből álló rekord: A rekord első értéke a feldolgozandó pillanatképet tartalmazó DataFrame. A tuple második értéke a pillanatkép verziója, amely a pillanatkép logikai sorrendjét képviseli.

Példa a lambda függvény implementálására és meghívására:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

A DLT-futtatókörnyezet a következő lépéseket hajtja végre minden alkalommal, amikor a apply_changes_from_snapshot() függvényt tartalmazó folyamat aktiválódik:

  1. A next_snapshot_and_version függvény futtatásával betölti a következő pillanatkép-adatkeretet és a megfelelő pillanatkép-verziót.
  2. Ha a DataFrame nem ad vissza, a futtatás leáll, és a folyamatfrissítés befejezettként van megjelölve.
  3. Észleli az új pillanatkép módosításait, és fokozatosan alkalmazza őket a cél táblára.
  4. Visszatér az 1. lépéshez, hogy betöltse a következő pillanatképet és annak verzióját.

korlátozások

A DLT Python-felületre a következő korlátozások vonatkoznak:

A pivot() függvény nem támogatott. A Spark pivot művelete megköveteli a bemeneti adatok lelkes betöltését a kimeneti séma kiszámításához. A DLT nem támogatja ezt a képességet.