共用方式為


結構化串流的生產考量

本文對於如何在 Azure Databricks 上以工作方式排程處理結構化串流工作負載提供建議。

Databricks 建議一律執行下列動作:

  • 從會傳回結果的筆記本中移除不必要的程式碼,例如 displaycount
  • 請勿使用所有用途計算來執行結構化串流工作負載。 始終使用 工作計算 環境將串流設定成工作並排程。
  • 使用 Continuous 模式排程工作。
  • 請勿為結構化串流作業啟用計算資源的自動調整功能。

某些工作負載受益於下列各項:

Azure Databricks 引進 DLT,以減少管理結構化串流工作負載生產基礎結構的複雜性。 Databricks 建議針對新的結構化串流管線使用 DLT。 請參閱 什麼是 DLT?

注意

在縮小結構化串流工作負載的叢集大小時,計算自動調整有其限制。 Databricks 建議針對串流工作負載使用增強自動調整功能的 DLT。 請參閱 透過強化自動縮放優化 DLT 管線的叢集效能。

設計串流工作負載時,需考慮可能失敗的情況

Databricks 建議一律設定串流工作,以在失敗時自動重新啟動。 某些功能,包括架構演進,假設結構化串流工作負載已設定為自動重試。 請參閱 <設定結構化串流工作,以在失敗時重新啟動串流查詢>。

某些運算,例如 foreachBatch,保證至少執行一次,而非精確執行一次。 針對這些運算,您應該確保處理流水線具有冪等性。 請參閱使用 foreachBatch 寫入任意資料接收器

注意

當查詢重新啟動時,會處理在先前執行中安排的微批次。 如果您的工作因記憶體不足錯誤而失敗,或因為微批次過大而手動取消工作,您可能需要提升計算資源以成功處理該微批次。

如果您在執行之間變更設定,這些設定將會套用至第一個新批次計畫。 請參閱 <在結構化串流查詢變更之後的復原>。

工作何時重試?

您可以將多個任務排程為 Azure Databricks 作業的一部分。 當您使用連續觸發器設定作業時,無法設定工作之間的相依性。

您可以選擇使用下列其中一種方法,在單一工作中安排多個資料流:

  • 多個任務:定義一個使用連續觸發器執行串流工作負載的多任務作業。
  • 多個查詢:在單一工作的原始程式碼中定義多個串流查詢。

您也可以合併這些策略。 下表比較這些方法。

多項工作 多個查詢
如何共用計算? Databricks 建議依據每個串流任務適當地分配計算資源。 您可以選用跨工作共用計算。 所有查詢都會共用相同的計算。 您可以選擇性地將查詢指派給 排程者集區
重試如何處理? 所有任務都必須在工作重試之前失敗。 如果有任何查詢失敗,工作會重試。

設定結構化串流工作以在失敗時重新啟動串流查詢

Databricks 建議使用連續觸發設定所有串流工作負載。 請參閱持續執行作業

連續觸發預設會提供下列行為:

  • 防止多個並行執行工作。
  • 在上一次執行失敗時開始新的執行。
  • 使用指數退避進行重試。

Databricks 建議在排程工作流程時一律使用作業計算,而不是通用計算。 在工作失敗並重試時,會部署新的計算資源。

注意

您不需要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 當串流查詢處於作用中狀態時,工作會自動防止作業完成。

針對多個串流查詢使用排程器集區

您可以從相同的原始程式碼執行多個串流查詢時,設定排程集區將計算容量指派給查詢。

預設情況下,筆記本中開始的所有查詢都會在相同的公平排程池中執行。 來自筆記本中所有串流查詢的觸發所產生的 Apache Spark 工作會按照「先進先出」(FIFO)順序逐個執行。 這可能會導致查詢中不必要的延遲,因為它們無法有效率地共用叢集資源。

排程者集區允許您宣告哪些結構化串流查詢共用計算資源。

下列範例將 query1 指派至專用集區,而 query2query3 則共用排程者集區。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

本地屬性設定必須位於您啟動串流查詢的相同筆記本單元格中。

如需其他詳細資料,請參閱Apache 公平調度器文件