Megosztás a következőn keresztül:


Delta táblák streamelésének olvasása és írása

A Delta Lake mélyen integrálva van a Spark strukturált streameléssel readStream és writeStreama . A Delta Lake számos olyan korlátozást leküzd, amelyek általában a streamelési rendszerekhez és fájlokhoz kapcsolódnak, beleértve a következőket:

  • Kis késésű betöltéssel létrehozott kis fájlok szenesítése.
  • A "pontosan egyszer" feldolgozás fenntartása egynél több streammel (vagy egyidejű kötegelt feladatokkal).
  • Hatékonyan felderítheti, hogy mely fájlok újak, amikor fájlokat használ a stream forrásaként.

Feljegyzés

Ez a cikk a Delta Lake-táblák streamforrásként és fogadóként való használatát ismerteti. Ha tudni szeretné, hogyan tölthet be adatokat streamelő táblákkal a Databricks SQL-ben, olvassa el Adatok betöltése streamtáblákkal a Databricks SQL.

A Delta Lake-hez való stream-statikus illesztésekről további információt a Stream-statikus illesztések című témakörben talál.

Delta-tábla forrásként

A strukturált streamelés növekményesen olvassa be a Delta-táblákat. Bár a streamelési lekérdezés folyamatosan aktív egy Delta-táblán, az új rekordokat idempotensen dolgozzák fel, amikor a táblaverziók rögzítődnek a forrástáblában.

Az alábbi példakód egy streamelési olvasás konfigurálását mutatja be a táblanév vagy a fájl elérési útja alapján.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Fontos

Ha egy Delta-tábla sémája megváltozik, miután egy streamelési olvasás elkezdődött a táblán, a lekérdezés sikertelen lesz. A legtöbb sémamódosítás esetén újraindíthatja a streamet a sémaeltérés feloldásához és a feldolgozás folytatásához.

A Databricks Runtime 12.2 LTS-ben és alatta nem streamelhet olyan Delta-táblából, amelyen engedélyezve van az oszlopleképezés, amely nem additív sémafejlődésen ment keresztül, például oszlopok átnevezése vagy elvetése. Részletekért lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.

Bemeneti sebesség korlátozása

A mikro kötegek szabályozásához a következő lehetőségek állnak rendelkezésre:

  • maxFilesPerTrigger: Hány új fájlt kell figyelembe venni minden mikrokötegben. Az alapértelmezett érték 1000.
  • maxBytesPerTrigger: Mennyi adat lesz feldolgozva az egyes mikrokötegekben. Ez a beállítás "soft max" értéket állít be, ami azt jelenti, hogy egy köteg körülbelül ennyi adatot dolgoz fel, és a korlátnál többet is feldolgozhat annak érdekében, hogy a streamlekérdezés előrehaladjon olyan esetekben, amikor a legkisebb bemeneti egység nagyobb ennél a korlátnál. Ez alapértelmezés szerint nincs beállítva.

Ha a maxBytesPerTrigger-t a maxFilesPerTrigger-gyel együtt használja, a mikroköteg addig dolgozza fel az adatokat, amíg el nem éri a maxFilesPerTrigger vagy a maxBytesPerTrigger korlátot.

Feljegyzés

Ha a forrástábla tranzakciói a logRetentionDurationkonfigurációs miatt törlődnek, és a streamelési lekérdezés megpróbálja feldolgozni ezeket a verziókat, a lekérdezés alapértelmezés szerint nem kerüli el az adatvesztést. A failOnDataLoss opció false beállításával figyelmen kívül hagyhatja az elveszett adatokat, és folytathatja a feldolgozást.

Delta Lake change data capture (CDC) hírcsatorna streamelése

A Delta Lake változási adatfolyam rögzíti egy Delta-tábla módosításait, beleértve a frissítéseket és a törléseket is. Ha engedélyezve van, a változási adatok adatfolyamából streamelhet, és logikát írhat a beszúrások, frissítések és törlések feldolgozására a következő táblákba. Bár a változáskövetési adatcsatorna adatkimenete kissé eltér az általa leírt Delta-táblától, ez megoldást nyújt a növekményes változások propagálására az alsóbb rétegbeli táblákra egy medál architektúrában.

Fontos

A Databricks Runtime 12.2 LTS-ben és alatta nem streamelhet a változásadatcsatornából olyan Delta-tábla esetében, amelynek oszlopleképezése engedélyezve van, és amely nem additív sémafejlődésen ment keresztül, például oszlopok átnevezése vagy elvetése. Lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.

Frissítések és törlések mellőzése

A strukturált streamelés nem kezeli a nem hozzáfűző bemeneteket, és kivételt okoz, ha bármilyen módosítás történik a forrásként használt táblán. Az alsóbb rétegben nem automatikusan propagált változások kezelésére két fő stratégia létezik:

  • Törölheti a kimenetet és az ellenőrzőpontot, és az elejétől újraindíthatja a streamet.
  • A következő két lehetőség közül választhat:
    • ignoreDeletes: hagyja figyelmen kívül a partícióhatárokon adatokat törlő tranzakciókat.
    • skipChangeCommits: hagyja figyelmen kívül a meglévő rekordokat törlő vagy módosító tranzakciókat. skipChangeCommits alösszegek ignoreDeletes.

Feljegyzés

A Databricks Runtime 12.2 LTS-ben és újabb skipChangeCommits verziókban elavult az előző beállítás ignoreChanges. A Databricks Runtime 11.3 LTS-ben és az alacsonyabb ignoreChanges verzióban ez az egyetlen támogatott lehetőség.

A(z) ignoreChanges szemantikája nagymértékben különbözik a következőtől: skipChangeCommits. Ha engedélyezve van a ignoreChanges, a forrástábla újraírt adatfájljai újra ki lesznek bocsátva egy adatmódosítási művelet után, például UPDATE, MERGE INTO, DELETE (partíciókon belül) vagy OVERWRITE. A változatlan sorok gyakran új sorok mellett jelennek meg, így az alsóbb rétegbeli fogyasztóknak képesnek kell lenniük az ismétlődések kezelésére. A törléseket a rendszer nem propagálja lefelé. ignoreChanges alösszegek ignoreDeletes.

A(z) skipChangeCommits teljes mértékben figyelmen kívül hagyja a fájlmódosítási műveleteket. A forrástáblában az adatmódosítási művelet( például UPDATE, MERGE INTO, DELETEés OVERWRITE) miatt átírt adatfájlok teljes mértékben figyelmen kívül lesznek hagyva. Annak érdekében, hogy a felsőbb szintű forrástáblákban bekövetkezett változásokat tükrözzük, külön logikát kell megvalósítani a módosítások átvezetéséhez.

A konfigurált számítási feladatok továbbra is ismert szemantikával ignoreChanges működnek, de a Databricks az összes új számítási feladat használatát skipChangeCommits javasolja. A számítási feladatok ignoreChanges migrálása újrabontási logikát skipChangeCommits igényel.

Példa

Tegyük fel például, hogy user_eventsdate, user_emailés action oszlopokkal rendelkezik, amelyeket dateparticionált. A user_events táblából streamelsz, és a GDPR követelményei miatt törölnöd kell az adatokat.

Ha a partícióhatárokon töröl (azaz a WHERE egy partícióoszlopon van), a fájlok már érték szerint vannak szegmentálva, így a törlés egyszerűen eltávolítja ezeket a fájlokat a metaadatokból. Ha egy teljes adatpartíciót töröl, az alábbiakat használhatja:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Ha több partícióban töröl adatokat (ebben a példában a szűrésre user_email), használja a következő szintaxist:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Ha a user_email-t a UPDATE utasítással frissíti, akkor a szóban forgó user_email-t tartalmazó fájl újraíródik. A módosított adatfájlok figyelmen kívül hagyása skipChangeCommits .

Kezdeti pozíció megadása

Az alábbi beállítások segítségével a Delta Lake streamelési forrásának kiindulópontját a teljes tábla feldolgozása nélkül adhatja meg.

  • startingVersion: A Delta Lake-verzió, amelyből kiindulni szeretne. A Databricks azt javasolja, hogy a legtöbb számítási feladat esetében kihagyja ezt a lehetőséget. Ha nincs beállítva, a stream a legújabb elérhető verziótól indul, beleértve az adott pillanatban a táblázat teljes pillanatképét.

    Ha meg van adva, a stream beolvassa a Delta-tábla minden módosítását a megadott verziótól kezdve (a teljes verziót is beleértve). Ha a megadott verzió már nem érhető el, a stream nem indul el. A véglegesítési verziókat a version parancs kimenetének DESCRIBE HISTORY oszlopából szerezheti be.

    Ha csak a legújabb módosításokat szeretné visszaadni, adja meg a következőt latest: .

  • startingTimestamp: A kezdéshez megadott időbélyeg. Az időbélyegen vagy után véglegesített összes táblamódosítást (beleértve) a streamolvasó felolvassa. Ha a megadott időbélyeg megelőzi az összes tábla véglegesítését, a streamelési olvasás a legkorábbi elérhető időbélyeggel kezdődik. Az alábbiak egyike:

    • Időbélyeg-sztring. Például: "2019-01-01T00:00:00.000Z".
    • Dátumsztring. Például: "2019-01-01".

A két beállítás egyszerre nem állítható be. Ezek csak új streamelési lekérdezés indításakor lépnek érvénybe. Ha egy streamlekérdezés elindult, és a folyamat az ellenőrzőponton lett rögzítve, a rendszer figyelmen kívül hagyja ezeket a beállításokat.

Fontos

Bár a streamforrást elindíthatja egy megadott verzióról vagy időbélyegről, a streamforrás sémája mindig a Delta-tábla legújabb sémája. Győződjön meg arról, hogy a megadott verzió vagy időbélyeg után nem történt inkompatibilis sémamódosítás a Delta táblában. Ellenkező esetben előfordulhat, hogy a streamforrás helytelen eredményeket ad vissza, amikor helytelen sémával olvassa be az adatokat.

Példa

Tegyük fel például, hogy van egy tábla user_events. Ha az 5. verzió óta szeretné elolvasni a módosításokat, használja a következőt:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Ha 2018-10-18 óta szeretné elolvasni a módosításokat, használja a következőt:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Kezdeti pillanatkép feldolgozása adatelhagyás nélkül

Ez a funkció a Databricks Runtime 11.3 LTS-en és újabb verziókban érhető el.

Ha deltatáblát használ streamforrásként, a lekérdezés először feldolgozza a táblában található összes adatot. Az ebben a verzióban található Delta-táblát kezdeti pillanatképnek nevezzük. Alapértelmezés szerint a Delta-tábla adatfájljai a legutóbb módosított fájl alapján lesznek feldolgozva. Az utolsó módosítási idő azonban nem feltétlenül a rekord eseményidejének sorrendjét jelöli.

Egy meghatározott vízjelet tartalmazó állapotalapú streamlekérdezésben a fájlok módosítási idő szerinti feldolgozása azt eredményezheti, hogy a rekordok feldolgozása helytelen sorrendben történik. Ez azt eredményezheti, hogy a rekordok a vízjel késői eseményeiként csökkennek.

Az adatcseppek problémáját az alábbi beállítás engedélyezésével kerülheti el:

  • withEventTimeOrder: Azt jelzi, hogy a kezdeti pillanatképet eseményidőrenddel kell-e feldolgozni.

Ha az esemény időrendje engedélyezve van, a kezdeti pillanatképadatok eseményideje időgyűjtőkre van osztva. Minden mikro köteg feldolgoz egy gyűjtőt az időtartományon belüli adatok szűrésével. A maxFilesPerTrigger és a maxBytesPerTrigger konfigurációs beállítások továbbra is alkalmazhatók a mikrobatch méretének szabályozására, de csak hozzávetőlegesen a feldolgozás jellege miatt.

Az alábbi ábrán ez a folyamat látható:

Kezdeti pillanatkép

A funkcióval kapcsolatos jelentős információk:

  • Az adatcsepp-probléma csak akkor fordul elő, ha egy állapotalapú streamelési lekérdezés kezdeti Delta-pillanatképe az alapértelmezett sorrendben van feldolgozva.
  • A stream lekérdezés elindítása után nem módosítható withEventTimeOrder a kezdeti pillanatkép feldolgozása. Ha módosítani szeretné az withEventTimeOrder újraindítást, törölnie kell az ellenőrzőpontot.
  • Ha olyan stream-lekérdezést futtat, amelyen engedélyezve van az EventTimeOrder, nem állíthatja vissza olyan DBR-verzióra, amely nem támogatja ezt a funkciót, amíg a kezdeti pillanatkép-feldolgozás be nem fejeződik. Ha vissza kell állítania, megvárhatja a kezdeti pillanatkép befejezését, vagy törölheti az ellenőrzőpontot, és újraindíthatja a lekérdezést.
  • Ez a funkció nem támogatott a következő gyakori forgatókönyvekben:
    • Az eseményidő oszlop egy generált oszlop, és nem-vetítéses átalakítások vannak a Delta-forrás és a vízjel között.
    • Van egy vízjel, amelynek több Delta-forrása van a stream lekérdezésben.
  • Ha az esemény időrendje engedélyezve van, a Delta kezdeti pillanatképének feldolgozása lassabb lehet.
  • Minden mikro köteg megvizsgálja a kezdeti pillanatképet, hogy a megfelelő eseményidőtartományon belül szűrje az adatokat. A gyorsabb szűrőművelet érdekében javasoljuk, hogy egy Delta-forrásoszlopot használjon eseményidőként, hogy az adatok kihagyása alkalmazható legyen (ellenőrizze a Delta Lake adatkihagyását, ha alkalmazható). Emellett a tábla particionálása az eseményidő oszlop mentén tovább felgyorsíthatja a feldolgozást. A Spark felhasználói felületén ellenőrizheti, hogy egy adott mikro köteg hány deltafájlt vizsgál.

Példa

Tegyük fel, hogy van egy user_events táblázatod, amely egy event_time oszlopot tartalmaz. A streamelési lekérdezés egy összesítő lekérdezés. Ha biztosítani szeretné, hogy a pillanatképek kezdeti feldolgozása során ne csökkenjen az adat, a következőt használhatja:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Feljegyzés

Ezt a fürt Spark-konfigurációjával is engedélyezheti, amely az összes streamelési lekérdezésre vonatkozik: spark.databricks.delta.withEventTimeOrder.enabled true

Delta-táblázat fogadóként

A Strukturált streamelés használatával adatokat is írhat a Delta-táblába. A tranzakciónapló lehetővé teszi, hogy a Delta Lake egyszeri pontos feldolgozást garantáljon, még akkor is, ha más adatfolyamok vagy kötegelt lekérdezések futnak párhuzamosan a táblán.

Feljegyzés

A Delta Lake VACUUM függvény eltávolítja a Delta Lake által nem kezelt összes fájlt, de kihagyja a _kezdőkönyvtárakat. A Delta-táblákhoz tartozó egyéb adatok és metaadatok mellett az ellenőrzőpontokat biztonságosan tárolhatja olyan címtárstruktúra használatával, mint például a <table-name>/_checkpoints.

Mérőszámok

Megtudhatja, hogy hány bájtot és hány fájlt kell még feldolgozni egy streamelési lekérdezési folyamatban, mint a numBytesOutstanding metrikákat.numFilesOutstanding További metrikák:

  • numNewListedFiles: Azon Delta Lake-fájlok száma, amelyek a köteg hátralékának kiszámításához lettek felsorolva.
    • backlogEndOffset: A hátralék kiszámításához használt táblaverzió.

Ha egy jegyzetfüzetben futtatja a streamet, a streamelési lekérdezés folyamatának irányítópultján, a Nyers adatok lapon láthatja ezeket a metrikákat:

{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}

Hozzáfűzési mód

Alapértelmezés szerint a streamek hozzáfűzési módban futnak, amely új rekordokat ad hozzá a táblához.

Használja a toTable metódust táblákba való streameléskor, ahogyan az alábbi példában is látható:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Kész mód

A strukturált streamelés segítségével a teljes táblázatot lecserélheti minden köteggel. Az egyik példahasználati eset egy összegzés kiszámítása összesítéssel:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

Az előző példa folyamatosan frissít egy táblát, amely az ügyfelek által összesített események számát tartalmazza.

Az enyhébb késési követelményekkel rendelkező alkalmazások esetében egyszeri eseményindítókkal mentheti a számítási erőforrásokat. Ezekkel frissítheti az összefoglaló aggregációs táblákat egy adott ütemezés szerint, és csak az utolsó frissítés óta érkezett új adatokat dolgozza fel.

Upsert a streamelési lekérdezésekből a következő használatával: foreachBatch

A merge és a foreachBatch kombinációjával összetett upserteket írhat egy streamelési lekérdezésből egy Delta-táblába. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.

Ez a minta számos alkalmazást tartalmaz, köztük a következőket:

  • Stream-aggregátumok írása frissítési módban: Ez sokkal hatékonyabb, mint a teljes mód.
  • Adatbázis-módosítások adatfolyamának írása Delta-táblába: A változásadatok írására szolgáló egyesítési lekérdezés használható a foreachBatch, hogy folyamatosan alkalmazza a változásfolyamot egy Delta-táblára.
  • Adatfolyam írása egy Delta-táblába deduplikálással: A deduplikációhoz csak beszúrást végző egyesítési lekérdezés használható a foreachBatch arra, hogy az adatokat folyamatosan, automatikus deduplikálással írja be (amelyek tartalmaznak ismétlődéseket).

Feljegyzés

  • Győződjön meg arról, hogy a merge belső foreachBatch utasítás idempotens, mivel a streamelési lekérdezés újraindításai többször is alkalmazhatják a műveletet ugyanazon az adatkötegen.
  • Amikor merge használatban foreachBatchvan, a streamelési lekérdezés bemeneti adatsebessége (a notebook sebességi grafikonján keresztül StreamingQueryProgress jelentve és látható) a forrásnál az adatok létrehozásának tényleges sebességének többszöröseként jelenhet meg. Ennek az az oka, hogy a(z) merge többször olvassa be a bemeneti adatokat, ami a bemeneti metrikák többszörözését okozza. Ha szűk keresztmetszetről van szó, gyorsítótárazhatja a kötegelt DataFrame-et a(z) merge előtt , majd merge után törölheti.

Az alábbi példa bemutatja, hogyan használhatja az SQL-t foreachBatch a feladat végrehajtásához:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

A Delta Lake API-k használatával streamelési upserts-eket is végrehajthat, ahogyan az alábbi példában is látható:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

idempotens táblaírások foreachBatch

Feljegyzés

A Databricks azt javasolja, hogy a foreachBatchhasználata helyett minden frissíteni kívánt kimenethez külön streamelési írást konfiguráljon. Ennek az az oka, hogy a rendszer szerializálja a több táblába történő írást a foreachBatch használatakor, ami csökkenti a párhuzamosítást és növeli az általános késést.

A Delta-táblák az alábbi DataFrameWriter lehetőségeket támogatják annak érdekében, hogy több táblába történő írás foreachBatch során idempotens legyen.

  • txnAppId: Egyedi sztring, amelyet az egyes DataFrame-írások továbbíthatnak. Használhatja például a StreamingQuery azonosítót txnAppId.
  • txnVersion: Egy monoton módon növekvő szám, amely tranzakciós verzióként működik.

A Delta Lake az ismétlődő írások kombinációját txnAppId és txnVersion azonosítását használja, és figyelmen kívül hagyja őket.

Ha egy kötegírás meghiúsul, a köteg újrafuttatása ugyanazzal az alkalmazással és kötegazonosítóval segíti a futtatókörnyezetet az ismétlődő írások helyes azonosításában és figyelmen kívül hagyásában. Az alkalmazásazonosító (txnAppId) bármely felhasználó által létrehozott egyedi sztring lehet, és nem kell kapcsolódnia a streamazonosítóhoz. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.

Figyelmeztetés

Ha törli a streamelési ellenőrzőpontot, és egy új ellenőrzőponttal újraindítja a lekérdezést, egy másikat txnAppIdkell megadnia. Az új ellenőrzőpontok a kötegazonosítóval kezdődnek 0. A Delta Lake egyedi kulcsként használja a kötegazonosítót és a txnAppId, és kihagyja a már látott értékeket tartalmazó kötegeket.

Az alábbi példakód ezt a mintát mutatja be:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}