使用 DLT 流程以累加方式載入和處理數據
本文說明什麼是流程,以及如何使用 DLT 管線中的流程,以累加方式將數據從來源處理到目標串流數據表。 在 DLT 中,流程會以兩種方式定義:
- 當您建立更新串流數據表的查詢時,會自動定義流程。
- DLT 也提供功能來明確定義流程,以進行更複雜的處理,例如從多個串流來源附加至串流數據表。
本文討論當您定義查詢以更新串流數據表時所建立的隱含流程,然後提供語法的詳細數據來定義更複雜的流程。
什麼是流程?
在 DLT 中,流程 是串流查詢,會以累加方式處理源數據以更新目標串流數據表。 您在管線中建立的大部分 DLT 數據集都會將流程定義為查詢的一部分,而且不需要明確定義流程。 例如,您會在 DLT 中以單一 DDL 命令建立串流數據表,而不是使用個別的數據表和流程語句來建立串流數據表:
注意
此 CREATE FLOW
範例僅供說明之用,並包含無效 DLT 語法的關鍵詞。
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
除了查詢所定義的預設流程之外,DLT Python 和 SQL 介面還提供 附加流程 功能。 附加流程支援需要從多個串流來源讀取數據的處理,以更新單一串流數據表。 例如,當您有現有的串流數據表和流程,而且想要新增寫入此現有串流數據表的串流來源時,您可以使用附加流程功能。
使用附加流程從多個來源數據流寫入串流數據表
使用 Python 介面中的 @append_flow
裝飾器,或 SQL 介面中的 CREATE FLOW
子句,從多個串流來源寫入流式表。 使用附加流程來處理工作,例如:
- 新增串流來源,以將數據附加至現有的串流數據表,而不需要完整重新整理。 例如,您可能會有一個結合您所營運的每個區域的區域數據的表格。 隨著新區域推出,您可以將新的區域數據新增至數據表,而不需要執行完整重新整理。 請參閱 範例:從多個 Kafka 主題寫入串流資料表。
- 藉由附加遺漏的歷程記錄數據來更新串流數據表(回填)。 例如,您有一個由 Apache Kafka 主題寫入的現有串流表。 您還有歷史數據存儲在一個資料表中,您需要將其精確插入至串流資料表一次,而且您無法直接串流數據,因為在插入數據之前,您的處理需要執行複雜的匯總。 請參閱 範例:執行一次性數據回填。
- 結合多個來源的數據並寫入至單一串流數據表,而不是在查詢中使用
UNION
子句。 使用追加流程處理而不是UNION
,您可以以增量方式更新目標資料表,而不需要執行 完整重新整理更新。 請參閱 範例:使用附加流程處理,而不是UNION
。
附加流程處理所輸出記錄的目標可以是現有的數據表或新的數據表。 針對 Python 查詢,請使用 create_streaming_table() 函式來建立目標數據表。
重要
- 如果您需要使用 期望來定義資料品質限制條件,請在目標數據表上定義相關期望,作為
create_streaming_table()
函數或現有數據表定義的一部分。 您無法在@append_flow
定義中定義預期。 - 流程是由 流程名稱來識別,而此名稱則用來識別串流檢查點。 使用流程名稱來識別檢查點表示下列各項:
- 如果管線中的現有流程已重新命名,檢查點不會延續,且重新命名的流程實際上是全新的流程。
- 您無法重複使用管線中的流程名稱,因為現有的檢查點不符合新的流程定義。
以下是 @append_flow
的語法:
蟒
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
範例:從多個 Kafka 主題寫入串流數據表
下列範例會建立名為 kafka_target
的串流數據表,並從兩個 Kafka 主題寫入該串流數據表:
蟒
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
若要深入瞭解 SQL 查詢中使用的 read_kafka()
數據表值函式,請參閱 SQL 語言參考中的 read_kafka。
在 Python 中,您可以透過程式設計方式建立以單一數據表為目標的多個流程。 下列範例顯示 Kafka 主題清單的這個模式。
注意
此模式的需求與使用 for
迴圈來建立數據表的需求相同。 您必須明確地將 Python 值傳遞至定義流程的函式。 請參閱 在 for
循環中建立資料表。
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
範例:執行一次性數據回填
下列範例會執行查詢,將歷程記錄數據附加至串流數據表:
注意
為了確保當回填查詢是屬於定期或持續運行的管線的一部分時能夠真正實現一次性回填,請在管線運行一次後移除該查詢。 若要在到達回填目錄時附加新數據,請將查詢保留原位。
蟒
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
範例:使用附加流程處理,而不是使用 UNION
您可以使用附加流程查詢來結合多個來源並寫入至單一串流數據表,而不是使用具有 UNION
子句的查詢。 使用附加式流程查詢,而不是 UNION
,可讓您從多個來源將資料附加至串流數據表,避免執行 完整重新整理。
下列 Python 範例包含結合多個數據源與 UNION
子句的查詢:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
下列範例會將 UNION
查詢取代為附加流程查詢:
蟒
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);