Leggere le informazioni sullo stato di Structured Streaming
In Databricks Runtime 14.3 LTS e versioni successive nel calcolo configurato con modalità di accesso dedicato o senza isolamento, è possibile usare operazioni dataframe o funzioni con valori di tabella SQL per eseguire query sui dati e sui metadati dello stato di Structured Streaming. È possibile usare queste funzioni per osservare le informazioni sullo stato per le query con stato Structured Streaming, che possono essere utili per il monitoraggio e il debug.
È necessario avere accesso in lettura al percorso del checkpoint per una query in streaming al fine di eseguire query sui dati di stato o sui metadati. Le funzioni descritte in questo articolo forniscono l'accesso in sola lettura ai dati e ai metadati di stato. È possibile usare solo la semantica di lettura batch per eseguire query sulle informazioni sullo stato.
Nota
Non è possibile eseguire query sulle informazioni sullo stato per pipeline DLT, tabelle di streaming o viste materializzate. Non è possibile eseguire query sulle informazioni di stato utilizzando l'elaborazione serverless o l'elaborazione configurata con la modalità di accesso standard.
Leggere l'archivio di stato di Structured Streaming
È possibile leggere le informazioni sull'archivio di stato per le query Structured streaming eseguite in qualsiasi runtime di Databricks supportato. Usare la sintassi seguente:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Sono supportate le configurazioni seguenti:
Opzione | Tipo | Valore predefinito | Descrizione |
---|---|---|---|
batchId |
Lungo | ID del batch più recente | Rappresenta il batch di destinazione da cui leggere. Specificare questa opzione per eseguire una query sulle informazioni relative a uno stato precedente della query. Il batch deve essere sottoposto a commit ma non ancora pulito. |
operatorId |
Lungo | 0 | Rappresenta l'operatore di destinazione dal quale leggere. Questa opzione viene usata quando la query usa più operatori con stato. |
storeName |
Stringa | "default" | Rappresenta il nome dell'archivio di stato di destinazione da cui leggere. Questa opzione viene usata quando l'operatore con stato utilizza più istanze della memoria di stato. È necessario specificare storeName o joinSide per un join stream-steam, ma non entrambi. |
joinSide |
Stringa ("sinistra" o "destra") | Rappresenta il lato di destinazione da cui leggere. Questa opzione viene utilizzata quando gli utenti vogliono leggere lo stato di uno stream-stream join. |
I dati restituiti hanno lo schema seguente:
Colonna | Tipo | Descrizione |
---|---|---|
key |
Struttura (ulteriore tipo derivato dalla chiave di stato) | Chiave per un record dell'operatore con stato nel checkpoint di stato. |
value |
Struttura (ulteriore tipo derivato dal valore di stato) | Il valore di un record dell'operatore con stato nel checkpoint di stato. |
partition_id |
Intero | Partizione del checkpoint di stato che contiene il record dell'operatore con stato. |
Leggere i metadati di stato di Streaming strutturato
Importante
È necessario eseguire query di streaming in Databricks Runtime 14.2 o versioni successive per registrare i metadati dello stato. I file di metadati di stato non interrompono la compatibilità con le versioni precedenti. Se si sceglie di eseguire una query di streaming in Databricks Runtime 14.1 o versione successiva, i file di metadati di stato esistenti vengono ignorati e non vengono scritti nuovi file di metadati di stato.
È possibile leggere le informazioni sui metadati di stato per le query structured streaming eseguite in Databricks Runtime 14.2 o versione successiva. Usare la sintassi seguente:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
I dati restituiti hanno lo schema seguente:
Colonna | Tipo | Descrizione |
---|---|---|
operatorId |
Intero | Numero ID intero dell'operatore stateful di streaming. |
operatorName |
Intero | Nome dell'operatore di streaming con stato. |
stateStoreName |
Stringa | Nome dell'archivio di stato dell'operatore. |
numPartitions |
Intero | Numero di partizioni dell'archivio di stato. |
minBatchId |
Lungo | ID batch minimo disponibile per l'interrogazione dello stato. |
maxBatchId |
Lungo | L'ID batch massimo disponibile per l'interrogazione dello stato. |
Nota
I valori dell'ID batch forniti da minBatchId
e maxBatchId
riflettono lo stato al momento della scrittura del checkpoint. I batch precedenti vengono puliti automaticamente con l'esecuzione di micro batch, quindi il valore fornito qui non è garantito che sia ancora disponibile.