Esegui il tuo primo processo di Structured Streaming
Questo articolo fornisce esempi di codice e spiegazione dei concetti di base necessari per eseguire le prime query Structured Streaming in Azure Databricks. È possibile usare Structured Streaming per carichi di lavoro di elaborazione quasi in tempo reale e incrementali.
Structured Streaming è una delle diverse tecnologie che alimentano le tabelle di streaming in DLT. Databricks consiglia di usare DLT per tutti i nuovi carichi di lavoro ETL, inserimento e Structured Streaming. Vedere Che cos'è DLT?.
Nota
Sebbene DLT fornisca una sintassi leggermente modificata per dichiarare le tabelle di streaming, la sintassi generale per la configurazione delle letture e delle trasformazioni di streaming si applica a tutti i casi d'uso di streaming in Azure Databricks. DLT semplifica anche lo streaming gestendo informazioni sullo stato, metadati e numerose configurazioni.
Usare il caricatore automatico per leggere i dati di streaming dall'archiviazione di oggetti
L'esempio seguente illustra il caricamento di dati JSON con il caricatore automatico, che usa cloudFiles
per indicare il formato e le opzioni. L'opzione schemaLocation
abilita l'inferenza e l'evoluzione dello schema. Incollare il codice seguente in una cella del notebook di Databricks ed eseguire la cella per creare un DataFrame di streaming denominato raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Analogamente ad altre operazioni di lettura in Azure Databricks, la configurazione di una lettura in streaming non carica effettivamente i dati. È necessario attivare un'azione sui dati prima dell'inizio del flusso.
Nota
La chiamata display()
a un DataFrame di streaming avvia un processo di streaming. Per la maggior parte dei casi d'uso di Structured Streaming, l'azione che attiva un flusso dovrebbe essere scrivere dati in un sink. Vedere Considerazioni sulla produzione per Structured Streaming.
Eseguire una trasformazione di streaming
Structured Streaming supporta la maggior parte delle trasformazioni disponibili in Azure Databricks e Spark SQL. È anche possibile caricare i modelli MLflow come funzioni definite dall'utente ed eseguire stime di streaming come trasformazione.
L'esempio di codice seguente completa una semplice trasformazione per arricchire i dati JSON inseriti con informazioni aggiuntive usando le funzioni Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
L'oggetto risultante transformed_df
contiene istruzioni della query per caricare e trasformare ogni record man mano che arriva nella fonte di dati.
Nota
Structured Streaming considera le sorgenti di dati come set di dati illimitati o infiniti. Di conseguenza, alcune trasformazioni non sono supportate nei carichi di lavoro Structured Streaming perché richiedono l'ordinamento di un numero infinito di elementi.
La maggior parte delle aggregazioni e molti join richiedono la gestione delle informazioni sullo stato con limiti, finestre e modalità di output. Consulta Applicare filigrane per controllare le soglie di elaborazione dei dati.
Eseguire una scrittura di batch incrementale in Delta Lake
L'esempio seguente scrive in Delta Lake usando un percorso di file specificato e un checkpoint.
Importante
Assicurati sempre di specificare una posizione di checkpoint univoca per ogni scrittore di streaming configurato. Il checkpoint fornisce l'identità univoca per il tuo flusso, monitorando tutti i record elaborati e le informazioni di stato associate alla tua query di streaming.
L'impostazione availableNow
per il trigger indica a Structured Streaming di elaborare tutti i record non elaborati in precedenza dal set di dati di origine e quindi arrestarli, in modo da poter eseguire in modo sicuro il codice seguente senza doversi preoccupare di lasciare in esecuzione un flusso:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
In questo esempio non arrivano nuovi record nell'origine dati, quindi l'esecuzione ripetuta di questo codice non inserisce nuovi record.
Avviso
L'esecuzione di Structured Streaming può impedire la chiusura automatica delle risorse di calcolo. Per evitare costi imprevisti, assicurarsi di terminare le query di streaming.
Leggere i dati da Delta Lake, trasformare e scrivere in Delta Lake
Delta Lake offre un ampio supporto per l'uso di Structured Streaming sia come origine che come sink. Consulta le operazioni di lettura e scrittura in streaming delle tabelle Delta.
Nell'esempio seguente viene illustrata la sintassi di esempio per caricare in modo incrementale tutti i nuovi record da una tabella Delta, unirli con uno snapshot di un'altra tabella Delta e scriverli in una tabella Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
È necessario disporre di autorizzazioni appropriatamente configurate per leggere le tabelle di origine e scrivere nelle tabelle di destinazione, nonché nel percorso del checkpoint specificato. Compilare tutti i parametri indicati con parentesi angolari (<>
) utilizzando i valori pertinenti per le origini e le destinazioni dei dati.
Nota
DLT fornisce una sintassi completamente dichiarativa per la creazione di pipeline Delta Lake e gestisce automaticamente proprietà come trigger e checkpoint. Vedere Che cos'è DLT?.
Leggere i dati da Kafka, trasformare e scrivere in Kafka
Apache Kafka e altri bus di messaggistica offrono una certa latenza più bassa disponibile per set di dati di grandi dimensioni. È possibile usare Azure Databricks per applicare trasformazioni ai dati inseriti da Kafka e quindi scrivere nuovamente i dati in Kafka.
Nota
La scrittura di dati nell'archiviazione oggetti cloud comporta un sovraccarico di latenza aggiuntivo. Se si desidera archiviare dati da un bus di messaggistica in Delta Lake, ma è necessaria la latenza più bassa possibile per i carichi di lavoro di streaming, Databricks consiglia di configurare processi di streaming separati per inserire dati nella lakehouse e applicare trasformazioni quasi in tempo reale per i sink del bus di messaggistica downstream.
L'esempio di codice seguente illustra un modello semplice per arricchire i dati di Kafka unendo i dati in una tabella Delta e quindi scrivendo di nuovo in Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
È necessario disporre delle autorizzazioni appropriate configurate per l'accesso al servizio Kafka. Compilare tutti i parametri indicati con parentesi angolari (<>
) usando i valori pertinenti per le origini dati e i sink. Vedere Elaborazione del flusso con Apache Kafka e Azure Databricks.