Referenční dokumentace jazyka PYTHONu DLT
Tento článek obsahuje podrobnosti o programovacím rozhraní DLT Python.
Informace o rozhraní SQL API najdete v referenční jazyka SQLDLT .
Podrobnosti týkající se konfigurace automatického zavaděče najdete v tématu Co je automatický zavaděč?.
Než začnete
Při implementaci kanálů s rozhraním DLT Python je důležité vzít v úvahu následující skutečnosti:
- Vzhledem k tomu, že se funkce pythonu
table()
aview()
vyvolávají několikrát během plánování a spuštění aktualizace kanálu, nezahrnujte kód do jedné z těchto funkcí, která může mít vedlejší účinky (například kód, který upravuje data nebo odesílá e-mail). Aby nedocházelo k neočekávanému chování, měly by funkce Pythonu, které definují datové sady, obsahovat pouze kód potřebný k definování tabulky nebo zobrazení. - K provádění operací, jako je odesílání e-mailů nebo integrace s externí monitorovací službou, zejména ve funkcích, které definují datové sady, použijte událostní háčky. Implementace těchto operací ve funkcích, které definují datové sady, způsobí neočekávané chování.
- Funkce
table
aview
Pythonu musí vracet datový rámec. Některé funkce, které pracují s datovými rámci, nevrací datové rámce a neměly by se používat. Mezi tyto operace patří funkce, jako jsoucollect()
,count()
,toPandas()
,save()
asaveAsTable()
. Vzhledem k tomu, že transformace datového rámce se provádějí po vyřešení úplného grafu toku dat, může použití těchto operací mít nežádoucí vedlejší účinky.
Import modulu dlt
Pythonu
Funkce DLT Pythonu jsou definovány v modulu dlt
. Vaše kanály implementované pomocí rozhraní Python API musí importovat tento modul:
import dlt
Vytvoření materializovaného zobrazení DLT nebo tabulky streamování
DlT v Pythonu určuje, jestli se má datová sada aktualizovat jako materializovaná nebo streamovaná tabulka na základě definovaného dotazu. Dekorátor @table
lze použít k definování materializovaných zobrazení i streamovaných tabulek.
Pokud chcete definovat materializované zobrazení v Pythonu, použijte @table
na dotaz, který provádí statické čtení proti zdroji dat. Pokud chcete definovat streamovací tabulku, použijte @table
na dotaz, který provádí streamování čtení proti zdroji dat, nebo použijte funkci create_streaming_table(). Oba typy datových sad mají stejnou specifikaci syntaxe:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Vytvoření zobrazení DLT
Pokud chcete definovat zobrazení v Pythonu, použijte @view
dekorátor. Podobně jako dekorátor @table
můžete v DLT použít pohledy pro statické nebo streamované datové sady. Následuje syntaxe pro definování zobrazení pomocí Pythonu:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Příklad: Definování tabulek a zobrazení
Pokud chcete definovat tabulku nebo zobrazení v Pythonu, použijte pro funkci @dlt.view
nebo @dlt.table
dekorátor. Název funkce nebo parametr name
můžete použít k přiřazení názvu tabulky nebo zobrazení. Následující příklad definuje dvě různé datové sady: zobrazení označované jako taxi_raw
, které jako vstupní zdroj přebírá soubor JSON a tabulku s názvem filtered_data
, která přebírá taxi_raw
zobrazení jako vstup:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Příklad: Přístup k datové sadě definované ve stejném kanálu
Poznámka
I když jsou funkce dlt.read()
a dlt.read_stream()
stále dostupné a plně podporované rozhraním DLT Pythonu, databricks doporučuje vždy používat funkce spark.read.table()
a spark.readStream.table()
z následujících důvodů:
- Funkce
spark
podporují čtení interních a externích datových sad, včetně datových sad v externím úložišti nebo definovaných v jiných kanálech. Funkcedlt
podporují jen čtení interních datových sad. - Funkce
spark
podporují zadávání možností, jako jeskipChangeCommits
, pro operace čtení. Funkcedlt
nepodporují zadávání možností.
Pokud chcete získat přístup k datové sadě definované ve stejném kanálu, použijte funkce spark.read.table()
nebo spark.readStream.table()
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Poznámka
Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulka customers
zapíše a načte z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.
Příklad: Čtení z tabulky zaregistrované v metastoru
Pokud chcete číst data z tabulky zaregistrované v metastoru Hive, můžete v argumentu funkce kvalifikovat název tabulky s názvem databáze:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Příklad čtení z tabulky Katalogu Unity najdete v tématu Ingestování dat do kanálu katalogu Unity.
Příklad : Přístup k datové sadě pomocí spark.sql
Datovou sadu můžete vrátit také pomocí výrazu spark.sql
ve funkci dotazu. Pokud chcete číst z interní datové sady, můžete buď ponechat název nekvalifikovaný pro použití výchozího katalogu a schématu, nebo je můžete předem nastavit:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Trvalé odstranění záznamů z materializovaného zobrazení nebo tabulky streamování
Pokud chcete trvale odstranit záznamy z materializovaného zobrazení nebo tabulky streamování s povolenými vektory odstranění, jako je například dodržování předpisů GDPR, musí být u podkladových tabulek Delta objektu provedeny další operace. Pokud chcete zajistit odstranění záznamů z materializovaného zobrazení, přečtěte si Trvalé odstranění záznamů z materializovaného zobrazení s povolenými vektory odstranění. Pokud chcete zajistit odstranění záznamů z tabulky streamování, přečtěte si téma Trvalé odstranění záznamů z tabulky streamování.
Zápis do externích služeb streamování událostí nebo tabulek Delta pomocí rozhraní DLT sink
API
Důležitý
Rozhraní DLLT sink
API je ve verzi Public Preview.
Poznámka
- Spuštění úplné aktualizace nevymaže data z jímek. Do jímky budou připojena všechna znovu zpracovaná data a stávající data nebudou změněna.
- Očekávání DLT nejsou podporována prostřednictvím API
sink
.
Pokud chcete zapisovat do služby streamování událostí, jako je Apache Kafka nebo Azure Event Hubs nebo do tabulky Delta z kanálu DLT, použijte funkci create_sink()
, která je součástí modulu dlt
Pythonu. Po vytvoření jímky pomocí funkce create_sink()
použijete jímku v připojit tok k zápisu dat do jímky. Tok "Append" je jediný typ toku podporovaný funkcí create_sink()
. Jiné typy toků, například apply_changes
, se nepodporují.
Následuje syntaxe pro vytvoření jímky s funkcí create_sink()
:
create_sink(<sink_name>, <format>, <options>)
Argumenty |
---|
name Typ: str Řetězec, který identifikuje jímku a slouží k odkazování a správě jímky. Názvy jímek musí být pro potrubí jedinečné, včetně celého zdrojového kódu, jako jsou poznámkové bloky nebo moduly, které jsou součástí potrubí. Tento parametr je povinný. |
format Typ: str Řetězec, který definuje výstupní formát, kafka nebo delta .Tento parametr je povinný. |
options Typ: dict Volitelný seznam možností jímky formátovaný jako {"key": "value"} , kde klíč a hodnota jsou oba řetězce. Podporovány jsou všechny možnosti Databricks Runtime, které podporují sinky Kafka a Delta. Informace o možnostech Kafka najdete v tématu Konfigurace zapisovače strukturovaného streamování Kafka. Informace o možnostech Delta najdete v tématu Tabulka Delta jako jímka. |
příklad : Vytvoření jímky Kafka s funkcí create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
příklad : Vytvoření jímky Delta s funkcí create_sink()
a cestou k systému souborů
Následující příklad vytvoří zdroj, který zapisuje do tabulky Delta tím, že zadá cestu filesystému k tabulce:
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
příklad : Vytvoření jímky Delta s funkcí create_sink()
a názvem tabulky Katalogu Unity
Poznámka
Jímka Delta podporuje externí a spravované tabulky Katalogu Unity a spravované tabulky metastoru Hive. Názvy tabulek musí být plně kvalifikované. Například tabulky katalogu Unity musí používat identifikátor třívrstvé: <catalog>.<schema>.<table>
. Tabulky metastoru Hive musí používat <schema>.<table>
.
Následující příklad vytvoří jímku, která zapisuje do tabulky Delta předáním názvu tabulky v Katalogu Unity:
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Příklad: Použití zapisovacího toku k zápisu do úložiště Delta
Následující příklad vytvoří jímku, která zapíše do tabulky Delta a pak vytvoří tok připojení pro zápis do této jímky:
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
příklad : Použití toku připojení k zápisu do jímky Kafka
Následující příklad vytvoří jímku, která zapíše do tématu Kafka a pak vytvoří tok připojení pro zápis do této jímky:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Schéma datového rámce zapsaného do systému Kafka by mělo obsahovat sloupce zadané v Konfigurace zapisovače strukturovaného streamování Kafka.
Vytvoření tabulky, která se použije jako cíl operací streamování
Pomocí funkce create_streaming_table()
vytvořte cílovou tabulku pro záznamy, které jsou výstupem streamovacích operací, včetně apply_changes(), apply_changes_from_snapshot()a @append_flow.
Poznámka
Funkce create_target_table()
a create_streaming_live_table()
jsou zastaralé. Databricks doporučuje aktualizovat existující kód tak, aby používal funkci create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumenty |
---|
name Typ: str Název tabulky. Tento parametr je povinný. |
comment Typ: str Volitelný popis tabulky. |
spark_conf Typ: dict Volitelný seznam konfigurací Sparku pro spuštění tohoto dotazu. |
table_properties Typ: dict Volitelný seznam vlastností tabulky této tabulky. |
partition_cols Typ: array Volitelný seznam jednoho nebo více sloupců, které se mají použít k dělení tabulky. |
cluster_by Typ: array Volitelně povolte shlukování kapalin v tabulce a definujte sloupce, které se mají použít jako klíče shlukování. Viz Použijte 'liquid clustering' pro tabulky Delta. |
path Typ: str Volitelné umístění úložiště pro data tabulky. Pokud není nastaveno, systém použije výchozí umístění úložiště pipeline. |
schema Typ: str nebo StructType Volitelná definice schématu pro tabulku. Schémata je možné definovat jako řetězec DDL SQL nebo pomocí Pythonu. StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Volitelná omezení kvality dat pro tabulku Viz více očekávání. |
row_filter (Veřejná Ukázka)Typ: str Volitelná klauzule filtru řádků pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců. |
Řízení způsobu materializace tabulek
Tabulky také nabízejí další kontrolu nad jejich materializací:
- Určete, jak tabulky clusteru pomocí
cluster_by
. Můžete použít liquid clustering k urychlení dotazů. Viz Použijte kapalinové seskupování pro tabulky Delta. - Určete, jak jsou tabulky rozdělovány pomocí
partition_cols
. - Vlastnosti tabulky můžete nastavit při definování zobrazení nebo tabulky. Viz vlastnosti tabulky DLT.
- Pomocí nastavení
path
nastavte umístění úložiště pro data tabulky. Ve výchozím nastavení se data tabulky ukládají do umístění úložiště kanálu, pokudpath
není nastaveno. - V definici schématu můžete použít sloupce vygenerované . Viz Příklad: Určení schématu a sloupců clusteru.
Poznámka
U tabulek, které mají velikost menší než 1 TB, databricks doporučuje řídit organizaci dat DLT. Pokud neočekáváte, že se tabulka rozšíří nad jeden terabajt, neměli byste zadávat sloupce oddílů.
příklad : Zadání schématu a sloupců clusteru
Volitelně můžete zadat schéma tabulky pomocí pythonového StructType
nebo řetězce SQL DDL. Pokud je specifikováno pomocí DDL řetězce, může definice zahrnovat generované sloupce.
Následující příklad vytvoří tabulku s názvem sales
se schématem zadaným pomocí StructType
Pythonu:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Následující příklad určuje schéma tabulky pomocí řetězce DDL, definuje vygenerovaný sloupec a definuje sloupce clusteringu:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Ve výchozím nastavení DLT odvodí schéma z definice table
, pokud nezadáte schéma.
Příklad : Specifikovat sloupce oddílů
Následující příklad určuje schéma tabulky pomocí řetězce DDL, definuje vygenerovaný sloupec a definuje partiční sloupec:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Příklad: Definování omezení tabulky
Při zadávání schématu můžete definovat primární a cizí klíče. Omezení jsou informativní a nevynucují se. Vizte CONSTRAINT klauzuli v referenční dokumentaci jazyka SQL.
Následující příklad definuje tabulku s omezením primárního a cizího klíče:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Příklad: Definování filtru řádků a masky sloupců
Důležitý
Filtry řádků a masky sloupců jsou ve verzi Public Preview.
Pokud chcete vytvořit materializované zobrazení nebo tabulku Streamování s filtrem řádků a maskou sloupců, použijte klauzuli ROW FILTER a klauzuli MASK. Následující příklad ukazuje, jak definovat materializované zobrazení a streamovací tabulku s filtrem řádků i maskou sloupce:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Další informace o filtrech řádků a maskách sloupců najdete v tématu Publikování tabulek s filtry řádků a maskami sloupců.
Konfigurace streamované tabulky tak, aby ignorovala změny ve zdrojové streamovací tabulce
Poznámka
- Příznak
skipChangeCommits
funguje jenom sspark.readStream
pomocí funkceoption()
. Tento příznak nelze použít ve funkcidlt.read_stream()
. - Příznak
skipChangeCommits
nelze použít, pokud je zdrojová tabulka streamování definována jako cíl funkce apply_changes().
Ve výchozím nastavení streamované tabulky vyžadují zdroje, které umožňují pouze přidávání. Pokud streamovací tabulka používá jako zdroj jinou streamovací tabulku a zdrojová streamovací tabulka vyžaduje aktualizace nebo odstranění, například zpracování gdpr "právo na zapomenutí", můžete při čtení zdrojové tabulky streamování nastavit příznak skipChangeCommits
, aby se tyto změny ignorovaly. Další informace o tomto příznaku naleznete v tématu Ignorovat aktualizace a odstranit.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
vlastnosti Python DLT
Následující tabulky popisují možnosti a vlastnosti, které můžete zadat při definování tabulek a zobrazení pomocí DLT:
@table nebo @view |
---|
name Typ: str Volitelný název tabulky nebo zobrazení. Pokud není definován, název funkce se použije jako název tabulky nebo zobrazení. |
comment Typ: str Volitelný popis tabulky. |
spark_conf Typ: dict Volitelný seznam konfigurací Sparku pro spuštění tohoto dotazu. |
table_properties Typ: dict Volitelný seznam vlastností tabulky. |
path Typ: str Volitelné umístění úložiště pro data tabulky. Pokud není nastaveno, systém ve výchozím nastavení použije úložiště datové cesty. |
partition_cols Typ: a collection of str Volitelná kolekce, například list jednoho nebo více sloupců, které se mají použít k dělení tabulky. |
cluster_by Typ: array Volitelně můžete v tabulce povolit funkci liquid clustering a definovat sloupce pro clustering. Vizte Použití clusteringu typu liquid pro tabulky Delta. |
schema Typ: str nebo StructType Volitelná definice schématu pro tabulku. Schémata lze definovat jako řetězec DDL SQL nebo pomocí StructType Pythonu . |
temporary Typ: bool Vytvořte tabulku, ale nepublikujte metadata pro tabulku. Klíčové slovo temporary dává DLT pokyn k vytvoření tabulky, která je k dispozici pro kanál, ale neměla by být přístupná mimo kanál. Aby se zkrátila doba zpracování, dočasná tabulka se zachová po celou životnost kanálu, který ji vytvoří, a nikoliv jenom pro jednu aktualizaci.Výchozí hodnota je False. |
row_filter (Veřejný náhled)Typ: str Volitelná klauzule filtru řádků pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců. |
Definice tabulky nebo zobrazení |
---|
def <function-name>() Funkce Pythonu, která definuje datovou sadu. Pokud parametr name není nastavený, <function-name> se použije jako název cílové datové sady. |
query Příkaz Spark SQL, který vrací datový rámec Spark Dataset nebo Koalas. Pomocí dlt.read() nebo spark.read.table() proveďte úplné čtení z datové sady definované ve stejném kanálu. Ke čtení externí datové sady použijte funkci spark.read.table() . Ke čtení externích datových sad nelze použít dlt.read() . Vzhledem k tomu, že spark.read.table() lze použít ke čtení interních datových sad, datových sad definovaných mimo aktuální kanál a umožňuje zadat možnosti pro čtení dat, doporučuje databricks místo funkce dlt.read() použít.Když v kanálu definujete datovou sadu, použije ve výchozím nastavení katalog a schéma definované v konfiguraci kanálu. Funkci spark.read.table() můžete použít ke čtení z datové sady definované v pipeline bez specifikace. Pokud chcete například číst z datové sady s názvem customers :spark.read.table("customers") Funkci spark.read.table() můžete také použít ke čtení z tabulky registrované v metastoru tak, že volitelně opravíte název tabulky s názvem databáze:spark.read.table("sales.customers") Použijte dlt.read_stream() nebo spark.readStream.table() k provedení čtení streamováním z datové sady definované ve stejném datovém potrubí. Pokud chcete provést streamované čtení z externí datové sady, použijteFunkce spark.readStream.table() . Vzhledem k tomu, že spark.readStream.table() lze použít ke čtení interních datových sad, datových sad definovaných mimo aktuální kanál a umožňuje zadat možnosti pro čtení dat, doporučuje databricks místo funkce dlt.read_stream() použít.Pokud chcete definovat dotaz v table funkci DLT pomocí syntaxe SQL, použijte funkci spark.sql . Viz Příklad: Přístup k datové sadě pomocí spark.sql . Pokud chcete definovat dotaz ve funkci table DLT pomocí Pythonu, použijte syntaxi PySpark. |
Očekávání |
---|
@expect("description", "constraint") Prohlásit omezení kvality dat, která byla identifikována description . Pokud řádek porušuje očekávání, zahrňte řádek do cílové datové sady. |
@expect_or_drop("description", "constraint") Deklarujte omezení kvality dat, která byla identifikována description . Pokud řádek porušuje očekávání, odstraňte řádek z cílové datové sady. |
@expect_or_fail("description", "constraint") Deklarujte omezení kvality dat, která byla identifikována. description . Pokud nějaký řádek porušuje očekávání, okamžitě zastavte vykonávání. |
@expect_all(expectations) Deklarujte jedno nebo více omezení kvality dat. expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je omezení očekávání. Pokud řádek porušuje jakékoli očekávání, zahrňte řádek do cílové datové sady. |
@expect_all_or_drop(expectations) Deklarujte jedno nebo více omezení kvality dat. expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je omezení očekávání. Pokud řádek porušuje jakékoli očekávání, odstraňte řádek z cílové datové sady. |
@expect_all_or_fail(expectations) Deklarujte jedno nebo více omezení kvality dat. expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je omezení očekávání. Pokud řádek porušuje jakékoli očekávání, okamžitě zastavte provádění. |
Zachytávání dat změn z kanálu změn pomocí Pythonu v DLT
Pomocí funkce apply_changes()
v rozhraní Python API můžete ke zpracování zdrojových dat z datového kanálu změn (CDF) použít funkci zachytávání dat DLT (CDC).
Důležitý
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu cílové tabulky apply_changes()
musíte zahrnout sloupce __START_AT
a __END_AT
se stejným datovým typem jako pole sequence_by
.
K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní DLT Python.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Poznámka
Pro zpracování APPLY CHANGES
je výchozím chováním událostí INSERT
a UPDATE
upsert CDC ze zdroje: aktualizujte všechny řádky v cílové tabulce, které odpovídají zadaným klíčům, nebo vložte nový řádek, pokud v cílové tabulce neexistuje odpovídající záznam. Zpracování událostí DELETE
lze zadat pomocí podmínky APPLY AS DELETE WHEN
.
Další informace o zpracování CDC pomocí kanálu změn najdete v tématu Rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí knihovny DLT. Pro příklad použití funkce apply_changes()
, viz Příklad: Zpracování SCD typu 1 a typu 2 se zdrojovými daty CDF.
Důležitý
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu cílové tabulky apply_changes
musíte zahrnout __START_AT
a __END_AT
sloupce se stejným datovým typem jako pole sequence_by
.
Viz rozhraní API APPLY CHANGES : Zjednodušte zachytávání dat změn pomocí DLT.
Argumenty |
---|
target Typ: str Název tabulky, která se má aktualizovat. Pomocí funkce create_streaming_table() můžete před spuštěním apply_changes() funkce vytvořit cílovou tabulku.Tento parametr je povinný. |
source Typ: str Zdroj dat obsahující záznamy CDC. Tento parametr je povinný. |
keys Typ: list Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto:
Argumenty col() funkcí nemůžou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je povinný. |
sequence_by Typ: str nebo col() Název sloupce určující logické pořadí událostí CDC ve zdrojových datech. DlT používá toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí. Můžete zadat jednu z těchto:
Argumenty col() funkcí nemůžou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Zadaný sloupec musí být tříditelný datový typ. Tento parametr je povinný. |
ignore_null_updates Typ: bool Povolit příjem aktualizací obsahujících podmnožinu cílových sloupců Když se událost CDC shoduje s existujícím řádkem a ignore_null_updates je True , sloupce s null si zachovávají své stávající hodnoty v cílovém systému. To platí také pro vnořené sloupce s hodnotou null . Pokud je ignore_null_updates False , stávající hodnoty se přepíšou null hodnotami.Tento parametr je volitelný. Výchozí hodnota je False . |
apply_as_deletes Typ: str nebo expr() Určuje, kdy se má událost CDC považovat za DELETE , nikoli jako upsert. Aby bylo možné zpracovat data mimo pořadí, odstraněný řádek se dočasně zachová jako náhrobek v podkladové tabulce Delta a v metastoru se vytvoří zobrazení, které vyfiltruje tyto náhrobky. Interval uchovávání informací je možné nakonfigurovat pomocípipelines.cdc.tombstoneGCThresholdInSeconds atribut tabulky.Můžete zadat jednu z těchto:
Tento parametr je volitelný. |
apply_as_truncates Typ: str nebo expr() Určuje, kdy by měla být CDC událost považována za úplnou tabulku TRUNCATE . Vzhledem k tomu, že tato klauzule aktivuje úplné zkrácení cílové tabulky, měla by být použita pouze pro konkrétní případy použití vyžadující tuto funkci.Parametr apply_as_truncates je podporován pouze pro SCD typu 1. ScD typu 2 nepodporuje operace zkrácení.Můžete zadat jednu z těchto:
Tento parametr je volitelný. |
column_list except_column_list Typ: list Podmnožina sloupců, které se mají zahrnout do cílové tabulky. Pomocí column_list zadejte úplný seznam sloupců, které se mají zahrnout. Pomocí except_column_list určete sloupce, které se mají vyloučit. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce col() Spark SQL:
Argumenty col() funkcí nemůžou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je volitelný. Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředá žádný column_list nebo except_column_list argument. |
stored_as_scd_type Typ: str nebo int Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 pro SCD typ 1 nebo 2 pro SCD typ 2.Tato klauzule je nepovinná. Výchozí hodnota je SCD typu 1. |
track_history_column_list track_history_except_column_list Typ: list Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Pomocí track_history_column_list určete úplný seznam sloupců, které se mají sledovat. Používattrack_history_except_column_list určit sloupce, které se mají vyloučit ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce col() Spark SQL:
Argumenty col() funkcí nemůžou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je volitelný. Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud není track_history_column_list nebotrack_history_except_column_list argument je předán funkci. |
Změna zachytávání dat ze snímků databáze pomocí Pythonu v DLT
Důležitý
Rozhraní API APPLY CHANGES FROM SNAPSHOT
je ve verzi Public Preview.
Pomocí funkce apply_changes_from_snapshot()
v rozhraní Python API můžete ke zpracování zdrojových dat ze snímků databáze použít funkci zachytávání dat změn DLT (CDC).
Důležitý
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu cílové tabulky apply_changes_from_snapshot()
musíte zahrnout také sloupce __START_AT
a __END_AT
se stejným datovým typem jako pole sequence_by
.
K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní DLT Python.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Poznámka
Při zpracování APPLY CHANGES FROM SNAPSHOT
je výchozím chováním vložit nový řádek, pokud v cíli neexistuje odpovídající záznam se stejnými klíči. Pokud existuje odpovídající záznam, aktualizuje se pouze v případě, že se změnily některé hodnoty v řádku. Řádky s klíči, které jsou přítomné v cíli, ale již nejsou ve zdroji, se odstraní.
Další informace o zpracování CDC pomocí snímků najdete v části s rozhraními API "APPLY CHANGES": Zjednodušte snímání změn dat pomocí DLT (a). Příklady použití funkce apply_changes_from_snapshot()
najdete v příkladech periodického příjmu snímků a historického příjmu snímků.
Argumenty |
---|
target Typ: str Název tabulky, která se má aktualizovat. Pomocí funkce create_streaming_table() můžete před spuštěním apply_changes() funkce vytvořit cílovou tabulku.Tento parametr je povinný. |
source Typ: str nebo lambda function Název tabulky nebo zobrazení pro pravidelné snímky nebo funkci lambda Pythonu, která vrací datový rámec snímku, který se má zpracovat, a verzi snímku. Viz Implementujte argument source .Tento parametr je povinný. |
keys Typ: list Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto:
Argumenty col() funkcí nemůžou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je povinný. |
stored_as_scd_type Typ: str nebo int Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 pro SCD typ 1 nebo 2 pro SCD typ 2.Tato klauzule je nepovinná. Výchozí hodnota je SCD typu 1. |
track_history_column_list track_history_except_column_list Typ: list Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Pomocí track_history_column_list určete úplný seznam sloupců, které se mají sledovat. Používattrack_history_except_column_list určit sloupce, které se mají vyloučit ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce col() Spark SQL:
Argumenty col() funkcí nemůžou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je volitelný. Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud není track_history_column_list nebotrack_history_except_column_list argument je předán funkci. |
Implementovat argument source
Funkce apply_changes_from_snapshot()
obsahuje argument source
. U zpracování historických snímků se očekává, že argument source
bude funkcí lambda Pythonu, která vrátí dvě hodnoty do funkce apply_changes_from_snapshot()
: datový rámec Pythonu obsahující data snímku, která se mají zpracovat, a verzi snímku.
Následuje podpis funkce lambda:
lambda Any => Optional[(DataFrame, Any)]
- Argumentem funkce lambda je nejnovější zpracovaná verze snímku.
- Návratová hodnota funkce lambda je
None
nebo řazená kolekce členů dvou hodnot: První hodnota řazené kolekce členů je datový rámec obsahující snímek, který se má zpracovat. Druhá hodnota n-tice je verze snímku, která představuje logické pořadí snímku.
Příklad, který implementuje a volá funkci lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Modul runtime DLT provede následující kroky pokaždé, když se aktivuje kanál obsahující funkci apply_changes_from_snapshot()
:
- Spustí funkci
next_snapshot_and_version
, která načte další datový rámec snímku a odpovídající verzi snímku. - Pokud se žádný datový rámec nevrátí, spuštění se ukončí a aktualizace datového toku se označí jako dokončená.
- Rozpozná změny v novém snímku a postupně je použije na cílovou tabulku.
- Vrátí se ke kroku 1, aby se načetl další snímek a jeho verze.
omezení
Rozhraní DLT Python má následující omezení:
Funkce pivot()
není podporována. Operace pivot
ve Sparku vyžaduje dychtivé načítání vstupních dat pro výpočet výstupního schématu. Tato funkce není v DLT podporována.