Adatok növekményes betöltése és feldolgozása DLT-folyamatokkal
Ez a cikk bemutatja, hogy mik azok a folyamatok, és hogyan használhatja a DLT-folyamatokban lévő folyamatokat az adatok forrásból egy célstreamelési táblába történő növekményes feldolgozásához. A DLT-ben a folyamatok kétféleképpen vannak definiálva:
- Amikor létrehoz egy streamelési táblát frissítő lekérdezést, a folyamat automatikusan meghatározásra kerül.
- A DLT emellett olyan funkciókat is biztosít, amelyek explicit módon definiálják a folyamatokat összetettebb feldolgozáshoz, például több streamforrásból származó streamtáblához való hozzáfűzést.
Ez a cikk azokat az implicit folyamatokat ismerteti, amelyek akkor jönnek létre, amikor lekérdezést határoz meg egy streamelési tábla frissítéséhez, majd részletesen ismerteti a szintaxist az összetettebb folyamatok definiálásához.
Mi az a folyamat?
A DLT-ben a folyamat egy streamelési lekérdezés, amely a forrásadatokat növekményesen dolgozza fel a célstreamelési tábla frissítéséhez. A folyamatokban létrehozott DLT-adathalmazok többsége a lekérdezés részeként határozza meg a folyamatot, és nincs szükség a folyamat explicit meghatározására. Létrehozhat például egy streamelési táblát a DLT-ben egyetlen DDL-parancsban ahelyett, hogy külön táblázatokat és folyamatutasságokat használ a streamelési tábla létrehozásához:
Jegyzet
Ez a CREATE FLOW
példa csak szemléltető célokra szolgál, és olyan kulcsszavakat tartalmaz, amelyek nem érvényesek A DLT szintaxisa.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
A lekérdezés által definiált alapértelmezett folyamat mellett a DLT Python és SQL felületei hozzáfűzési folyamat funkciót is biztosítanak. A hozzáfűzési folyamat támogatja a feldolgozást, amelyhez több streamforrás adatainak beolvasása szükséges egyetlen streamelési tábla frissítéséhez. Használhatja például a hozzáfűzési folyamat funkciót, ha már rendelkezik egy meglévő streamelési táblával és -folyamatokkal, és szeretne hozzáadni egy új streamforrást, amely ebbe a meglévő streamelési táblába ír.
A hozzáfűzési folyamat használata több forrásstreamből származó streamelési táblázatba való íráshoz
A @append_flow
dekoratőrrel a Python-felületen vagy az SQL-felületen található CREATE FLOW
záradékkal több streamforrásból származó streamelési táblába írhat. Használja a hozzáfűzési folyamatot a következő feladatok feldolgozásához:
- Olyan streamelési források hozzáadása, amelyek adatokat fűznek egy meglévő streamelési táblához teljes frissítés nélkül. Lehet például, hogy van egy táblázata, amely a regionális adatokat egyesíti minden régióból, amelyben dolgozik. Az új régiók bevezetésekor teljes frissítés nélkül hozzáadhatja az új régióadatokat a táblához. Lásd: Példa: Írás streamelési táblába több Kafka-témakörből.
- Frissítsen egy streamelési táblát hiányzó előzményadatok hozzáfűzésével (backfilling). Például van egy meglévő folyamatosan frissülő táblázat, amelybe egy Apache Kafka téma ír. Olyan táblában tárolt előzményadatokat is tárol, amelyeket pontosan egyszer kell beszúrnia a streamelési táblába, és nem tudja streamelni az adatokat, mert a feldolgozás magában foglalja az adatok beszúrása előtt végzett összetett összesítést. Lásd : Példa: Egy egyszeri adatvisszatöltés futtatása.
- Több forrásból származó adatok egyesítése és írás egyetlen streamelési táblába a lekérdezés
UNION
záradékának használata helyett. A hozzáfűzési folyamat használataUNION
helyett lehetővé teszi, hogy fokozatosan frissítse a céltáblát anélkül, hogy lefuttatna egy teljes frissítést. Lásd Példa: Használja a hozzáfűzési folyamatfeldolgozástUNION
helyett.
A hozzáfűzési folyamat feldolgozásának rekordkimenetének célja lehet egy meglévő tábla vagy egy új tábla. Python-lekérdezések esetén a create_streaming_table() függvénnyel hozzon létre egy céltáblát.
Fontos
- Ha elvárásokkalkell meghatároznia az adatminőségre vonatkozó korlátozásokat, az elvárásokat a céltáblán a
create_streaming_table()
függvény részeként vagy egy meglévő tábladefinícióban határozhatja meg. A@append_flow
definícióban nem definiálhat elvárásokat. - A folyamatokat egy folyamatnévazonosítja, és ez a név a streamelési ellenőrzőpontok azonosítására szolgál. A folyamatnév használata az ellenőrzőpont azonosításához a következőket jelenti:
- Ha egy meglévő folyamatot egy folyamatláncban átneveznek, az ellenőrzőpont nem öröklődik, és az átnevezett folyamat gyakorlatilag teljesen új folyamatnak számít.
- A folyamat neve nem használható újra, mert a meglévő ellenőrzőpont nem felel meg az új folyamatdefiníciónak.
A @append_flow
szintaxisa a következő:
Piton
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
Példa: Írás streamelési táblába több Kafka-témakörből
Az alábbi példák létrehoznak egy kafka_target
nevű streamelési táblát, és két Kafka-témakörből írnak a streamelési táblába:
Piton
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Az SQL-lekérdezésekben használt read_kafka()
táblaértékű függvényről az SQL nyelvi referenciájában read_kafka talál további információt.
A Pythonban programozott módon hozhat létre több folyamatot, amelyek egyetlen táblát céloznak meg. Az alábbi példa ezt a mintát mutatja be a Kafka-témakörök listájához.
Jegyzet
Ez a minta ugyanazokkal a követelményekkel rendelkezik, mint egy for
hurok használata táblák létrehozásához. Explicit módon át kell adnia egy Python-értéket a folyamatot meghatározó függvénynek. Lásd a -t: Táblázatok létrehozása egy for
ciklusban.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Példa: Egyszeri adatvisszatöltés futtatása
Az alábbi példák egy lekérdezést futtatnak az előzményadatok streamelési táblához való hozzáfűzéséhez:
Jegyzet
Ha gondoskodni szeretne arról, hogy a visszatöltési lekérdezés egy ütemezett vagy folyamatosan futó folyamat része, akkor a folyamat egyszeri futtatása után távolítsa el a lekérdezést. Ha új adatokat szeretne hozzáfűzni, ha az a backfill könyvtárba érkezik, hagyja a lekérdezést helyben.
Piton
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
Példa: Használja a hozzáfűzési folyamat feldolgozását UNION
helyett
Ahelyett, hogy UNION
záradékkal rendelkező lekérdezést használ, a hozzáfűző folyamatlekérdezésekkel több forrást egyesíthet, és egyetlen streamelési táblába írhat. A hozzáfűzési művelet lekérdezések használata UNION
helyett lehetővé teszi, hogy több forrásból is hozzáfűzzön egy folyamatosan frissített táblához anélkül, hogy egy teljes frissítést futtatna.
A következő Python-példa egy olyan lekérdezést tartalmaz, amely több adatforrást egyesít egy UNION
záradékkal:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Az alábbi példák a UNION
lekérdezést fűzőfolyamat-lekérdezésekre cserélik:
Piton
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);