共用方式為


在 Azure Databricks 上使用 Delta Lake 變更資料饋送

變更資料流可讓 Azure Databricks 追蹤 Delta 表格版本之間的行級變更。 在 Delta 資料表上啟用時,執行時間會記錄 寫入數據表中所有資料的變更事件 。 這包括數據列數據以及元數據,指出指定的數據列是否已插入、刪除或更新。

重要

變更數據摘要會與數據表歷程記錄一起運作,以提供變更資訊。 因為複製 Delta 數據表會建立個別的歷程記錄,因此複製數據表上的變更數據摘要與原始數據表的記錄不符。

逐步處理變更資料

Databricks 建議使用變更數據饋送結合結構化串流,以逐步處理 Delta 表的變更。 您必須使用 Azure Databricks 的結構化串流來自動追蹤數據表變更數據摘要的版本。

注意

DLT 提供的功能可讓您輕鬆地傳播變更數據,並將結果儲存為SCD(緩時變維度)類型1或類型2資料表。 請參閱 套用變更 API:使用 DLT簡化異動數據擷取。

若要從資料表讀取變更數據摘要,您必須在該數據表上啟用變更數據摘要。 請參閱啟用變更資料饋送

針對數據表設定數據流以讀取變更數據摘要時,將選項readChangeFeedtrue設定為 ,如下列語法範例所示:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

根據預設,當數據流第一次啟動時,數據流會作為INSERT 傳回資料表的最新快照,並在未來隨著變更資料進行更新。

變更資料提交作為 Delta Lake 交易的一部分,並會在新資料提交至資料表時同時變成可用。

您可以選擇性地指定起始版本。 請參閱 我是否應該指定起始版本?

變更數據摘要也支援批次執行,這需要指定起始版本。 請參閱 讀取批次查詢中的變更。

讀取變更數據時也支援速率限制等maxFilesPerTriggermaxBytesPerTriggerexcludeRegex選項。

除了起始快照版本以外的版本,速率限制可以具有原子性。 也就是說,整個提交版本會受到速率限制,或者會傳回整個提交。

我應該指定起始版本嗎?

如果您想要忽略特定版本之前發生的變更,您可以選擇性地指定起始版本。 您可以使用時間戳或 Delta 交易日誌中記錄的版本識別碼來指定版本。

注意

批次讀取需要起始版本,而且許多批次模式可以受益於設定選擇性的結束版本。

當您設定涉及變更數據摘要的結構化串流工作負載時,請務必瞭解如何指定起始版本會影響處理。

許多串流工作負載,尤其是新的數據處理管線,都受益於默認行為。 使用預設設定時,當資料流第一次將資料表中的所有現有記錄記錄為變更資料饋送中的INSERT作業時,便會處理第一個批次。

如果您的目標數據表已包含所有具有適當變更的記錄,請指定起始版本以避免將源數據表狀態當做 INSERT 事件處理。

以下範例語法用於從因檢查點損毀引起的串流失敗中恢復。 在此範例中,假設有下列條件:

  1. 在數據表建立時,源數據表上已啟用變更數據摘要。
  2. 目標下游資料表已處理所有變更,達到並包括第 75 版。
  3. 源數據表的版本歷程記錄適用於版本70和更新版本。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

在此範例中,您也必須指定新的檢查點位置。

重要

如果您指定起始版本,如果數據表歷程記錄中不再存在起始版本,數據流將無法從新的檢查點啟動。 Delta Lake 會自動清除歷程記錄版本,這表示最終會刪除所有指定的起始版本。

請參閱 是否可以使用變更資料摘要來重播資料表的完整歷史?

讀取批次查詢中的變更

您可以使用批次查詢語法來讀取從特定版本開始的所有變更,或讀取指定版本範圍內的變更。

您可以將版本指定為整數,並以 格式 yyyy-MM-dd[ HH:mm:ss[.SSS]]指定時間戳做為字串。

查詢中包含開始和結束版本。 若要讀取從特定起始版本到最新版數據表的變更,請只指定起始版本。

如果您提供的版本較低或時間戳早於記錄變更事件的版本,也就是啟用變更數據摘要時,就會擲回錯誤,指出變更數據摘要未啟用。

下列語法範例示範如何搭配批次讀取使用開始和結束版本選項:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

注意

預設狀況下,如果使用者傳入的版本或時間戳超過數據表上最後一次提交,就會拋出錯誤 timestampGreaterThanLatestCommit 。 在 Databricks Runtime 11.3 LTS 及其以上版本中,如果使用者將以下配置設置為true,那麼變更數據摘要可以處理超出範圍的版本問題:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

如果您提供的開始版本大於數據表的最後一個認可,或比數據表上最後一次認可還新的開始時間戳,則啟用上述設定時,會傳回空的讀取結果。

如果您提供大於數據表上最後一次認可的結束版本,或比數據表上最後一次認可還新的結束時間戳,則在批次讀取模式中啟用上述設定時,會傳回開始版本與最後一個認可之間的所有變更。

變更數據摘要的架構為何?

當您從表格的變更數據流讀取時,會使用最新表格版本的架構。

注意

大部分的架構變更和演進作業都完全受到支援。 已啟用欄位對應的資料表不支援所有使用案例,因而顯示不同的行為。 請參閱 啟用欄位對應後的資料表變更資料饋送限制

除了 Delta 資料表架構中的數據行之外,變更數據摘要還包含可識別變更事件類型的元數據行:

欄位名稱 類型 價值觀
_change_type String insert、 、 update_preimageupdate_postimagedelete(1)
_commit_version Long 包含變更的 Delta 記錄檔或數據表版本。
_commit_timestamp 時間戳記 提交建立時相關聯的時間戳。

(1)preimage 是更新前的值, postimage 是更新之後的值。

注意

如果架構包含與這些新增數據行名稱相同的數據行,則您無法在數據表上啟用變更數據摘要。 在嘗試啟用變更數據摘要之前,重新命名數據表中的數據行,以解決此衝突。

啟用變更資料流

您只能讀取已啟用資料表的變更資料摘要。 您必須使用下列其中一種方法,明確啟用變更資料摘要選項:

  • 新增資料表:在 命令中delta.enableChangeDataFeed = true設定資料表屬性CREATE TABLE

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 現有資料表:在 命令中delta.enableChangeDataFeed = true設定資料表屬性ALTER TABLE

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 所有新的資料表

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

只會記錄啟用變更數據摘要之後所做的變更。 不會擷取數據表的過去變更。

變更數據記憶體

啟用變更數據傳輸會導致數據表的存儲成本小幅增加。 變更數據記錄會在查詢執行時產生,而且通常小於重寫檔案的大小總計。

Azure Databricks 會在資料表目錄下的 _change_data 資料夾中記錄 UPDATEDELETEMERGE 作業的變更資料。 某些作業,例如僅插入作業和完整分割區刪除,並不會在 _change_data 目錄中產生資料,因為 Azure Databricks 可以直接從交易日誌中有效地計算變更資料擷取。

針對資料夾中資料檔 _change_data 的所有讀取都應該經過支援的 Delta Lake API。

資料夾中的 _change_data 檔案會遵循數據表的保留原則。 命令執行時 VACUUM 會刪除變更資料摘要數據。

我可以使用變更資料提要來重播資料表的整個歷程記錄嗎?

變更資料提要的用意並非做為資料表中所有變更的永久記錄。 變更數據摘要只會記錄啟用之後發生的變更。

變更數據摘要和 Delta Lake 可讓您一律重新建構源數據表的完整快照集,這表示您可以針對已啟用變更數據摘要的數據表啟動新的串流讀取,並擷取該數據表的目前版本,以及之後發生的所有變更。

您必須將變更數據摘要中的記錄視為暫時性的,而且只能供指定的保留時段存取。 Delta 事務歷史記錄會定期移除數據表版本及其對應的變更數據摘要版本。 從事務歷史記錄中移除版本時,您無法再讀取該版本的變更數據摘要。

如果您的使用案例需要維護數據表所有變更的永久歷程記錄,您應該使用累加邏輯,將記錄從變更數據摘要寫入新數據表。 下列程式代碼範例示範如何使用 trigger.AvailableNow,其會利用結構化串流的累加處理,但會以批次工作負載的形式處理可用的數據。 您可以將此工作負載與您的主要處理管線異步排程,以建立變更資料流的備份,用於稽核或完整重播之用。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

變更已啟用欄位對應之數據表的資料匯入限制

在啟用 Delta 表格的欄位對應時,您可以刪除或重新命名表格中的欄位,而不需重寫現有資料的資料檔案。 啟用了欄位對應後,變更數據流在執行非加法的資料結構變更時會受到限制,例如重新命名或刪除欄位、變更數據類型或修改空值允許性。

重要

  • 您無法透過批次語意讀取在進行非加性架構變更的交易或範圍中的變更資料流。
  • 在 Databricks Runtime 12.2 LTS 和以下的數據表中,已啟用數據行對應且發生非加總架構變更的數據表不支援變更數據摘要上的串流讀取。 請參閱使用資料行對應和模式變更進行串流
  • 在 Databricks Runtime 11.3 LTS 及更早的版本中,如果數據表啟用了列對應並且經過了列重命名或刪除,則無法讀取變更數據源。

在 Databricks Runtime 12.2 LTS 和更新版本中,您可以針對已啟用數據行對應且發生非加總架構變更的數據表,對變更數據摘要執行批次讀取。 讀取作業會使用查詢中指定的數據表結束版本架構,而不是使用最新版數據表的架構。 如果指定的版本範圍跨越非加總架構變更,查詢仍會失敗。