Delta Lake változásadat-hírcsatorna használata az Azure Databricksben
A változásadatcsatorna lehetővé teszi, hogy az Azure Databricks nyomon kövesse a Delta-tábla verziói közötti sorszintű változásokat. Ha egy Delta-táblában engedélyezve van, a futtatókörnyezet rekordjai a táblába írt összes adat eseményeinek megváltoznak. Ide tartoznak a soradatok, valamint a metaadatok, amelyek jelzik, hogy a megadott sor be lett-e szúrva, törölve vagy frissítve.
Fontos
A változásadatcsatorna a táblaelőzményekkel együtt működik a változásadatok megadásához. Mivel a Delta-tábla klónozása külön előzményt hoz létre, a klónozott táblák változási adatcsatornája nem egyezik meg az eredeti táblával.
Változásadatok növekményes feldolgozása
A Databricks a változásadatcsatorna és a strukturált streamelés együttes használatát javasolja a Delta-táblák változásainak növekményes feldolgozásához. Az Azure Databricks strukturált streameléssel automatikusan nyomon követheti a tábla változásadatcsatornájának verzióit.
Feljegyzés
A DLT lehetővé teszi a változásadatok egyszerű propagálását és az eredmények tárolását SCD (lassan változó dimenzió) 1- vagy 2-es típusú táblákként. Lásd: A MÓDOSÍTÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése A DLThasználatával.
Ha a változásadatcsatornát egy táblából szeretné olvasni, engedélyeznie kell a változási adatcsatornát a táblán. Tekintse meg a változási adatcsatorna engedélyezését.
Állítsa be a readChangeFeed
beállítást true
-re, amikor egy adatfolyamot konfigurál egy táblára az adatmódosítási folyam olvasásához, ahogyan az az alábbi szintaxisbeli példában látható.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Alapértelmezés szerint, amikor a stream először elindul, visszaadja a tábla legújabb pillanatképét INSERT
-ként, és a jövőbeli változásokat változásadatok formájában.
A Delta Lake-tranzakció részeként az adatelkötelezések változnak, és az új adatok táblába történő elkötelezésével egyidejűleg elérhetővé válnak.
Igény szerint megadhat egy kezdő verziót. Lásd: Kezdő verzió megadása?.
Az adatcsatorna módosítása támogatja a kötegelt végrehajtást is, amelyhez meg kell adni egy kezdő verziót. Lásd Változások megtekintése kötegelt lekérdezésekben.
Az olyan lehetőségek, mint a sebességkorlátok (maxFilesPerTrigger
), maxBytesPerTrigger
és excludeRegex
a változásadatok olvasásakor is támogatottak.
A sebességkorlátozás a kezdő pillanatkép-verziótól eltérő verziók esetében lehet atomi. Ez azt jelenti, hogy a commit verzió sebességkorlátozva lesz, vagy a teljes commit vissza lesz adva.
Meg kell adnom egy kezdő verziót?
Ha figyelmen kívül szeretné hagyni az adott verzió előtt történt módosításokat, megadhat egy kezdő verziót. A verziót időbélyeg használatával vagy a Delta tranzakciónaplóban rögzített verzióazonosító-számmal adhatja meg.
Megjegyzés
A kötegolvasásokhoz kezdő verzióra van szükség, és számos kötegminta hasznos lehet az opcionális befejezési verzió beállításában.
Ha strukturált streamelési számítási feladatokat konfigurál, beleértve a változási adatcsatornát, fontos tisztában lenni azzal, hogy a kezdő verzió megadása hogyan befolyásolja a feldolgozást.
Számos streamelési számítási feladat, különösen az új adatfeldolgozási folyamatok kihasználják az alapértelmezett viselkedést. Az alapértelmezett viselkedéssel az első köteg akkor lesz feldolgozva, amikor a stream először rögzíti a tábla összes meglévő rekordját INSERT
műveletként a változásadat-csatornában.
Ha a céltábla már tartalmazza a megfelelő módosításokkal rendelkező összes rekordot egy adott pontig, adjon meg egy kezdő verziót, hogy elkerülje a forrástábla állapotának INSERT
eseményekként való feldolgozását.
Az alábbi példa szintaxisa helyreállítható egy olyan streamelési hibából, amelyben az ellenőrzőpont sérült. Ebben a példában a következő feltételeket feltételezzük:
- A módosítási adatcsatorna engedélyezve lett a forrástáblában a tábla létrehozásakor.
- A cél táblázat feldolgozta az összes módosítást a 75-ös verzióig, beleértve a 75-ös verziót is.
- A forrástábla verzióelőzményei a 70-es és újabb verziókhoz érhetők el.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
Ebben a példában meg kell adnia egy új ellenőrzőpont-helyet is.
Fontos
Ha kezdő verziót ad meg, a stream nem indul el egy új ellenőrzőpontról, ha a kezdő verzió már nem szerepel a táblaelőzményekben. A Delta Lake automatikusan törli a korábbi verziókat, ami azt jelenti, hogy az összes megadott kezdőverzió törlődik.
Lásd Használhatom a változásadatcsatornát egy tábla teljes előzményeinek lejátszásához?.
A változások olvasása kötegelt lekérdezésekben
A batch-lekérdezés szintaxisával beolvashatja az összes módosítást egy adott verziótól kezdve, vagy beolvashatja a módosításokat egy megadott verziótartományon belül.
A verziót egész számként, az időbélyegeket pedig sztringként adja meg.yyyy-MM-dd[ HH:mm:ss[.SSS]]
A lekérdezések tartalmazzák a kezdő és a befejező verziót. Ha egy adott kezdő verzióról a tábla legújabb verziójára szeretné beolvasni a módosításokat, csak a kezdő verziót adja meg.
Ha egy korábbi verziót vagy időbélyeget ad meg, mint amelyik változáseseményeket rögzített – vagyis amikor a változásadatcsatorna engedélyezve volt – hibaüzenet jelenik meg, amely azt jelzi, hogy a változásadatcsatorna nem volt engedélyezve.
Az alábbi szintaxisbeli példák bemutatják a kezdő és záró verzió beállítások kötegolvasásokkal való használatát:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Megjegyzés
Alapértelmezés szerint, ha egy felhasználó egy táblán az utolsó véglegesítést meghaladó verziót vagy időbélyeget ad át, a timestampGreaterThanLatestCommit
hiba jelenik meg. A Databricks Runtime 11.3 LTS és újabb verzióiban az adatcsatorna módosítása képes kezelni a tartományon kívüli verzió esetét, ha a felhasználó a következő konfigurációt true
állítja be:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Ha egy táblában az utolsó véglegesítésnél nagyobb kezdőverziót vagy egy tábla utolsó véglegesítésénél újabb kezdési időbélyeget ad meg, akkor az előző konfiguráció engedélyezésekor a rendszer üres olvasási eredményt ad vissza.
Ha egy táblában az utolsó véglegesítésnél nagyobb végverziót vagy egy tábla utolsó véglegesítésénél újabb befejezési időbélyeget ad meg, akkor amikor az előző konfiguráció engedélyezve van kötegelt olvasási módban, a rendszer visszaadja a kezdő verzió és az utolsó véglegesítés közötti összes módosítást.
Mi a változásadatcsatorna sémája?
Amikor egy tábla változásadatcsatornájából olvas, a rendszer a legújabb táblaverzió sémáját használja.
Feljegyzés
A legtöbb sémamódosítási és evolúciós művelet teljes mértékben támogatott. Az oszlopleképezést engedélyező táblázat nem támogatja az összes használati esetet, és eltérő viselkedést mutat. Lásd Az oszlopleképezést engedélyező táblák adatcsatorna-korlátozásainak módosításacímű témakört.
A Változástábla sémájának adatoszlopai mellett a változásadatcsatorna metaadatoszlopokat is tartalmaz, amelyek azonosítják a változásesemény típusát:
Oszlop neve | Típus | Értékrend |
---|---|---|
_change_type |
Sztring |
insert , update_preimage , update_postimage delete (1) |
_commit_version |
Hosszú | A módosítást tartalmazó Delta-napló vagy táblaverzió. |
_commit_timestamp |
Időbélyegző | A véglegesítés létrehozásakor társított időbélyeg. |
(1)preimage
a frissítés előtti érték, postimage
a frissítés utáni érték.
Feljegyzés
Nem engedélyezhető a tábla adatcsatornájának módosítása, ha a séma a hozzáadott oszlopokkal azonos nevű oszlopokat tartalmaz. Nevezze át a tábla oszlopait az ütközés feloldásához, mielőtt engedélyezni szeretné az adatcsatorna módosítását.
Változásadat-folyam engedélyezése
Csak az engedélyezett táblák változásadatcsatornája olvasható. Explicit módon engedélyeznie kell a változásadatcsatorna-beállítást az alábbi módszerek egyikével:
Új tábla: Állítsa be a táblatulajdonságot
delta.enableChangeDataFeed = true
aCREATE TABLE
parancsban.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Meglévő tábla: Állítsa be a táblatulajdonságot
delta.enableChangeDataFeed = true
aALTER TABLE
parancsban.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Minden új tábla:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Fontos
Csak a változásadatcsatorna engedélyezése után végrehajtott módosítások lesznek rögzítve. A rendszer nem rögzíti a tábla korábbi módosításait.
Adattárolás módosítása
A változásadatcsatorna engedélyezése kis mértékben növeli a táblák tárolási költségeit. A változó adatrekordok a lekérdezés futtatásakor jönnek létre, és általában sokkal kisebbek, mint az újraírt fájlok teljes mérete.
Az Azure Databricks-rekordok módosítják UPDATE
, DELETE
és MERGE
műveletek adatait a táblakönyvtár alatti _change_data
mappában. Egyes műveletek, például a csak beszúrási műveletek és a teljes partíció törlések, nem hoznak létre adatokat a _change_data
könyvtárban, mert az Azure Databricks hatékonyan tudja kiszámítani a változásadatcsatornát közvetlenül a tranzakciónaplóból.
A mappában lévő _change_data
adatfájlokra irányuló összes olvasásnak támogatott Delta Lake API-kon kell keresztülmennie.
A _change_data
mappában lévő fájlok a tábla adatmegőrzési szabályzatát követik. Az adatcsatornák adatai törlődnek a VACUUM
parancs futtatásakor.
Használhatok módosítási adatcsatornát egy tábla teljes előzményeinek lejátszásához?
A változásadatcsatorna nem szolgál a tábla összes módosításának állandó rekordjaként. Az adatcsatorna módosítása csak az engedélyezés után bekövetkező változásokat rögzíti.
Az adatcsatorna módosítása és a Delta Lake lehetővé teszi, hogy mindig rekonstruáljon egy forrástábla teljes pillanatképét, ami azt jelenti, hogy elindíthat egy új streamelést egy táblán, amelyen engedélyezve van a változásadatcsatorna, és rögzítheti a tábla aktuális verzióját, valamint az azt követő összes módosítást.
A változásadat-csatornában lévő rekordokat átmenetiként kell kezelnie, és csak egy megadott adatmegőrzési időszakhoz kell hozzáférnie. A Delta tranzakciónapló rendszeres időközönként eltávolítja a táblaverziókat és azok megfelelő változásadatcsatorna-verzióit. Ha eltávolít egy verziót a tranzakciónaplóból, a továbbiakban nem olvashatja el az adott verzióhoz tartozó változásadatcsatornát.
Ha a használati esethez egy tábla összes módosításának állandó előzményeit kell megőriznie, növekményes logikával rekordokat kell írnia a változásadatcsatornából egy új táblába. Az alábbi példakód bemutatja, hogyan használható a trigger.AvailableNow
, amely a strukturált streamelés inkrementális feldolgozását alkalmazza, de a rendelkezésre álló adatokat kötegelt számítási feladatként dolgozza fel. Ezt a számítási feladatot aszinkron módon ütemezheti a fő feldolgozási folyamatokkal, így naplózási célból vagy teljes újrajátszhatóság érdekében biztonsági másolatot készíthet a változási adatcsatornáról.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Az oszlopleképezést engedélyező táblák adatcsatorna-korlátozásainak módosítása
Ha egy Delta-táblában engedélyezve van az oszlopleképezés, a meglévő adatok adatfájljainak újraírása nélkül is elvetheti vagy átnevezheti a tábla oszlopait. Ha az oszlopleképezés engedélyezve van, a változásadatcsatorna korlátozásokkal rendelkezik a nem additív sémamódosítások, például az oszlopok átnevezése vagy elvetése, az adattípus módosítása vagy a nullbilitás módosítása után.
Fontos
- Olyan tranzakció vagy tartomány esetén, amelyben nem additív sémamódosítás történik, a változási adatok adatfolyama nem olvasható be kötegelt feldolgozás során.
- A Databricks Runtime 12.2 LTS és az alatti verziókban az oszlopleképezést engedélyező táblák, amelyeknél nem additív sémamódosítások történtek, nem támogatják a változásadat-csatorna streaming olvasását. Lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.
- A Databricks Runtime 11.3 LTS-ben és az alábbi verziókban nem olvasható a változási adatfolyam azoknál a tábláknál, amelyeknél oszlopleképezés engedélyezett, és oszlopok át lettek nevezve vagy el lettek távolítva.
A Databricks Runtime 12.2 LTS-ben és újabb verziókban kötegelt beolvasásokat végezhet a változásadatcsatornán olyan táblák esetében, amelyek oszlopleképezése engedélyezve van, és amelyek nem additív sémamódosításokat tapasztaltak. A tábla legújabb verziójának sémája helyett az olvasási műveletek a lekérdezésben megadott tábla végverziójának sémáját használják. A lekérdezések továbbra is sikertelenek maradnak, ha a megadott verziótartomány nem additív sémamódosításra terjed ki.