Sdílet prostřednictvím


Použití změnového datového kanálu Delta Lake v Azure Databricks

Kanál změn dat umožňuje Službě Azure Databricks sledovat změny na úrovni řádků mezi verzemi tabulky Delta. Pokud je v tabulce Delta povolená, modul runtime zaznamenává změny událostí pro všechna data zapsaná do tabulky. To zahrnuje data řádků spolu s metadaty označujícími, jestli byl zadaný řádek vložený, odstraněný nebo aktualizovaný.

Důležité

Datový kanál změn funguje ve spojení s historií tabulek a poskytuje informace o změnách. Klonování tabulky Delta vytváří samostatnou historii, a proto se datový kanál změn u naklonovaných tabulek neshoduje s původní tabulkou.

Přírůstkové zpracování dat o změnách

Databricks doporučuje používat strukturované streamování v kombinaci s protokolem změn k postupnému zpracovávání změn z tabulek Delta. Strukturované streamování pro Azure Databricks musíte použít k automatickému sledování verzí pro datový kanál změn vaší tabulky.

Poznámka:

DLT poskytuje funkce pro snadné šíření dat změn a ukládání výsledků jako SCD (pomalu se měnící dimenze) typu 1 nebo typ 2 tabulek. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocíDLT .

Pokud chcete číst datový kanál změn z tabulky, musíte ho v této tabulce povolit. Podívejte se na Povolení změn dat.

Nastavte možnost readChangeFeed na true, když konfigurujete datový proud proti tabulce pro čtení kanálu změnových dat, jak je znázorněno v následujícím příkladu syntaxe:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Ve výchozím nastavení datový proud vrátí nejnovější snímek tabulky při prvním spuštění streamu jako INSERT a následné změny jako změnová data.

Potvrzení změn dat probíhá jako součást transakce Delta Lake a stává se dostupným ve stejnou dobu, kdy se nová data zapisují do tabulky.

Volitelně můžete zadat počáteční verzi. Viz Mám zadat počáteční verzi?.

Kanál změn dat také podporuje dávkové spouštění, které vyžaduje zadání počáteční verze. Viz Čtení změn v dávkových dotazech.

Možnosti, jako jsou limity rychlosti (maxFilesPerTrigger, maxBytesPerTrigger) a excludeRegex jsou také podporovány při čtení dat o změnách.

Omezení rychlosti může být atomické pro jiné verze než počáteční verze snímku. To znamená, že celá verze potvrzení bude omezená rychlostí nebo se vrátí celé potvrzení.

Mám zadat počáteční verzi?

Pokud chcete ignorovat změny, ke kterým došlo před konkrétní verzí, můžete volitelně zadat počáteční verzi. Verzi můžete zadat pomocí časového razítka nebo čísla ID verze zaznamenaného v transakčním protokolu Delta.

Poznámka:

Pro čtení dávek je vyžadována počáteční verze a mnoho dávkových vzorů může těžit z možnosti nastavit volitelnou koncovou verzi.

Při konfiguraci úloh strukturovaného streamování zahrnujících datový kanál změn je důležité pochopit, jak určení počáteční verze ovlivňuje zpracování.

Mnohé úlohy streamování, zejména nové pipeliny pro zpracování dat, těží z výchozího chování. Při výchozím chování se první dávka zpracuje, když stream nejdříve zaznamená všechny existující záznamy v tabulce jako operace INSERT v kanálu změn dat.

Pokud cílová tabulka již obsahuje všechny záznamy s odpovídajícími změnami až do určitého bodu, zadejte počáteční verzi, aby se zabránilo zpracování stavu zdrojové tabulky jako INSERT událostí.

Příklad syntaxe níže ukazuje obnovení po selhání streamování, při kterém byl poškozen kontrolní bod. V tomto příkladu předpokládejme následující podmínky:

  1. U zdrojové tabulky byla při vytváření tabulky povolená změna datového kanálu.
  2. Cílová podřízená tabulka zpracovala všechny změny až do verze 75 a včetně ní.
  3. Historie verzí zdrojové tabulky je k dispozici pro verze 70 a vyšší.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

V tomto příkladu musíte také zadat nové umístění kontrolního bodu.

Důležité

Pokud zadáte počáteční verzi, stream se nepovede spustit z nového kontrolního bodu, pokud už v historii tabulek neexistuje výchozí verze. Delta Lake automaticky vyčistí historické verze, což znamená, že všechny zadané počáteční verze se nakonec odstraní.

Podívejte se na Mohutíme použít změnu kanálu dat k přehrání celé historie tabulky?.

Čtení změn v dávkových dotazech

Syntaxi dávkového dotazu můžete použít ke čtení všech změn počínaje konkrétní verzí nebo ke čtení změn v zadaném rozsahu verzí.

Jako celé číslo specifikujete verzi a časové razítko jako řetězec ve formátu yyyy-MM-dd[ HH:mm:ss[.SSS]].

Počáteční a koncové verze jsou v dotazech inkluzivní. Pokud chcete přečíst změny z konkrétní počáteční verze na nejnovější verzi tabulky, zadejte pouze počáteční verzi.

Pokud zadáte verzi nižší nebo časové razítko starší, než je to, které zaznamenalo události změn – tedy bylo starší než při spuštění záznamu změn – zobrazí se chyba, že záznam změn nebyl aktivován.

Následující příklady syntaxe ukazují použití možností počáteční a koncové verze při dávkovém čtení.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Poznámka:

Ve výchozím nastavení platí, že pokud uživatel předá verzi nebo časové razítko překračující poslední potvrzení v tabulce, vyvolá se chyba timestampGreaterThanLatestCommit . V Databricks Runtime 11.3 LTS a vyšší může datový kanál pro změny zpracovat případ verze mimo rozsah, pokud uživatel nastaví následující konfiguraci:true

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Pokud zadáte počáteční verzi větší než poslední potvrzení v tabulce nebo novější časové razítko spuštění než poslední potvrzení v tabulce, vrátí se po povolení předchozí konfigurace prázdný výsledek čtení.

Pokud zadáte koncovou verzi větší než poslední potvrzení v tabulce nebo novější časové razítko ukončení než poslední potvrzení v tabulce, vrátí se při povolení předchozí konfigurace v režimu dávkového čtení všechny změny mezi počáteční verzí a posledním potvrzením.

Jaké je schéma kanálu změn dat?

Při čtení z datového kanálu změn pro tabulku se použije schéma nejnovější verze tabulky.

Poznámka:

Většina operací změny schématu a vývoje je plně podporovaná. Tabulka s povoleným mapováním sloupců nepodporuje všechny případy použití a demonstruje jiné chování. Viz Omezení kanálu změn dat pro tabulky s povoleným mapováním sloupců.

Kromě datových sloupců ze schématu tabulky Delta obsahuje datový kanál změn sloupce metadat, které identifikují typ události změny:

Název sloupce Typ Hodnoty
_change_type řetězec insert, update_preimage , update_postimage, delete(1)
_commit_version Dlouhý Verze protokolu Delta nebo tabulky obsahující změnu.
_commit_timestamp Časové razítko Časové razítko přidružené k vytvoření potvrzení.

(1)preimage je hodnota před aktualizací, postimage je hodnota po aktualizaci.

Poznámka:

Pokud schéma obsahuje sloupce se stejnými názvy jako tyto přidané sloupce, nelze v tabulce povolit datový kanál změn. Před pokusem o povolení kanálu změn dat přejmenujte sloupce v tabulce, abyste tento konflikt vyřešili.

Povolení toku změn dat

Datový proud změn můžete číst jen u povolených tabulek. Možnost změn datového kanálu musíte povolit explicitně pomocí jedné z následujících metod:

  • Nová tabulka: Nastavte vlastnost delta.enableChangeDataFeed = true tabulky v CREATE TABLE příkazu.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Existující tabulka: Nastavte vlastnost delta.enableChangeDataFeed = true tabulky v ALTER TABLE příkazu.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Všechny nové tabulky:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Důležité

Zaznamenávají se pouze změny provedené po povolení datového kanálu změn. Předchozí změny v tabulce se nezachytí.

Změna úložiště dat

Povolení funkce změnového datového kanálu způsobí malé zvýšení nákladů na úložiště pro tabulku. Při spuštění dotazu se vygenerují záznamy změn dat a jsou obecně mnohem menší než celková velikost přepsaných souborů.

Azure Databricks zaznamenává změny dat pro UPDATE, DELETE a MERGE operace ve složce _change_data pod adresářem tabulky. Některé operace, jako jsou operace jen pro vložení a odstranění celého oddílu, negenerují data v _change_data adresáři, protože Azure Databricks dokáže efektivně vypočítat datový kanál změn přímo z transakčního protokolu.

Všechna čtení datových souborů ve _change_data složce by měla projít podporovanými rozhraními Delta Lake API.

Soubory ve _change_data složce se řídí zásadami uchovávání informací v tabulce. Údaje o změnách datového kanálu se odstraní při spuštění příkazu VACUUM.

Můžu pomocí kanálu změn dat přehrát celou historii tabulky?

Datový kanál změn není určen jako trvalý záznam všech změn v tabulce. Kanál změn dat zaznamenává pouze změny, ke kterým dojde po jeho povolení.

Změnový kanál a Delta Lake vám umožňují vždy zrekonstruovat úplný snímek zdrojové tabulky, což znamená, že můžete zahájit nové streamové čtení proti tabulce, která má povolený kanál změn, a zachytit aktuální verzi této tabulky a všechny změny, ke kterým dojde následně.

Záznamy v datovém kanálu změn musíte považovat za přechodné a přístupné pouze pro zadané okno uchovávání informací. Protokol transakcí Delta v pravidelných intervalech odebere verze tabulek a jejich odpovídající verze datového kanálu změn. Při odebrání verze z transakčního protokolu už nemůžete číst datový kanál změn pro danou verzi.

Pokud váš případ použití vyžaduje zachování trvalé historie všech změn v tabulce, měli byste k zápisu záznamů z datového kanálu změn do nové tabulky použít přírůstkovou logiku. Následující příklad kódu ukazuje použití trigger.AvailableNow, který využívá inkrementální zpracování strukturovaného streamování, ale zpracovává dostupná data jako dávkovou úlohu. Tuto úlohu můžete naplánovat asynchronně s hlavními kanály zpracování a vytvořit zálohu datového kanálu změn pro účely auditování nebo úplné přehrání.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Změna omezení datového kanálu pro tabulky s povoleným mapováním sloupců

Pokud je u tabulky Delta povolené mapování sloupců, můžete v tabulce přetáhnout nebo přejmenovat sloupce bez přepsání datových souborů pro existující data. Při povoleném mapování sloupců má datový kanál změn omezení po provedení neaditivních změn schématu, jako je přejmenování nebo zrušení sloupce, změna datového typu nebo změny nulovatelnosti.

Důležité

  • Nelze číst datový kanál změn pro transakci nebo rozsah, ve kterém dochází k neaditivní změně schématu, pomocí sémantiky dávky.
  • Ve verzích Databricks Runtime 12.2 LTS a starších tabulky s povoleným mapováním sloupců, u kterých došlo k nepřidávajícím změnám schématu, nepodporují streamované čtení z kanálu změn. Viz Streamování s mapováním sloupců a změnami schématu.
  • V Databricks Runtime 11.3 LTS a níže nemůžete číst datový kanál změn pro tabulky s povoleným mapováním sloupců, u kterých došlo k přejmenování nebo vyřazení sloupců.

Ve službě Databricks Runtime 12.2 LTS a vyšší můžete provádět dávkové čtení změnového datového proudu pro tabulky s povoleným mapováním sloupců, u kterých došlo k neaditivním změnám schématu. Místo použití schématu nejnovější verze tabulky operace čtení používají schéma koncové verze tabulky zadané v dotazu. Dotazy stále selžou, pokud zadaný rozsah verzí zahrnuje změnu schématu, která není aditivní.