讀取結構化串流狀態資訊
在以專用或無隔離存取模式設定的計算上,在 Databricks Runtime 14.3 LTS 和更新版本中,您可以使用 DataFrame 作業或 SQL 數據表值函式來查詢結構化串流狀態數據和元數據。 您可使用這些函式來觀察結構化串流具狀態的查詢的狀態資訊,這對於監視和偵錯很有用。
您必須具有串流查詢檢查點路徑的讀取權限,才能查詢狀態資料或中繼資料。 本文所述的函式提供狀態資料和中繼資料的唯讀存取。 您僅能使用批次讀取語意來查詢狀態資訊。
注意
您無法查詢 DLT 管線、串流資料表或具體化檢視的狀態資訊。 您無法使用無伺服器計算或以標準存取模式設定的計算來查詢狀態資訊。
讀取結構化串流狀態存放區
您可讀取任何支援的 Databricks Runtime 中所執行結構化串流查詢的狀態存放區資訊。 使用下列語法:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
以下支援選用的組態:
選項 | 類型 | 預設值 | 說明 |
---|---|---|---|
batchId |
長 | 最新批次識別碼 | 表示要從中讀取的目標批次。 指定此選項來查詢先前查詢狀態的狀態資訊。 必須提交批次,但尚未進行清理。 |
operatorId |
長 | 0 | 表示要從中讀取的目標運算子。 當查詢使用多個具狀態運算子時,會使用此選項。 |
storeName |
字串 | 預設 | 表示要讀取的目標狀態存放區名稱。 當有狀態的運算符使用多個狀態存儲實例時,會使用此選項。 必須為串流連接指定 storeName 或 joinSide ,但不能同時指定兩者。 |
joinSide |
字串(「左」或「右」) | 表示要從目標端進行讀取。 當使用者想要從數據流聯結讀取狀態時,會使用此選項。 |
傳回的數據具有下列架構:
欄 | 類型 | 說明 |
---|---|---|
key |
結構 (衍生自狀態索引鍵的進一步類型) | 狀態檢查點中具狀態運算子記錄的索引鍵。 |
value |
結構 (衍生自狀態值的進一步類型) | 狀態檢查點中狀態運算子記錄的值。 |
partition_id |
整數 | 包含狀態操作記錄的狀態檢查點分區。 |
讀取結構化串流狀態中繼資料
重要
您必須在 Databricks Runtime 14.2 或更新版本上執行串流查詢,才能記錄狀態中繼資料。 狀態中繼資料檔案不會中斷回溯相容性。 如果您選擇在 Databricks Runtime 14.1 或更新版本上執行串流查詢,則會略過現有的狀態中繼資料檔案,也不會寫入任何新的狀態中繼資料檔案。
您可以讀取 Databricks Runtime 14.2 或更新版本上執行的結構化串流查詢狀態中繼資料資訊。 使用下列語法:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
傳回的數據具有下列架構:
列/欄 | 類型 | 說明 |
---|---|---|
operatorId |
整數 | 具狀態串流運算子的整數識別碼。 |
operatorName |
整數 | 具狀態串流運算子的名稱。 |
stateStoreName |
字串 | 運算子狀態存放區的名稱。 |
numPartitions |
整數 | 狀態存放區的分割區數目。 |
minBatchId |
長 | 可供查詢狀態的最小批次識別碼。 |
maxBatchId |
長的 | 可供查詢狀態的最大批次識別碼。 |
注意
minBatchId
和 maxBatchId
提供的批次標識碼值會反映檢查點寫入時的狀態。 舊批次會隨著微批次執行自動清理,因此此處提供的值不保證仍可供使用。