Condividi tramite


Leggere le informazioni sullo stato di Structured Streaming

In Databricks Runtime 14.3 LTS e versioni successive nel calcolo configurato con modalità di accesso dedicato o senza isolamento, è possibile usare operazioni dataframe o funzioni con valori di tabella SQL per eseguire query sui dati e sui metadati dello stato di Structured Streaming. È possibile usare queste funzioni per osservare le informazioni sullo stato per le query con stato Structured Streaming, che possono essere utili per il monitoraggio e il debug.

È necessario avere accesso in lettura al percorso del checkpoint per una query in streaming al fine di eseguire query sui dati di stato o sui metadati. Le funzioni descritte in questo articolo forniscono l'accesso in sola lettura ai dati e ai metadati di stato. È possibile usare solo la semantica di lettura batch per eseguire query sulle informazioni sullo stato.

Nota

Non è possibile eseguire query sulle informazioni sullo stato per pipeline DLT, tabelle di streaming o viste materializzate. Non è possibile eseguire query sulle informazioni di stato utilizzando l'elaborazione serverless o l'elaborazione configurata con la modalità di accesso standard.

Leggere l'archivio di stato di Structured Streaming

È possibile leggere le informazioni sull'archivio di stato per le query Structured streaming eseguite in qualsiasi runtime di Databricks supportato. Usare la sintassi seguente:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Sono supportate le configurazioni seguenti:

Opzione Tipo Valore predefinito Descrizione
batchId Lungo ID del batch più recente Rappresenta il batch di destinazione da cui leggere. Specificare questa opzione per eseguire una query sulle informazioni relative a uno stato precedente della query. Il batch deve essere sottoposto a commit ma non ancora pulito.
operatorId Lungo 0 Rappresenta l'operatore di destinazione dal quale leggere. Questa opzione viene usata quando la query usa più operatori con stato.
storeName Stringa "default" Rappresenta il nome dell'archivio di stato di destinazione da cui leggere. Questa opzione viene usata quando l'operatore con stato utilizza più istanze della memoria di stato. È necessario specificare storeName o joinSide per un join stream-steam, ma non entrambi.
joinSide Stringa ("sinistra" o "destra") Rappresenta il lato di destinazione da cui leggere. Questa opzione viene utilizzata quando gli utenti vogliono leggere lo stato di uno stream-stream join.

I dati restituiti hanno lo schema seguente:

Colonna Tipo Descrizione
key Struttura (ulteriore tipo derivato dalla chiave di stato) Chiave per un record dell'operatore con stato nel checkpoint di stato.
value Struttura (ulteriore tipo derivato dal valore di stato) Il valore di un record dell'operatore con stato nel checkpoint di stato.
partition_id Intero Partizione del checkpoint di stato che contiene il record dell'operatore con stato.

Leggere i metadati di stato di Streaming strutturato

Importante

È necessario eseguire query di streaming in Databricks Runtime 14.2 o versioni successive per registrare i metadati dello stato. I file di metadati di stato non interrompono la compatibilità con le versioni precedenti. Se si sceglie di eseguire una query di streaming in Databricks Runtime 14.1 o versione successiva, i file di metadati di stato esistenti vengono ignorati e non vengono scritti nuovi file di metadati di stato.

È possibile leggere le informazioni sui metadati di stato per le query structured streaming eseguite in Databricks Runtime 14.2 o versione successiva. Usare la sintassi seguente:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

I dati restituiti hanno lo schema seguente:

Colonna Tipo Descrizione
operatorId Intero Numero ID intero dell'operatore stateful di streaming.
operatorName Intero Nome dell'operatore di streaming con stato.
stateStoreName Stringa Nome dell'archivio di stato dell'operatore.
numPartitions Intero Numero di partizioni dell'archivio di stato.
minBatchId Lungo ID batch minimo disponibile per l'interrogazione dello stato.
maxBatchId Lungo L'ID batch massimo disponibile per l'interrogazione dello stato.

Nota

I valori dell'ID batch forniti da minBatchId e maxBatchId riflettono lo stato al momento della scrittura del checkpoint. I batch precedenti vengono puliti automaticamente con l'esecuzione di micro batch, quindi il valore fornito qui non è garantito che sia ancora disponibile.