Megosztás a következőn keresztül:


Strukturált streamelési ellenőrzőpontok

Az ellenőrzőpontok és az előre írt naplók együttműködnek a strukturált streamelési számítási feladatok feldolgozási garanciáinak biztosításához. Az ellenőrzőpont nyomon követi a lekérdezést azonosító információkat, beleértve az állapotadatokat és a feldolgozott rekordokat. Amikor törli a fájlokat egy ellenőrzőpont-könyvtárban, vagy új ellenőrzőpont-helyre vált, a lekérdezés következő futtatása újraindul.

Minden lekérdezésnek más ellenőrzőponttal kell rendelkeznie. Több lekérdezésnek soha nem szabad ugyanazt a helyet megosztania.

Ellenőrzőpontok engedélyezése strukturált streamelési lekérdezésekhez

A streamelési lekérdezés futtatása előtt meg kell adnia a checkpointLocation beállítást, ahogyan az alábbi példában is látható:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Feljegyzés

Egyes fogadók, például a jegyzetfüzetek kimenete display() és a memory fogadó automatikusan létrehoz egy ideiglenes ellenőrzőpont-helyet, ha kihagyja ezt a beállítást. Ezek az ideiglenes ellenőrzőpontok nem biztosítják a hibatűrést vagy az adatkonzisztencia garanciáját, és előfordulhat, hogy nem lesznek megfelelően megtisztítva. A Databricks azt javasolja, hogy mindig adjon meg ellenőrzőpont-helyet ezeknek a fogadóknak.

Helyreállítás strukturált streamelési lekérdezés módosításai után

Az ugyanabból az ellenőrzőpont-helyről történő újraindítások között korlátozva van, hogy a streamelési lekérdezések milyen módosításokat hajthatnak végre. Íme néhány olyan módosítás, amely vagy nem engedélyezett, vagy a módosítás hatása nem megfelelően van meghatározva. Mindegyik esetében:

  • Az engedélyezett kifejezés azt jelenti, hogy elvégezheti a megadott módosítást, de a lekérdezéstől és a változástól függ, hogy a hatás szemantikája megfelelően van-e definiálva.
  • A nem engedélyezett kifejezés azt jelenti, hogy nem szabad elvégeznie a megadott módosítást, mivel az újraindított lekérdezés valószínűleg kiszámíthatatlan hibákkal fog meghiúsulni.
  • sdf A streamelt DataFrame/Dataset értéket jelöli, amely a .-val sparkSession.readStreamjön létre.

A strukturált streamelési lekérdezések változásainak típusai

  • A bemeneti források számának vagy típusának (vagyis különböző forrásának) módosítása: Ez nem engedélyezett.
  • A bemeneti források paramétereinek változásai: A forrástól és a lekérdezéstől függ, hogy ez engedélyezett-e, és hogy a változás szemantikája megfelelően van-e definiálva. Szeretnénk ismertetni néhány példát.
    • A sebességkorlátok hozzáadása, törlése és módosítása engedélyezett:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      felhasználóként a(z)

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Az előfizetett cikkek és fájlok módosítása általában nem engedélyezett, mivel az eredmények kiszámíthatatlanok: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Az eseményindító időközének változásai: Módosíthatja az eseményindítókat a növekményes kötegek és az időintervallumok között. Lásd: Az eseményindítók futások közötti időközeinek módosítása.
  • A kimeneti fogadó típusának változásai: A fogadók egyes kombinációi közötti változások engedélyezettek. Ezt eseti alapon kell ellenőrizni. Szeretnénk ismertetni néhány példát.
    • A Kafka-fogadó fájlelnyelője engedélyezett. A Kafka csak az új adatokat fogja látni.
    • A Kafka-fogadó és a fájl fogadó nem engedélyezett.
    • A Kafka-fogadó foreach értékre változott, vagy fordítva engedélyezett.
  • A kimeneti fogadó paramétereinek változásai: A fogadótól és a lekérdezéstől függ, hogy ez engedélyezett-e, és hogy a változás szemantikája megfelelően van-e definiálva. Szeretnénk ismertetni néhány példát.
    • A fájlelső kimeneti könyvtárának módosítása nem engedélyezett: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • A kimeneti témakör módosítása engedélyezett: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • A felhasználó által definiált foreach-fogadó (azaz a ForeachWriter kód) módosítása engedélyezett, de a módosítás szemantikája a kódtól függ.
  • A vetítési/szűrési/ térképszerű műveletek változásai: Bizonyos esetekben engedélyezettek. Például:
    • A szűrők hozzáadása/törlése engedélyezett: a következőhözsdf.selectExpr("a"): sdf.where(...).selectExpr("a").filter(...) .
    • Az azonos kimeneti sémával rendelkező vetítések változásai a következőre sdf.selectExpr("stringColumn AS json").writeStreamengedélyezettek: sdf.select(to_json(...).as("json")).writeStream
    • A különböző kimeneti sémával rendelkező vetítések változásai feltételesen engedélyezve vannak: sdf.selectExpr("a").writeStream csak akkor engedélyezett, ha a kimeneti fogadó engedélyezi a séma váltásátsdf.selectExpr("b").writeStream"a"."b"
  • Állapotalapú műveletek változásai: A streamelési lekérdezések egyes műveleteinek állapotadatokat kell fenntartaniuk az eredmény folyamatos frissítéséhez. A strukturált streamelés automatikusan ellenőrzi az állapotadatokat a hibatűrő tárolóra (például DBFS, Azure Blob Storage) és visszaállítja azokat az újraindítás után. Ez azonban feltételezi, hogy az állapotadatok sémája az újraindítások során változatlan marad. Ez azt jelenti, hogy a streamelési lekérdezés állapotalapú műveleteinek módosításai (vagyis hozzáadásai, törlései vagy sémamódosításai) nem engedélyezettek az újraindítások között. Íme azoknak az állapotalapú műveleteknek a listája, amelyek sémája nem módosítható az újraindítások között az állapot helyreállítása érdekében:
    • Stream-összesítés: Például sdf.groupBy("a").agg(...). A csoportosítási kulcsok vagy összesítések számának vagy típusának módosítása nem engedélyezett.
    • Stream-deduplikáció: Például sdf.dropDuplicates("a"). A csoportosítási kulcsok vagy összesítések számának vagy típusának módosítása nem engedélyezett.
    • Stream-stream illesztés: Például sdf1.join(sdf2, ...) (azaz mindkét bemenet a következővel sparkSession.readStreamjön létre: ). A séma vagy az egyenrangú oszlopok módosítása nem engedélyezett. Az illesztés típusának (külső vagy belső) módosítása nem engedélyezett. Az illesztés feltételének egyéb változásai nem definiálva vannak.
    • Tetszőleges állapotalapú művelet: Például. sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...) A felhasználó által definiált állapot sémájának és az időtúllépés típusának módosítása nem engedélyezett. A felhasználó által definiált állapotleképezési függvényben bármilyen változás megengedett, de a változás szemantikai hatása a felhasználó által definiált logikától függ. Ha valóban támogatni szeretné az állapotséma módosításait, akkor a séma áttelepítését támogató kódolási/dekódolási sémával explicit módon bájtokra kódolhatja/dekódolhatja az összetett állapotadat-struktúrákat. Ha például Avro kódolású bájtként menti az állapotot, módosíthatja az Avro-state-sémát a lekérdezés újraindítása között, mivel ez visszaállítja a bináris állapotot.