Strukturált streamelési ellenőrzőpontok
Az ellenőrzőpontok és az előre írt naplók együttműködnek a strukturált streamelési számítási feladatok feldolgozási garanciáinak biztosításához. Az ellenőrzőpont nyomon követi a lekérdezést azonosító információkat, beleértve az állapotadatokat és a feldolgozott rekordokat. Amikor törli a fájlokat egy ellenőrzőpont-könyvtárban, vagy új ellenőrzőpont-helyre vált, a lekérdezés következő futtatása újraindul.
Minden lekérdezésnek más ellenőrzőponttal kell rendelkeznie. Több lekérdezésnek soha nem szabad ugyanazt a helyet megosztania.
Ellenőrzőpontok engedélyezése strukturált streamelési lekérdezésekhez
A streamelési lekérdezés futtatása előtt meg kell adnia a checkpointLocation
beállítást, ahogyan az alábbi példában is látható:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Feljegyzés
Egyes fogadók, például a jegyzetfüzetek kimenete display()
és a memory
fogadó automatikusan létrehoz egy ideiglenes ellenőrzőpont-helyet, ha kihagyja ezt a beállítást. Ezek az ideiglenes ellenőrzőpontok nem biztosítják a hibatűrést vagy az adatkonzisztencia garanciáját, és előfordulhat, hogy nem lesznek megfelelően megtisztítva. A Databricks azt javasolja, hogy mindig adjon meg ellenőrzőpont-helyet ezeknek a fogadóknak.
Helyreállítás strukturált streamelési lekérdezés módosításai után
Az ugyanabból az ellenőrzőpont-helyről történő újraindítások között korlátozva van, hogy a streamelési lekérdezések milyen módosításokat hajthatnak végre. Íme néhány olyan módosítás, amely vagy nem engedélyezett, vagy a módosítás hatása nem megfelelően van meghatározva. Mindegyik esetében:
- Az engedélyezett kifejezés azt jelenti, hogy elvégezheti a megadott módosítást, de a lekérdezéstől és a változástól függ, hogy a hatás szemantikája megfelelően van-e definiálva.
- A nem engedélyezett kifejezés azt jelenti, hogy nem szabad elvégeznie a megadott módosítást, mivel az újraindított lekérdezés valószínűleg kiszámíthatatlan hibákkal fog meghiúsulni.
-
sdf
A streamelt DataFrame/Dataset értéket jelöli, amely a .-valsparkSession.readStream
jön létre.
A strukturált streamelési lekérdezések változásainak típusai
- A bemeneti források számának vagy típusának (vagyis különböző forrásának) módosítása: Ez nem engedélyezett.
-
A bemeneti források paramétereinek változásai: A forrástól és a lekérdezéstől függ, hogy ez engedélyezett-e, és hogy a változás szemantikája megfelelően van-e definiálva. Szeretnénk ismertetni néhány példát.
A sebességkorlátok hozzáadása, törlése és módosítása engedélyezett:
spark.readStream.format("kafka").option("subscribe", "article")
felhasználóként a(z)
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Az előfizetett cikkek és fájlok módosítása általában nem engedélyezett, mivel az eredmények kiszámíthatatlanok:
spark.readStream.format("kafka").option("subscribe", "article")
spark.readStream.format("kafka").option("subscribe", "newarticle")
- Az eseményindító időközének változásai: Módosíthatja az eseményindítókat a növekményes kötegek és az időintervallumok között. Lásd: Az eseményindítók futások közötti időközeinek módosítása.
-
A kimeneti fogadó típusának változásai: A fogadók egyes kombinációi közötti változások engedélyezettek. Ezt eseti alapon kell ellenőrizni. Szeretnénk ismertetni néhány példát.
- A Kafka-fogadó fájlelnyelője engedélyezett. A Kafka csak az új adatokat fogja látni.
- A Kafka-fogadó és a fájl fogadó nem engedélyezett.
- A Kafka-fogadó foreach értékre változott, vagy fordítva engedélyezett.
-
A kimeneti fogadó paramétereinek változásai: A fogadótól és a lekérdezéstől függ, hogy ez engedélyezett-e, és hogy a változás szemantikája megfelelően van-e definiálva. Szeretnénk ismertetni néhány példát.
- A fájlelső kimeneti könyvtárának módosítása nem engedélyezett:
sdf.writeStream.format("parquet").option("path", "/somePath")
sdf.writeStream.format("parquet").option("path", "/anotherPath")
- A kimeneti témakör módosítása engedélyezett:
sdf.writeStream.format("kafka").option("topic", "topic1")
sdf.writeStream.format("kafka").option("topic", "topic2")
- A felhasználó által definiált foreach-fogadó (azaz a
ForeachWriter
kód) módosítása engedélyezett, de a módosítás szemantikája a kódtól függ.
- A fájlelső kimeneti könyvtárának módosítása nem engedélyezett:
-
A vetítési/szűrési/ térképszerű műveletek változásai: Bizonyos esetekben engedélyezettek. Például:
- A szűrők hozzáadása/törlése engedélyezett: a következőhöz
sdf.selectExpr("a")
:sdf.where(...).selectExpr("a").filter(...)
. - Az azonos kimeneti sémával rendelkező vetítések változásai a következőre
sdf.selectExpr("stringColumn AS json").writeStream
engedélyezettek:sdf.select(to_json(...).as("json")).writeStream
- A különböző kimeneti sémával rendelkező vetítések változásai feltételesen engedélyezve vannak:
sdf.selectExpr("a").writeStream
csak akkor engedélyezett, ha a kimeneti fogadó engedélyezi a séma váltásátsdf.selectExpr("b").writeStream
"a"
."b"
- A szűrők hozzáadása/törlése engedélyezett: a következőhöz
-
Állapotalapú műveletek változásai: A streamelési lekérdezések egyes műveleteinek állapotadatokat kell fenntartaniuk az eredmény folyamatos frissítéséhez. A strukturált streamelés automatikusan ellenőrzi az állapotadatokat a hibatűrő tárolóra (például DBFS, Azure Blob Storage) és visszaállítja azokat az újraindítás után. Ez azonban feltételezi, hogy az állapotadatok sémája az újraindítások során változatlan marad. Ez azt jelenti, hogy a streamelési lekérdezés állapotalapú műveleteinek módosításai (vagyis hozzáadásai, törlései vagy sémamódosításai) nem engedélyezettek az újraindítások között. Íme azoknak az állapotalapú műveleteknek a listája, amelyek sémája nem módosítható az újraindítások között az állapot helyreállítása érdekében:
-
Stream-összesítés: Például
sdf.groupBy("a").agg(...)
. A csoportosítási kulcsok vagy összesítések számának vagy típusának módosítása nem engedélyezett. -
Stream-deduplikáció: Például
sdf.dropDuplicates("a")
. A csoportosítási kulcsok vagy összesítések számának vagy típusának módosítása nem engedélyezett. -
Stream-stream illesztés: Például
sdf1.join(sdf2, ...)
(azaz mindkét bemenet a következővelsparkSession.readStream
jön létre: ). A séma vagy az egyenrangú oszlopok módosítása nem engedélyezett. Az illesztés típusának (külső vagy belső) módosítása nem engedélyezett. Az illesztés feltételének egyéb változásai nem definiálva vannak. -
Tetszőleges állapotalapú művelet: Például.
sdf.groupByKey(...).mapGroupsWithState(...)
sdf.groupByKey(...).flatMapGroupsWithState(...)
A felhasználó által definiált állapot sémájának és az időtúllépés típusának módosítása nem engedélyezett. A felhasználó által definiált állapotleképezési függvényben bármilyen változás megengedett, de a változás szemantikai hatása a felhasználó által definiált logikától függ. Ha valóban támogatni szeretné az állapotséma módosításait, akkor a séma áttelepítését támogató kódolási/dekódolási sémával explicit módon bájtokra kódolhatja/dekódolhatja az összetett állapotadat-struktúrákat. Ha például Avro kódolású bájtként menti az állapotot, módosíthatja az Avro-state-sémát a lekérdezés újraindítása között, mivel ez visszaállítja a bináris állapotot.
-
Stream-összesítés: Például