Condividi tramite


Le API APPLY CHANGES: semplificano la Change Data Capture con DLT

DLT semplifica la cattura dei dati modificati (CDC) con le API APPLY CHANGES e APPLY CHANGES FROM SNAPSHOT. L'interfaccia usata dipende dall'origine dei dati delle modifiche:

  • Usare APPLY CHANGES per gestire le modifiche provenienti da un feed di dati delle modifiche (CDF).
  • Usare APPLY CHANGES FROM SNAPSHOT (anteprima pubblica) per elaborare le modifiche negli snapshot del database.

In precedenza, l'istruzione MERGE INTO veniva comunemente usata per l'elaborazione dei record CDC in Azure Databricks. Tuttavia, MERGE INTO può produrre risultati non corretti a causa di record fuori sequenza o può richiedere una logica complessa per riordinare i record.

L'API APPLY CHANGES è supportata nelle interfacce DLT SQL e Python. L'API APPLY CHANGES FROM SNAPSHOT è supportata nell'interfaccia Python DLT.

Sia APPLY CHANGES che APPLY CHANGES FROM SNAPSHOT supportano l'aggiornamento delle tabelle utilizzando SCD di tipo 1 e di tipo 2.

  • Usare scD tipo 1 per aggiornare direttamente i record. La cronologia non viene mantenuta per le registrazioni aggiornate.
  • Usare SCD tipo 2 per conservare uno storico dei record, sia per tutti gli aggiornamenti sia per quelli di un insieme specificato di colonne.

Per la sintassi e altri riferimenti, vedere:

Nota

Questo articolo descrive come aggiornare le tabelle nella pipeline DLT in base alle modifiche apportate ai dati di origine. Per informazioni su come registrare ed eseguire query sulle modifiche a livello di riga per le tabelle Delta, vedere Usare il feed di dati delle modifiche Delta Lake in Azure Databricks.

Requisiti

Per usare le API CDC, è necessario configurare la pipeline per usare pipeline DLT serverless o DLT Pro o Advancededizioni.

Come viene implementato CDC con l'API APPLY CHANGES?

Gestendo automaticamente i record out-of-sequence, l'API APPLY CHANGES in DLT garantisce l'elaborazione corretta dei record CDC e rimuove la necessità di sviluppare logica complessa per la gestione dei record fuori sequenza. È necessario specificare una colonna nei dati di origine in cui eseguire la sequenza dei record, che DLT interpreta come rappresentazione monotonicamente crescente dell'ordinamento corretto dei dati di origine. DLT gestisce automaticamente i dati non in ordine. Per le modifiche del tipo 2 SCD, DLT propaga i valori di sequenza appropriati alle colonne __START_AT e __END_AT della tabella di destinazione. Deve essere presente un aggiornamento distinto per chiave a ogni valore di sequenziazione e i valori di sequenziazione NULL non sono supportati.

Per eseguire l'elaborazione CDC con APPLY CHANGES, creare prima una tabella di streaming e quindi usare l'istruzione APPLY CHANGES INTO in SQL o la funzione apply_changes() in Python per specificare l'origine, le chiavi e la sequenziazione per il feed di modifiche. Per creare la tabella di streaming di destinazione, usare l'istruzione CREATE OR REFRESH STREAMING TABLE in SQL o la funzione create_streaming_table() in Python. Consultare gli esempi di elaborazione SCD di tipo 1 e tipo 2.

Per informazioni dettagliate sulla sintassi, vedere il riferimento SQL o il riferimento Python .

Come viene implementato CDC con l'API APPLY CHANGES FROM SNAPSHOT?

Importante

L'API APPLY CHANGES FROM SNAPSHOT si trova in anteprima pubblica.

APPLY CHANGES FROM SNAPSHOT è un'API dichiarativa che determina in modo efficiente le modifiche nei dati di origine confrontando una serie di snapshot in ordine e quindi esegue l'elaborazione necessaria per l'elaborazione CDC dei record negli snapshot. APPLY CHANGES FROM SNAPSHOT è supportato solo dall'interfaccia Python DLT.

APPLY CHANGES FROM SNAPSHOT supporta l'inserimento di snapshot da più tipi di origine:

  • Usare l'inserimento periodico di snapshot per inserire snapshot da una tabella o una vista esistente. APPLY CHANGES FROM SNAPSHOT dispone di un'interfaccia semplice e semplificata per supportare periodicamente l'inserimento di snapshot da un oggetto di database esistente. Viene inserito un nuovo snapshot con ogni aggiornamento della pipeline e il tempo di inserimento viene usato come versione dello snapshot. Quando una pipeline viene eseguita in modalità continua, più snapshot vengono elaborati con ogni aggiornamento della pipeline durante un periodo determinato dall'impostazione dell'intervallo di trigger per il flusso che include l'elaborazione di APPLY CHANGES FROM SNAPSHOT.
  • Usare l'inserimento di snapshot cronologici per elaborare i file contenenti snapshot del database, ad esempio gli snapshot generati da un database Oracle o MySQL o da un data warehouse.

Per eseguire l'elaborazione CDC da qualsiasi tipo di origine con APPLY CHANGES FROM SNAPSHOT, si crea prima una tabella di streaming e si usa quindi la funzione apply_changes_from_snapshot() in Python per definire lo snapshot, le chiavi e altri argomenti necessari per implementare l'elaborazione. Vedi gli esempi di inserimento di snapshot periodici e di inserimento di snapshot cronologici.

Gli snapshot passati all'API devono essere in ordine crescente in base alla versione. Se DLT rileva uno snapshot non ordinato, viene generato un errore.

Per informazioni dettagliate sulla sintassi, consultare il riferimento DLT Python.

Limitazioni

La colonna utilizzata per la sequenziazione deve essere un tipo di dati ordinabile.

esempio : elaborazione scD di tipo 1 e scD tipo 2 con dati di origine CDF

Le sezioni seguenti forniscono esempi di query DLT SCD di tipo 1 e tipo 2 che aggiornano le tabelle di destinazione in base agli eventi di origine da un feed di dati delle modifiche che:

  1. Crea nuovi record utente.
  2. Elimina un record utente.
  3. Aggiorna i record utente. Nell'esempio SCD di tipo 1, le ultime operazioni UPDATE arrivano in ritardo e vengono eliminate dalla tabella di destinazione, dimostrando la gestione degli eventi non ordinati.

Gli esempi seguenti presuppongono familiarità con la configurazione e l'aggiornamento delle pipeline DLT. Consulta Esercitazione: Esegui la tua prima pipeline DLT.

Per eseguire questi esempi, è necessario iniziare creando un set di dati di esempio. Vedi Generare dati di test.

Di seguito sono riportati i record di input per questi esempi:

userId nome città operazione numeroSequenza
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 nullo nullo CANCELLARE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Se si rimuove il commento dalla riga finale nei dati di esempio, verrà inserito il record seguente che specifica dove devono essere troncati i record:

userId nome città operazione numeroSequenza
nullo nullo nullo TRONCA 3

Nota

Tutti gli esempi seguenti includono opzioni per specificare sia DELETE che TRUNCATE operazioni, ma ognuna è facoltativa.

Aggiornamenti del tipo 1 del processo

Nell'esempio seguente viene illustrata l'elaborazione degli aggiornamenti del tipo 1 di scD:

Pitone

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Dopo aver eseguito l'esempio scD di tipo 1, la tabella di destinazione contiene i record seguenti:

userId nome città
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Dopo aver eseguito l'esempio SCD di tipo 1 con un record aggiuntivo TRUNCATE, i record 124 e 126 vengono troncati a causa dell'operazione TRUNCATE eseguita su sequenceNum=3, e la tabella di destinazione contiene il seguente record:

userId nome città
125 Mercedes Guadalajara

Aggiornamenti del processo SCD tipo 2 di

Nell'esempio seguente viene illustrata l'elaborazione degli aggiornamenti del tipo 2 di scD:

Pitone

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Dopo aver eseguito l'esempio scD di tipo 2, la tabella di destinazione contiene i record seguenti:After running the SCD type 2 example, the target table contains the following records:

userId nome città __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 nullo
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nullo
126 Giglio Cancun 2 nullo

È anche possibile specificare un sottoinsieme di colonne di output da tracciare per la cronologia nella tabella di destinazione. Le modifiche apportate ad altre colonne vengono aggiornate sul posto anziché generare nuovi record di cronologia. L'esempio seguente illustra l'esclusione della colonna city dal rilevamento:

L'esempio seguente dimostra come utilizzare la cronologia delle modifiche con il tipo SCD 2.

Pitone

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Dopo aver eseguito questo esempio senza il record aggiuntivo TRUNCATE, la tabella di destinazione contiene i record seguenti:

userId nome città __INIZIO_A __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 nullo
125 Mercedes Guadalajara 2 nullo
126 Giglio Cancun 2 nullo

Generare dati di test

Il codice seguente viene fornito per generare un set di dati di esempio da usare nelle query di esempio presenti in questa esercitazione. Supponendo di avere le credenziali appropriate per creare un nuovo schema e creare una nuova tabella, è possibile eseguire queste istruzioni con un notebook o Databricks SQL. Il codice seguente non è destinato a essere eseguito come parte di una pipeline DLT:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

: Esempio: elaborazione periodica di snapshot

Nell'esempio seguente viene illustrata l'elaborazione scD di tipo 2 che inserisce snapshot di una tabella archiviata in mycatalog.myschema.mytable. I risultati dell'elaborazione vengono scritti in una tabella denominata target.

mycatalog.myschema.mytable registrazioni al timestamp 2024-01-01 00:00:00

Chiave Valore
1 a1
2 a2

mycatalog.myschema.mytable registrazioni al timestamp 2024-01-01 12:00:00

Chiave Valore
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Dopo l'elaborazione degli snapshot, la tabella di destinazione contiene i record seguenti:

Chiave Valore __START_AT __END_AT
1 a1 01/01/2024 00:00 2024-01-01 12:00:00
2 a2 01-01-2024 00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 nullo
3 a3 2024-01-01 12:00:00 nullo

esempio: elaborazione di snapshot cronologici

L'esempio seguente illustra l'elaborazione del tipo SCD 2 che aggiorna una tabella di destinazione in base agli eventi di origine di due snapshot archiviati in un sistema di archiviazione cloud:

Snapshot in timestamp, archiviato in /<PATH>/filename1.csv

Chiave Colonna di Tracciamento ColonnaNonTracciabile
1 a1 b1
2 a2 b2
4 a4 b4

Snapshot in timestamp + 5, archiviato in /<PATH>/filename2.csv

Chiave Colonna di Tracciamento ColonnaNonTracciata
2 a2_new b2
3 a3 b3
4 a4 b4_new

L'esempio di codice seguente illustra l'elaborazione degli aggiornamenti di tipo 2 SCD utilizzando questi snapshot.

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Dopo l'elaborazione degli snapshot, la tabella di destinazione contiene i record seguenti:

Chiave Colonna di Tracciamento ColonnaNonTracciamento __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 nullo
3 a3 b3 2 nullo
4 a4 b4_new 1 nullo

Aggiungere, modificare o eliminare dati in una tabella di streaming di destinazione

Se la pipeline pubblica tabelle in Unity Catalog, è possibile usare istruzioni DML (Data Manipulation Language), incluse le istruzioni insert, update, delete e merge, per modificare le tabelle di streaming di destinazione create da istruzioni APPLY CHANGES INTO.

Nota

  • Le istruzioni DML che modificano lo schema di tabella di una tabella di streaming non sono supportate. Assicurarsi che le istruzioni DML non tentino di evolvere lo schema della tabella.
  • Le istruzioni DML che aggiornano una tabella di streaming possono essere eseguite solo in un cluster unity catalog condiviso o in un sql warehouse usando Databricks Runtime 13.3 LTS e versioni successive.
  • Poiché lo streaming richiede origini dati solo accodabili, se l'elaborazione necessita dello streaming da una tabella di origine con modifiche (ad esempio tramite istruzioni DML), impostare il flag skipChangeCommits durante la lettura della tabella di streaming di origine. Quando viene impostata skipChangeCommits, le transazioni che eliminano o modificano i record nella tabella di origine vengono ignorate. Se l'elaborazione non richiede una tabella di streaming, è possibile utilizzare una vista materializzata (che non ha la restrizione di solo accodamento) come tabella di destinazione.

Poiché DLT usa una colonna SEQUENCE BY specificata e propaga i valori di sequenziazione appropriati alle colonne __START_AT e __END_AT della tabella di destinazione (per il tipo SCD 2), è necessario assicurarsi che le istruzioni DML usino valori validi per queste colonne per mantenere l'ordinamento corretto dei record. Vedere Come viene implementato CDC con l'API APPLY CHANGES?.

Per altre informazioni sull'uso di istruzioni DML con tabelle di streaming, vedere Aggiungere, modificare o eliminare dati in una tabella di streaming.

Nell'esempio seguente viene inserito un record attivo con una sequenza iniziale pari a 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Leggere un feed di dati delle modifiche da una tabella di destinazione APPLY CHANGES

In Databricks Runtime 15.2 e versioni successive è possibile leggere un feed di dati delle modifiche da una tabella di streaming che è la destinazione di APPLY CHANGES o APPLY CHANGES FROM SNAPSHOT query nello stesso modo in cui si legge un feed di dati delle modifiche da altre tabelle Delta. Per leggere il feed di dati delle modifiche da una tabella di streaming di destinazione, è necessario quanto segue:

  • La tabella di streaming di destinazione deve essere pubblicata in Unity Catalog. Consulta Usa il Catalogo Unity con le pipeline DLT.
  • Per leggere il feed di dati delle modifiche dalla tabella di streaming di destinazione, è necessario usare Databricks Runtime 15.2 o versione successiva. Per leggere il feed di dati delle modifiche in un'altra pipeline DLT, la pipeline deve essere configurata per l'uso di Databricks Runtime 15.2 o versione successiva.

Puoi leggere il feed di dati delle modifiche da una tabella di streaming di destinazione, creata in una pipeline DLT, nello stesso modo in cui leggi un feed di dati delle modifiche da altre tabelle Delta. Per altre informazioni sull'uso della funzionalità del feed di dati delle modifiche Delta, inclusi esempi in Python e SQL, vedere Usare il feed di dati delle modifiche Delta Lake in Azure Databricks.

Nota

Il record del feed di dati delle modifiche include metadati che identificano il tipo di evento di modifica. Quando un record viene aggiornato in una tabella, i metadati, in genere, per i record di modifica associati includono valori _change_type impostati su update_preimage, e eventi update_postimage.

Tuttavia, i valori _change_type sono diversi se vengono eseguiti aggiornamenti alla tabella di streaming di destinazione che includono la modifica dei valori della chiave primaria. Quando le modifiche includono aggiornamenti alle chiavi primarie, i campi dei metadati _change_type sono impostati per gli eventi insert e delete. Le modifiche apportate alle chiavi primarie possono verificarsi quando vengono apportati aggiornamenti manuali a uno dei campi chiave con un'istruzione UPDATE o MERGE oppure, per le tabelle di tipo 2, quando il campo __start_at cambia in modo da riflettere un valore della sequenza iniziale precedente.

La query APPLY CHANGES determina i valori della chiave primaria, che differiscono per l'elaborazione di tipo SCD 1 e SCD 2.

  • Per l'elaborazione SCD tipo 1 e l'interfaccia Python DLT, la chiave primaria è il valore del parametro keys nella funzione apply_changes(). Per l'interfaccia SQL DLT la chiave primaria è la colonna definita dalla clausola KEYS nell'istruzione APPLY CHANGES INTO.
  • Per il tipo 2, la chiave primaria è il parametro keys o la clausola KEYS più il valore restituito dall'operazione di coalesce(__START_AT, __END_AT), dove __START_AT e __END_AT sono le colonne corrispondenti della tabella di streaming di destinazione.

Ottenere dati sui record elaborati da una query DLT CDC

Nota

Le metriche seguenti vengono acquisite solo dalle query APPLY CHANGES e non dalle query APPLY CHANGES FROM SNAPSHOT.

Le metriche seguenti vengono acquisite dalle query APPLY CHANGES:

  • num_upserted_rows: numero di righe di output che vengono inserite nel set di dati durante un aggiornamento.
  • num_deleted_rows: numero di righe di output esistenti eliminate dal set di dati durante un aggiornamento.

La metrica num_output_rows, l'output per i flussi non-CDC, non viene acquisita per le query apply changes.

Quali oggetti dati vengono usati per l'elaborazione DLT CDC?

Nota

  • Queste strutture di dati si applicano solo all'elaborazione APPLY CHANGES, non all'elaborazione APPLY CHANGES FROM SNAPSHOT.
  • Queste strutture di dati si applicano solo quando la tabella di destinazione viene pubblicata nel metastore Hive. Se una pipeline pubblica in Unity Catalog, le tabelle di supporto interne non sono accessibili agli utenti.

Quando si dichiara la tabella di destinazione nel metastore Hive, vengono create due strutture di dati:

  • Visualizzazione che usa il nome assegnato alla tabella di destinazione.
  • Tabella di backup interna utilizzata da DLT per gestire l'elaborazione CDC. Questa tabella viene denominata aggiungendo __apply_changes_storage_ all'inizio del nome della tabella di destinazione.

Ad esempio, se si dichiara una tabella di destinazione denominata dlt_cdc_target, verrà visualizzata una vista denominata dlt_cdc_target e una tabella denominata __apply_changes_storage_dlt_cdc_target nel metastore. La creazione di una vista consente a DLT di filtrare le informazioni aggiuntive (ad esempio, tombstones e versioni) necessarie per gestire dati fuori ordine. Per visualizzare i dati elaborati, eseguire una query sulla vista di destinazione. Poiché lo schema della tabella __apply_changes_storage_ potrebbe cambiare per supportare funzionalità o miglioramenti futuri, non è consigliabile eseguire una query sulla tabella per l'uso in produzione. Se si aggiungono manualmente dati alla tabella, si presume che i record siano precedenti ad altre modifiche perché le colonne della versione mancano.