Caricare ed elaborare i dati in modo incrementale con flussi DLT
Questo articolo illustra i flussi e come usare i flussi nelle pipeline DLT per elaborare in modo incrementale i dati da un'origine a una tabella di streaming di destinazione. In DLT i flussi vengono definiti in due modi:
- Un flusso viene definito automaticamente quando si crea una query che aggiorna una tabella di streaming.
- DLT offre anche funzionalità per definire in modo esplicito i flussi per un'elaborazione più complessa, ad esempio l'aggiunta a una tabella di streaming da più origini di streaming.
Questo articolo illustra i flussi impliciti creati quando si definisce una query per aggiornare una tabella di streaming e vengono quindi forniti dettagli sulla sintassi per definire flussi più complessi.
Che cos'è un flusso?
In DLT un flusso è una query di streaming che elabora i dati di origine in modo incrementale per aggiornare una tabella di streaming di destinazione. La maggior parte dei set di dati DLT creati in una pipeline definisce il flusso come parte della query e non richiede la definizione esplicita del flusso. Ad esempio, si crea una tabella di streaming in DLT in un singolo comando DDL anziché usare istruzioni di tabella e flusso separate per creare la tabella di streaming:
Nota
Questo esempio CREATE FLOW
viene fornito solo a scopo illustrativo e include parole chiave non valide per la sintassi DLT.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Oltre al flusso predefinito definito da una query, le interfacce DLT Python e SQL forniscono funzionalità di accodamento. Il flusso di append supporta l'elaborazione che richiede la lettura dei dati da più sorgenti di streaming per aggiornare una tabella di streaming unica. Ad esempio, è possibile utilizzare la funzionalità di accodamento del flusso quando si dispone di una tabella e un flusso di streaming esistenti e si desidera aggiungere una nuova origine di streaming che scrive in questa tabella di streaming esistente.
Utilizzare append flow per scrivere in una tabella di streaming da più flussi di origine
Usare il decoratore @append_flow
nell'interfaccia Python o la clausola CREATE FLOW
nell'interfaccia SQL per scrivere in una tabella di streaming da più fonti di streaming. Usare il flusso di aggiunta per l'elaborazione di attività come le seguenti:
- Aggiungere origini di streaming che aggiungono dati a una tabella di streaming esistente senza richiedere un aggiornamento completo. Ad esempio, potrebbe essere presente una tabella che combina i dati a livello di area di ogni area in cui si opera. Man mano che vengono implementate nuove aree, è possibile aggiungere i dati della nuova area alla tabella senza eseguire un aggiornamento completo. Vedi Esempio: Scrivere in una tabella di streaming da più argomenti Kafka.
- Aggiornare una tabella di streaming aggiungendo dati cronologici mancanti (riempimento). Ad esempio, si dispone di una tabella di streaming esistente scritta da un topic Apache Kafka. I dati cronologici sono archiviati anche in una tabella che è necessario inserire esattamente una volta nella tabella di streaming e non è possibile trasmettere i dati perché l'elaborazione include l'esecuzione di un'aggregazione complessa prima di inserire i dati. Consulta Esempio: Esegui un backfill di dati una tantum.
- Combinare i dati provenienti da più origini e scriverli su una singola tabella di streaming anziché usare la clausola
UNION
in una query. L'uso dell'elaborazione del flusso di accodamento al posto diUNION
consente di aggiornare la tabella di destinazione in modo incrementale senza dover eseguire un aggiornamento completo. Vedi Esempio: Utilizzare l'elaborazione del flusso di aggiunta anzichéUNION
.
La destinazione per i record restituiti dall'elaborazione del flusso di accodamento può essere una tabella esistente o una nuova tabella. Per le query Python, usare la funzione create_streaming_table() per creare una tabella di destinazione.
Importante
- Se è necessario definire vincoli di qualità dei dati con aspettative, si definiscono le aspettative sulla tabella di destinazione come parte della funzione
create_streaming_table()
o su una definizione di tabella esistente. Non è possibile definire le aspettative nella definizione di@append_flow
. - I flussi vengono identificati da un nome di flusso e questo nome viene usato per identificare i checkpoint di streaming. L'uso del nome del flusso per identificare il checkpoint indica quanto segue:
- Se un flusso esistente in una pipeline viene rinominato, il checkpoint non viene mantenuto e il flusso rinominato si trasforma in un flusso completamente nuovo.
- Non è possibile riutilizzare un nome di flusso in una pipeline, perché il checkpoint esistente non corrisponde alla nuova definizione del flusso.
Di seguito è riportata la sintassi per @append_flow
:
Pitone
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
Esempio: Scrivere in una tabella di streaming da più argomenti Kafka
Gli esempi seguenti creano una tabella di streaming denominata kafka_target
e scrive in tale tabella di streaming da due argomenti Kafka:
Pitone
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Per altre informazioni sulla funzione con valori di tabella read_kafka()
usata nelle query SQL, vedere read_kafka nelle informazioni di riferimento sul linguaggio SQL.
In Python è possibile creare più flussi a livello di codice destinati a una singola tabella. Nell'esempio seguente viene illustrato questo modello per un elenco di argomenti Kafka.
Nota
Questo modello ha gli stessi requisiti dell'uso di un ciclo for
per creare tabelle. È necessario passare in modo esplicito un valore Python alla funzione che definisce il flusso. Vedere Creare tabelle in un ciclo for
.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Esempio: Eseguire un backfill di dati una tantum
Gli esempi seguenti eseguono una query per aggiungere dati cronologici a una tabella di streaming:
Nota
Per garantire un vero backfill una tantum quando la query di backfill fa parte di una pipeline che viene eseguita su base pianificata o continuativa, rimuovere la query dopo che la pipeline è stata eseguita una sola volta. Per aggiungere nuovi dati se arrivano nella directory backfill, lasciare la query lì.
Pitone
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
Esempio di : usare l'elaborazione del flusso di accodamento anziché UNION
Anziché usare una query con una clausola UNION
, è possibile utilizzare query a flusso continuo per combinare più origini e scrivere su una singola tabella di streaming. L'uso di query di accodamento anziché UNION
consente di accodare a una tabella di streaming da più origini senza eseguire un aggiornamento completo.
L'esempio Python seguente include una query che combina più origini dati con una clausola UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Gli esempi seguenti sostituiscono la query UNION
con query di flusso di aggiunte.
Pitone
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);