Condividi tramite


Considerazioni sulla produzione per Structured Streaming

Questo articolo contiene raccomandazioni per la pianificazione di carichi di lavoro Structured Streaming usando processi in Azure Databricks.

Databricks consiglia di eseguire sempre le operazioni seguenti:

  • Rimuovere il codice non necessario dai notebook che potrebbero restituire risultati, ad esempio display e count.
  • Non eseguire carichi di lavoro Structured Streaming usando il calcolo multiuso. Pianificare sempre i flussi come lavori utilizzando il calcolo job.
  • Programmare i processi usando la modalità Continuous.
  • Non abilitare la scalabilità automatica per il calcolo per i processi Structured Streaming.

Alcuni carichi di lavoro traggono vantaggio da quanto segue:

Azure Databricks ha introdotto DLT per ridurre le complessità della gestione dell'infrastruttura di produzione per i carichi di lavoro Structured Streaming. Databricks consiglia di usare DLT per le nuove pipeline di streaming strutturato. Vedere Che cos'è DLT?.

Nota

La scalabilità automatica del calcolo ha dei limiti quando si riduce la dimensione del cluster per carichi di lavoro di streaming strutturati. Databricks consiglia di usare DLT con scalabilità automatica avanzata per i carichi di lavoro di streaming. Vedere Ottimizzare l'utilizzo del cluster delle pipeline DLT con scalabilità automatica avanzata.

Progettare carichi di lavoro di streaming per aspettarsi un errore

Databricks consiglia di configurare sempre i processi di streaming per il riavvio automatico in caso di errore. Alcune funzionalità, inclusa l'evoluzione dello schema, presuppongono che i carichi di lavoro Structured Streaming siano configurati per riprovare automaticamente. Vedere Configurare processi Structured Streaming per riavviare le query di streaming in caso di errore.

Alcune operazioni, ad esempio foreachBatch, forniscono garanzie di tipo almeno una volta anziché esattamente una volta. Per queste operazioni, è necessario fare in modo che la pipeline di elaborazione sia idempotente. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.

Nota

Quando una query viene riavviata, viene processato il micro-batch pianificato durante esecuzione precedente. Se il processo non è riuscito a causa di un errore di memoria insufficiente o se è stato annullato manualmente un processo a causa di un micro batch sovradimensionato, potrebbe essere necessario aumentare le prestazioni del calcolo per elaborare correttamente il micro batch.

Se si modificano le configurazioni tra le esecuzioni, queste configurazioni si applicano al primo nuovo batch pianificato. Consulta Ripristinare dopo le modifiche in una query di Structured Streaming.

Quando viene ritentata un'attività?

È possibile pianificare più attività come parte di un processo di Azure Databricks. Quando si configura un processo usando il trigger continuo, non è possibile impostare dipendenze tra le attività.

È possibile scegliere di pianificare più flussi in un singolo processo usando uno degli approcci seguenti:

  • Attività multiple: definire un processo con attività multiple che eseguono carichi di lavoro di streaming usando il trigger continuo.
  • Query multiple: definire query di streaming multiple nel codice sorgente per una singola attività.

È anche possibile combinare queste strategie. Nella tabella seguente vengono confrontati questi approcci.

Attività multiple Query multiple
Come viene condiviso il calcolo? Databricks consiglia di distribuire processi di calcolo con dimensioni appropriate per ogni attività di streaming. Facoltativamente, è possibile condividere il calcolo tra le attività. Tutte le query condividono lo stesso calcolo. È possibile assegnare facoltativamente query ai pool dei processi di pianificazione.
Come vengono gestiti i tentativi? Tutte le attività devono fallire prima che il lavoro venga ripetuto. L'attività ritenta se una query non riesce.

Configurare i processi di Structured Streaming per riavviare le query di streaming in caso di errore

Databricks consiglia di configurare tutti i carichi di lavoro di streaming usando il trigger continuo. Vedere Eseguire processi in modo continuo.

Il trigger continuo fornisce il comportamento seguente per impostazione predefinita:

  • Impedisce più di un'esecuzione simultanea dell'attività.
  • Avvia una nuova esecuzione quando un'esecuzione precedente ha esito negativo.
  • Usa il backoff esponenziale per le ripetizioni.

Databricks consiglia di usare sempre il calcolo processi anziché il calcolo multiuso per la pianificazione dei flussi di lavoro. In caso di errore del processo e di successivo tentativo, vengono distribuite nuove risorse di calcolo.

Nota

Non è necessario usare streamingQuery.awaitTermination() o spark.streams.awaitAnyTermination(). I processi impediscono automaticamente il completamento di un'esecuzione quando una query di streaming è attiva.

Utilizzare pool di pianificazione per query di streaming multiple

È possibile configurare i pool di pianificazione per assegnare capacità di calcolo alle query durante l'esecuzione di query di streaming multiple dallo stesso codice sorgente.

Per impostazione predefinita, tutte le query avviate in un notebook vengono eseguite nello stesso pool di pianificazione equo. I job Apache Spark generati dai trigger di tutte le query di streaming in un notebook sono eseguiti uno dopo l'altro secondo l'ordine FIFO (First In, First Out). Ciò può causare ritardi non necessari nelle query, perché non condividono in modo efficiente le risorse del cluster.

I pool del pianificatore consentono di dichiarare quali query di Structured Streaming condividono le risorse di calcolo.

Nell'esempio seguente, query1 viene assegnato a un pool dedicato, mentre query2 e query3 condividono un pool di scheduling.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Nota

La configurazione della proprietà locale deve trovarsi nella stessa cella del notebook in cui si avvia la query di streaming.

Per altri dettagli, vedere la documentazione di Apache Fair Scheduler.