Delta táblák streamelésének olvasása és írása
A Delta Lake mélyen integrálva van a Spark strukturált streameléssel readStream
és writeStream
a . A Delta Lake számos olyan korlátozást leküzd, amelyek általában a streamelési rendszerekhez és fájlokhoz kapcsolódnak, beleértve a következőket:
- Kis késésű betöltéssel létrehozott kis fájlok szenesítése.
- A "pontosan egyszer" feldolgozás fenntartása egynél több streammel (vagy egyidejű kötegelt feladatokkal).
- Hatékonyan felderítheti, hogy mely fájlok újak, amikor fájlokat használ a stream forrásaként.
Feljegyzés
Ez a cikk a Delta Lake-táblák streamforrásként és fogadóként való használatát ismerteti. Ha tudni szeretné, hogyan tölthet be adatokat streamelő táblákkal a Databricks SQL-ben, olvassa el Adatok betöltése streamtáblákkal a Databricks SQL.
A Delta Lake-hez való stream-statikus illesztésekről további információt a Stream-statikus illesztések című témakörben talál.
Delta-tábla forrásként
A strukturált streamelés növekményesen olvassa be a Delta-táblákat. Bár a streamelési lekérdezés folyamatosan aktív egy Delta-táblán, az új rekordokat idempotensen dolgozzák fel, amikor a táblaverziók rögzítődnek a forrástáblában.
Az alábbi példakód egy streamelési olvasás konfigurálását mutatja be a táblanév vagy a fájl elérési útja alapján.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Fontos
Ha egy Delta-tábla sémája megváltozik, miután egy streamelési olvasás elkezdődött a táblán, a lekérdezés sikertelen lesz. A legtöbb sémamódosítás esetén újraindíthatja a streamet a sémaeltérés feloldásához és a feldolgozás folytatásához.
A Databricks Runtime 12.2 LTS-ben és alatta nem streamelhet olyan Delta-táblából, amelyen engedélyezve van az oszlopleképezés, amely nem additív sémafejlődésen ment keresztül, például oszlopok átnevezése vagy elvetése. Részletekért lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.
Bemeneti sebesség korlátozása
A mikro kötegek szabályozásához a következő lehetőségek állnak rendelkezésre:
-
maxFilesPerTrigger
: Hány új fájlt kell figyelembe venni minden mikrokötegben. Az alapértelmezett érték 1000. -
maxBytesPerTrigger
: Mennyi adat lesz feldolgozva az egyes mikrokötegekben. Ez a beállítás "soft max" értéket állít be, ami azt jelenti, hogy egy köteg körülbelül ennyi adatot dolgoz fel, és a korlátnál többet is feldolgozhat annak érdekében, hogy a streamlekérdezés előrehaladjon olyan esetekben, amikor a legkisebb bemeneti egység nagyobb ennél a korlátnál. Ez alapértelmezés szerint nincs beállítva.
Ha a maxBytesPerTrigger
-t a maxFilesPerTrigger
-gyel együtt használja, a mikroköteg addig dolgozza fel az adatokat, amíg el nem éri a maxFilesPerTrigger
vagy a maxBytesPerTrigger
korlátot.
Feljegyzés
Ha a forrástábla tranzakciói a logRetentionDuration
konfigurációs miatt törlődnek, és a streamelési lekérdezés megpróbálja feldolgozni ezeket a verziókat, a lekérdezés alapértelmezés szerint nem kerüli el az adatvesztést. A failOnDataLoss
opció false
beállításával figyelmen kívül hagyhatja az elveszett adatokat, és folytathatja a feldolgozást.
Delta Lake change data capture (CDC) hírcsatorna streamelése
A Delta Lake változási adatfolyam rögzíti egy Delta-tábla módosításait, beleértve a frissítéseket és a törléseket is. Ha engedélyezve van, a változási adatok adatfolyamából streamelhet, és logikát írhat a beszúrások, frissítések és törlések feldolgozására a következő táblákba. Bár a változáskövetési adatcsatorna adatkimenete kissé eltér az általa leírt Delta-táblától, ez megoldást nyújt a növekményes változások propagálására az alsóbb rétegbeli táblákra egy medál architektúrában.
Fontos
A Databricks Runtime 12.2 LTS-ben és alatta nem streamelhet a változásadatcsatornából olyan Delta-tábla esetében, amelynek oszlopleképezése engedélyezve van, és amely nem additív sémafejlődésen ment keresztül, például oszlopok átnevezése vagy elvetése. Lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.
Frissítések és törlések mellőzése
A strukturált streamelés nem kezeli a nem hozzáfűző bemeneteket, és kivételt okoz, ha bármilyen módosítás történik a forrásként használt táblán. Az alsóbb rétegben nem automatikusan propagált változások kezelésére két fő stratégia létezik:
- Törölheti a kimenetet és az ellenőrzőpontot, és az elejétől újraindíthatja a streamet.
- A következő két lehetőség közül választhat:
-
ignoreDeletes
: hagyja figyelmen kívül a partícióhatárokon adatokat törlő tranzakciókat. -
skipChangeCommits
: hagyja figyelmen kívül a meglévő rekordokat törlő vagy módosító tranzakciókat.skipChangeCommits
alösszegekignoreDeletes
.
-
Feljegyzés
A Databricks Runtime 12.2 LTS-ben és újabb skipChangeCommits
verziókban elavult az előző beállítás ignoreChanges
. A Databricks Runtime 11.3 LTS-ben és az alacsonyabb ignoreChanges
verzióban ez az egyetlen támogatott lehetőség.
A(z) ignoreChanges
szemantikája nagymértékben különbözik a következőtől: skipChangeCommits
. Ha engedélyezve van a ignoreChanges
, a forrástábla újraírt adatfájljai újra ki lesznek bocsátva egy adatmódosítási művelet után, például UPDATE
, MERGE INTO
, DELETE
(partíciókon belül) vagy OVERWRITE
. A változatlan sorok gyakran új sorok mellett jelennek meg, így az alsóbb rétegbeli fogyasztóknak képesnek kell lenniük az ismétlődések kezelésére. A törléseket a rendszer nem propagálja lefelé.
ignoreChanges
alösszegek ignoreDeletes
.
A(z) skipChangeCommits
teljes mértékben figyelmen kívül hagyja a fájlmódosítási műveleteket. A forrástáblában az adatmódosítási művelet( például UPDATE
, MERGE INTO
, DELETE
és OVERWRITE
) miatt átírt adatfájlok teljes mértékben figyelmen kívül lesznek hagyva. Annak érdekében, hogy a felsőbb szintű forrástáblákban bekövetkezett változásokat tükrözzük, külön logikát kell megvalósítani a módosítások átvezetéséhez.
A konfigurált számítási feladatok továbbra is ismert szemantikával ignoreChanges
működnek, de a Databricks az összes új számítási feladat használatát skipChangeCommits
javasolja. A számítási feladatok ignoreChanges
migrálása újrabontási logikát skipChangeCommits
igényel.
Példa
Tegyük fel például, hogy user_events
date
, user_email
és action
oszlopokkal rendelkezik, amelyeket date
particionált. A user_events
táblából streamelsz, és a GDPR követelményei miatt törölnöd kell az adatokat.
Ha a partícióhatárokon töröl (azaz a WHERE
egy partícióoszlopon van), a fájlok már érték szerint vannak szegmentálva, így a törlés egyszerűen eltávolítja ezeket a fájlokat a metaadatokból. Ha egy teljes adatpartíciót töröl, az alábbiakat használhatja:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Ha több partícióban töröl adatokat (ebben a példában a szűrésre user_email
), használja a következő szintaxist:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Ha a user_email
-t a UPDATE
utasítással frissíti, akkor a szóban forgó user_email
-t tartalmazó fájl újraíródik. A módosított adatfájlok figyelmen kívül hagyása skipChangeCommits
.
Kezdeti pozíció megadása
Az alábbi beállítások segítségével a Delta Lake streamelési forrásának kiindulópontját a teljes tábla feldolgozása nélkül adhatja meg.
startingVersion
: A Delta Lake-verzió, amelyből kiindulni szeretne. A Databricks azt javasolja, hogy a legtöbb számítási feladat esetében kihagyja ezt a lehetőséget. Ha nincs beállítva, a stream a legújabb elérhető verziótól indul, beleértve az adott pillanatban a táblázat teljes pillanatképét.Ha meg van adva, a stream beolvassa a Delta-tábla minden módosítását a megadott verziótól kezdve (a teljes verziót is beleértve). Ha a megadott verzió már nem érhető el, a stream nem indul el. A véglegesítési verziókat a
version
parancs kimenetének DESCRIBE HISTORY oszlopából szerezheti be.Ha csak a legújabb módosításokat szeretné visszaadni, adja meg a következőt
latest
: .startingTimestamp
: A kezdéshez megadott időbélyeg. Az időbélyegen vagy után véglegesített összes táblamódosítást (beleértve) a streamolvasó felolvassa. Ha a megadott időbélyeg megelőzi az összes tábla véglegesítését, a streamelési olvasás a legkorábbi elérhető időbélyeggel kezdődik. Az alábbiak egyike:- Időbélyeg-sztring. Például:
"2019-01-01T00:00:00.000Z"
. - Dátumsztring. Például:
"2019-01-01"
.
- Időbélyeg-sztring. Például:
A két beállítás egyszerre nem állítható be. Ezek csak új streamelési lekérdezés indításakor lépnek érvénybe. Ha egy streamlekérdezés elindult, és a folyamat az ellenőrzőponton lett rögzítve, a rendszer figyelmen kívül hagyja ezeket a beállításokat.
Fontos
Bár a streamforrást elindíthatja egy megadott verzióról vagy időbélyegről, a streamforrás sémája mindig a Delta-tábla legújabb sémája. Győződjön meg arról, hogy a megadott verzió vagy időbélyeg után nem történt inkompatibilis sémamódosítás a Delta táblában. Ellenkező esetben előfordulhat, hogy a streamforrás helytelen eredményeket ad vissza, amikor helytelen sémával olvassa be az adatokat.
Példa
Tegyük fel például, hogy van egy tábla user_events
. Ha az 5. verzió óta szeretné elolvasni a módosításokat, használja a következőt:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Ha 2018-10-18 óta szeretné elolvasni a módosításokat, használja a következőt:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Kezdeti pillanatkép feldolgozása adatelhagyás nélkül
Ez a funkció a Databricks Runtime 11.3 LTS-en és újabb verziókban érhető el.
Ha deltatáblát használ streamforrásként, a lekérdezés először feldolgozza a táblában található összes adatot. Az ebben a verzióban található Delta-táblát kezdeti pillanatképnek nevezzük. Alapértelmezés szerint a Delta-tábla adatfájljai a legutóbb módosított fájl alapján lesznek feldolgozva. Az utolsó módosítási idő azonban nem feltétlenül a rekord eseményidejének sorrendjét jelöli.
Egy meghatározott vízjelet tartalmazó állapotalapú streamlekérdezésben a fájlok módosítási idő szerinti feldolgozása azt eredményezheti, hogy a rekordok feldolgozása helytelen sorrendben történik. Ez azt eredményezheti, hogy a rekordok a vízjel késői eseményeiként csökkennek.
Az adatcseppek problémáját az alábbi beállítás engedélyezésével kerülheti el:
- withEventTimeOrder: Azt jelzi, hogy a kezdeti pillanatképet eseményidőrenddel kell-e feldolgozni.
Ha az esemény időrendje engedélyezve van, a kezdeti pillanatképadatok eseményideje időgyűjtőkre van osztva. Minden mikro köteg feldolgoz egy gyűjtőt az időtartományon belüli adatok szűrésével. A maxFilesPerTrigger és a maxBytesPerTrigger konfigurációs beállítások továbbra is alkalmazhatók a mikrobatch méretének szabályozására, de csak hozzávetőlegesen a feldolgozás jellege miatt.
Az alábbi ábrán ez a folyamat látható:
A funkcióval kapcsolatos jelentős információk:
- Az adatcsepp-probléma csak akkor fordul elő, ha egy állapotalapú streamelési lekérdezés kezdeti Delta-pillanatképe az alapértelmezett sorrendben van feldolgozva.
- A stream lekérdezés elindítása után nem módosítható
withEventTimeOrder
a kezdeti pillanatkép feldolgozása. Ha módosítani szeretné azwithEventTimeOrder
újraindítást, törölnie kell az ellenőrzőpontot. - Ha olyan stream-lekérdezést futtat, amelyen engedélyezve van az EventTimeOrder, nem állíthatja vissza olyan DBR-verzióra, amely nem támogatja ezt a funkciót, amíg a kezdeti pillanatkép-feldolgozás be nem fejeződik. Ha vissza kell állítania, megvárhatja a kezdeti pillanatkép befejezését, vagy törölheti az ellenőrzőpontot, és újraindíthatja a lekérdezést.
- Ez a funkció nem támogatott a következő gyakori forgatókönyvekben:
- Az eseményidő oszlop egy generált oszlop, és nem-vetítéses átalakítások vannak a Delta-forrás és a vízjel között.
- Van egy vízjel, amelynek több Delta-forrása van a stream lekérdezésben.
- Ha az esemény időrendje engedélyezve van, a Delta kezdeti pillanatképének feldolgozása lassabb lehet.
- Minden mikro köteg megvizsgálja a kezdeti pillanatképet, hogy a megfelelő eseményidőtartományon belül szűrje az adatokat. A gyorsabb szűrőművelet érdekében javasoljuk, hogy egy Delta-forrásoszlopot használjon eseményidőként, hogy az adatok kihagyása alkalmazható legyen (ellenőrizze a Delta Lake adatkihagyását, ha alkalmazható). Emellett a tábla particionálása az eseményidő oszlop mentén tovább felgyorsíthatja a feldolgozást. A Spark felhasználói felületén ellenőrizheti, hogy egy adott mikro köteg hány deltafájlt vizsgál.
Példa
Tegyük fel, hogy van egy user_events
táblázatod, amely egy event_time
oszlopot tartalmaz. A streamelési lekérdezés egy összesítő lekérdezés. Ha biztosítani szeretné, hogy a pillanatképek kezdeti feldolgozása során ne csökkenjen az adat, a következőt használhatja:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Feljegyzés
Ezt a fürt Spark-konfigurációjával is engedélyezheti, amely az összes streamelési lekérdezésre vonatkozik: spark.databricks.delta.withEventTimeOrder.enabled true
Delta-táblázat fogadóként
A Strukturált streamelés használatával adatokat is írhat a Delta-táblába. A tranzakciónapló lehetővé teszi, hogy a Delta Lake egyszeri pontos feldolgozást garantáljon, még akkor is, ha más adatfolyamok vagy kötegelt lekérdezések futnak párhuzamosan a táblán.
Feljegyzés
A Delta Lake VACUUM
függvény eltávolítja a Delta Lake által nem kezelt összes fájlt, de kihagyja a _
kezdőkönyvtárakat. A Delta-táblákhoz tartozó egyéb adatok és metaadatok mellett az ellenőrzőpontokat biztonságosan tárolhatja olyan címtárstruktúra használatával, mint például a <table-name>/_checkpoints
.
Mérőszámok
Megtudhatja, hogy hány bájtot és hány fájlt kell még feldolgozni egy streamelési lekérdezési folyamatban, mint a numBytesOutstanding
metrikákat.numFilesOutstanding
További metrikák:
-
numNewListedFiles
: Azon Delta Lake-fájlok száma, amelyek a köteg hátralékának kiszámításához lettek felsorolva.-
backlogEndOffset
: A hátralék kiszámításához használt táblaverzió.
-
Ha egy jegyzetfüzetben futtatja a streamet, a streamelési lekérdezés folyamatának irányítópultján, a Nyers adatok lapon láthatja ezeket a metrikákat:
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
Hozzáfűzési mód
Alapértelmezés szerint a streamek hozzáfűzési módban futnak, amely új rekordokat ad hozzá a táblához.
Használja a toTable
metódust táblákba való streameléskor, ahogyan az alábbi példában is látható:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Kész mód
A strukturált streamelés segítségével a teljes táblázatot lecserélheti minden köteggel. Az egyik példahasználati eset egy összegzés kiszámítása összesítéssel:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
Az előző példa folyamatosan frissít egy táblát, amely az ügyfelek által összesített események számát tartalmazza.
Az enyhébb késési követelményekkel rendelkező alkalmazások esetében egyszeri eseményindítókkal mentheti a számítási erőforrásokat. Ezekkel frissítheti az összefoglaló aggregációs táblákat egy adott ütemezés szerint, és csak az utolsó frissítés óta érkezett új adatokat dolgozza fel.
Upsert a streamelési lekérdezésekből a következő használatával: foreachBatch
A merge
és a foreachBatch
kombinációjával összetett upserteket írhat egy streamelési lekérdezésből egy Delta-táblába. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.
Ez a minta számos alkalmazást tartalmaz, köztük a következőket:
- Stream-aggregátumok írása frissítési módban: Ez sokkal hatékonyabb, mint a teljes mód.
-
Adatbázis-módosítások adatfolyamának írása Delta-táblába: A változásadatok írására szolgáló egyesítési lekérdezés használható a
foreachBatch
, hogy folyamatosan alkalmazza a változásfolyamot egy Delta-táblára. -
Adatfolyam írása egy Delta-táblába deduplikálással: A deduplikációhoz csak beszúrást végző egyesítési lekérdezés használható a
foreachBatch
arra, hogy az adatokat folyamatosan, automatikus deduplikálással írja be (amelyek tartalmaznak ismétlődéseket).
Feljegyzés
- Győződjön meg arról, hogy a
merge
belsőforeachBatch
utasítás idempotens, mivel a streamelési lekérdezés újraindításai többször is alkalmazhatják a műveletet ugyanazon az adatkötegen. - Amikor
merge
használatbanforeachBatch
van, a streamelési lekérdezés bemeneti adatsebessége (a notebook sebességi grafikonján keresztülStreamingQueryProgress
jelentve és látható) a forrásnál az adatok létrehozásának tényleges sebességének többszöröseként jelenhet meg. Ennek az az oka, hogy a(z)merge
többször olvassa be a bemeneti adatokat, ami a bemeneti metrikák többszörözését okozza. Ha szűk keresztmetszetről van szó, gyorsítótárazhatja a kötegelt DataFrame-et a(z)merge
előtt , majdmerge
után törölheti.
Az alábbi példa bemutatja, hogyan használhatja az SQL-t foreachBatch
a feladat végrehajtásához:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
A Delta Lake API-k használatával streamelési upserts-eket is végrehajthat, ahogyan az alábbi példában is látható:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
idempotens táblaírások foreachBatch
Feljegyzés
A Databricks azt javasolja, hogy a foreachBatch
használata helyett minden frissíteni kívánt kimenethez külön streamelési írást konfiguráljon. Ennek az az oka, hogy a rendszer szerializálja a több táblába történő írást a foreachBatch használatakor, ami csökkenti a párhuzamosítást és növeli az általános késést.
A Delta-táblák az alábbi DataFrameWriter
lehetőségeket támogatják annak érdekében, hogy több táblába történő írás foreachBatch
során idempotens legyen.
-
txnAppId
: Egyedi sztring, amelyet az egyes DataFrame-írások továbbíthatnak. Használhatja például a StreamingQuery azonosítóttxnAppId
. -
txnVersion
: Egy monoton módon növekvő szám, amely tranzakciós verzióként működik.
A Delta Lake az ismétlődő írások kombinációját txnAppId
és txnVersion
azonosítását használja, és figyelmen kívül hagyja őket.
Ha egy kötegírás meghiúsul, a köteg újrafuttatása ugyanazzal az alkalmazással és kötegazonosítóval segíti a futtatókörnyezetet az ismétlődő írások helyes azonosításában és figyelmen kívül hagyásában. Az alkalmazásazonosító (txnAppId
) bármely felhasználó által létrehozott egyedi sztring lehet, és nem kell kapcsolódnia a streamazonosítóhoz. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.
Figyelmeztetés
Ha törli a streamelési ellenőrzőpontot, és egy új ellenőrzőponttal újraindítja a lekérdezést, egy másikat txnAppId
kell megadnia. Az új ellenőrzőpontok a kötegazonosítóval kezdődnek 0
. A Delta Lake egyedi kulcsként használja a kötegazonosítót és a txnAppId
, és kihagyja a már látott értékeket tartalmazó kötegeket.
Az alábbi példakód ezt a mintát mutatja be:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}