Přírůstkové načítání a zpracování dat pomocí toků DLT
Tento článek vysvětluje, co jsou toky a jak můžete pomocí toků v kanálech DLT přírůstkově zpracovávat data ze zdroje do cílové tabulky streamování. V DLT jsou toky definovány dvěma způsoby:
- Tok se definuje automaticky při vytváření dotazu, který aktualizuje tabulku streamování.
- DLT také nabízí funkce pro explicitní definování toků pro složitější zpracování, jako je přidávání do streamovací tabulky z více streamovacích zdrojů.
Tento článek popisuje implicitní toky, které se vytvoří při definování dotazu pro aktualizaci streamované tabulky, a poté poskytuje podrobnosti o syntaxi pro definování složitějších toků.
Co je tok?
V DLT je tok streamovací dotaz, který zpracovává zdrojová data postupně k aktualizaci cílové streamovací tabulky. Většina datových sad DLT, které vytvoříte v kanálu, definuje tok jako součást dotazu a nevyžaduje explicitní definování toku. Například tabulku streamování vytvoříte v DLT v jednom příkazu DDL místo použití samostatných příkazů tabulky a toku k vytvoření tabulky streamování:
Poznámka
Tento CREATE FLOW
příklad je k dispozici pouze pro ilustrativní účely a obsahuje klíčová slova, která nejsou platná syntaxe DLT.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Kromě výchozího toku definovaného dotazem poskytují rozhraní DLT Python a SQL přidávací tok funkce. Proces připojování toku umožňuje zpracování, které vyžaduje čtení dat z více streamovaných zdrojů pro aktualizaci jedné streamovací tabulky. Funkci připojení toku dat můžete použít například v případě, že máte existující streamovou tabulku a tok a chcete přidat nový streamovací zdroj, který zapisuje do této existující streamové tabulky.
Použití přidávacího toku k zápisu do streamované tabulky z více zdrojových datových proudů
Pomocí dekorátoru @append_flow
v rozhraní Pythonu nebo klauzule CREATE FLOW
v rozhraní SQL můžete zapisovat do streamované tabulky z více streamů. Použijte tok přidání ke zpracování úloh, jako je například následující:
- Přidejte streamované zdroje, které připojují data k existující streamované tabulce bez nutnosti úplné aktualizace. Můžete mít například tabulku, která kombinuje regionální data z každé oblasti, ve které pracujete. Při nasazení nových oblastí můžete do tabulky přidat nová data oblasti, aniž byste provedli úplnou aktualizaci. Viz Příklad: Zápis do streamované tabulky z více témat Kafka.
- Aktualizujte streamovací tabulku přidáním chybějících historických dat (obnovení). Máte například existující streamovací tabulku napsanou tématem Apache Kafka. Máte také historická data uložená v tabulce, kterou potřebujete vložit přesně jednou do streamované tabulky, a nemůžete je streamovat, protože zpracování zahrnuje provedení komplexní agregace před vložením dat. Viz příklad: Spuštění jednorázového obnovení dat.
- Kombinovat data z více zdrojů a zapisovat do jedné streamovací tabulky místo použití klauzule
UNION
v dotazu. Použití zpracování přírůstkového toku místoUNION
umožňuje aktualizovat cílovou tabulku přírůstkově bez spuštění úplné aktualizace. Viz Příklad: Použijte zpracování připojovacího toku místoUNION
.
Cílem výstupu záznamů při zpracování toku připojování může být buď existující, nebo nová tabulka. V případě dotazů v Pythonu vytvořte cílovou tabulku pomocí funkce create_streaming_table().
Důležitý
- Pokud potřebujete definovat omezení kvality dat s očekáváními, definujte očekávání cílové tabulky jako součást funkce
create_streaming_table()
nebo existující definice tabulky. V definici@append_flow
nelze definovat očekávání. - Toky jsou identifikované názvem toku a tento název se používá k identifikaci kontrolních bodů streamování. Použití názvu toku k identifikaci kontrolního bodu znamená následující:
- Pokud se stávající tok v potrubí přejmenuje, kontrolní bod se nepřenese a přejmenovaný tok je v podstatě novým tokem.
- Název toku v kanálu nemůžete znovu použít, protože stávající kontrolní bod neodpovídá nové definici toku.
Následuje syntaxe @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
příklad : Zápis do streamované tabulky z několika témat Kafka
Následující příklady vytvoří streamingovou tabulku s názvem kafka_target
a zapisuje do ní ze dvou Kafka témat.
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Další informace o tabulkové funkci read_kafka()
, která se používá v dotazech SQL, najdete v referenční dokumentaci jazyka SQL u read_kafka.
V Pythonu můžete programově vytvořit více toků, které cílí na jednu tabulku. Následující příklad ukazuje tento vzor pro seznam témat Kafka.
Poznámka
Tento model má stejné požadavky jako použití smyčky for
k vytváření tabulek. Musíte explicitně předat funkci definující tok hodnotu Pythonového typu. Viz Vytvořte tabulky ve smyčce for
.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
příklad : Spuštění jednorázového obnovení dat
Následující příklady spustí dotaz, který připojí historická data do streamované tabulky:
Poznámka
Aby bylo zajištěno skutečné jednorázové doplnění, pokud je dotaz backfill součástí kanálu, který běží buď podle plánu, nebo nepřetržitě, odeberte tento dotaz po jednom spuštění kanálu. Pro přidání nových dat, pokud dorazí do adresáře "backfill", ponechte dotaz na místě.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
Příklad : Místo UNION
použijte append flow processing.
Místo použití dotazu s klauzulí UNION
můžete pomocí přidávacích dotazů toku kombinovat více zdrojů a zapisovat do jedné streamovací tabulky. Použití dotazů pro přidávání dat místo UNION
umožňuje přidávat do streamované tabulky z více zdrojů bez spuštění celkové aktualizace.
Následující příklad Pythonu obsahuje dotaz, který kombinuje více zdrojů dat s klauzulí UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Následující příklady nahrazují dotaz UNION
přidávacími dotazy toku:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);