共用方式為


串流和增量擷取

Azure Databricks 使用 Apache Spark 結構化串流來備份與擷取工作負載相關聯的許多產品,包括:

  • 自動載入器
  • COPY INTO
  • DLT 管線
  • Databricks SQL 中的實體化檢視表和串流表

本文討論串流和累加批處理語意之間的一些差異,並提供在 Databricks 中為所需語意設定擷取工作負載的高階概觀。

串流和累加批次擷取之間的差異為何?

可能的引入工作流程組態範圍從近乎實時的處理到不頻繁的增量批次處理。 這兩種模式都使用 Apache Spark 結構化串流來提供增量處理,但具有不同的語意。 為了簡單起見,本文將近乎即時的擷取稱為 串流擷取 ,而較不常的累加式處理則為 累加批次擷取

串流匯入

串流在資料擷取和資料表更新的上下文中,是指 Azure Databricks 使用持續運行的基礎架構,以微批處理方式將記錄從來源擷取到接收端的近乎即時數據處理。 除非發生停止擷取失敗,否則串流工作負載會持續從設定的數據源擷取更新。

增量批次匯入

增量批次匯入是指在短暫的任務中從數據源處理所有新記錄的模式。 累加批次擷取通常會根據排程進行,但也可以根據檔案抵達手動觸發。

累加批次 擷取與 批次 擷取不同,因為它會自動偵測數據源中的新記錄,並忽略已擷取的記錄。

使用作業進行資料導入

Databricks 作業可讓您編排包含筆記本、程式庫、DLT 管線和 Databricks SQL 查詢的工作流程及排定任務。

注意

您可以使用所有 Azure Databricks 計算類型和工作類型來設定累加批次擷取。 只有在傳統作業計算和 DLT 的生產環境中才支援串流擷取。

作業有兩種主要作業模式:

  • 連續作業 會在發生失敗時自動重試。 此模式適用於串流擷取。
  • 觸發的作業 會在觸發時執行工作。 觸發程式包括:
    • 依指定排程執行作業的時間型觸發程式。
    • 檔案位於指定位置時執行作業的檔案型觸發程式。
    • 其他觸發程式,例如 REST API 呼叫、執行 Azure Databricks CLI 命令,或按兩下工作區 UI 中的 [ 立即 執行] 按鈕。

針對累加批次工作負載,使用 AvailableNow 觸發模式設定您的作業,如下所示:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

對於串流工作負載,預設觸發間隔為 processingTime ="500ms"。 下列範例示範如何每隔 5 秒處理一個微批次:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

重要

無伺服器任務不支援 Scala、連續模式或基於時間的觸發間隔,用於結構化串流。 如果您需要近乎即時的資料輸入語意,請使用傳統工作。

使用 DLT 資料導入

類似於任務,DLT 管線可以在觸發或連續模式中執行。 針對具有串流數據表的近乎即時串流語意,請使用連續模式。

使用串流表格來配置雲端物件儲存、Apache Kafka、Amazon Kinesis、Google Pub/Sub 或 Apache Pulsar 的串流或增量批次擷取。

Lakeflow Connect 會使用 DLT 設定從連接的系統擷取管線。 請參閱 Lakeflow Connect

具體化檢視保證相當於批次工作負載的作業語意,但可以優化許多作業,以累加方式計算結果。 請參閱具體化檢視的增量更新

使用 Databricks SQL 匯入

您可以使用串流數據表,從雲端物件記憶體、Apache Kafka、Amazon Kinesis、Google Pub/Sub 或 Apache Pulsar 設定累加批次擷取。

您可以使用具現化視圖來設定 Delta 來源的累加批處理。 如需具體化檢視,請參閱 累加式重新整理。

COPY INTO 提供熟悉的 SQL 語法,用於雲端物件記憶體中數據檔的累加批處理。 COPY INTO 行為類似於串流數據表針對雲端物件記憶體所支援的模式,但並非所有預設設定都相當於所有支援的檔案格式。