A foreachBatch használata tetszőleges adatgyűjtőkbe való íráshoz
Ez a cikk a strukturált streamelés használatával foreachBatch
ismerteti a streamlekérdezés kimenetének írását olyan adatforrásokhoz, amelyek nem rendelkeznek meglévő streamelt fogadóval.
A kódminta streamingDF.writeStream.foreachBatch(...)
lehetővé teszi kötegelt függvények alkalmazását a streamelési lekérdezés minden mikro kötegének kimeneti adataira. A foreachBatch
használt függvények két paramétert használnak:
- Olyan DataFrame, amely egy mikro köteg kimeneti adatait tartalmazza.
- A mikro köteg egyedi azonosítója.
A Delta Lake-egyesítési műveleteket a strukturált streamelésben kell használnia foreachBatch
. Lásd: Upsert from streaming lekérdezések foreachBatch használatával.
További DataFrame-műveletek alkalmazása
Számos DataFrame- és adathalmaz-művelet nem támogatott a streamelési DataFrame-ekben, mert a Spark ezekben az esetekben nem támogatja a növekményes tervek generálásának támogatását. Ezen foreachBatch()
műveletek némelyikét alkalmazhatja az egyes mikroköteg-kimenetekre. Az foreachBatch()
és az SQL MERGE INTO
művelettel például frissítési módban írhat streamelési aggregációk kimenetét egy Delta-táblába. További részletekért lásd: MERGE INTO.
Fontos
-
foreachBatch()
csak legalább egyszer írható garanciát biztosít. Azonban a funkcióhoz megadottbatchId
-t használhatja a kimenet deduplikálására, így biztosítható az egyszeri végrehajtás garanciája. Mindkét esetben meg kell indokolnia a végpontok közötti szemantikát. -
foreachBatch()
nem működik a folyamatos feldolgozási móddal , mivel alapvetően egy streamelési lekérdezés mikroköteges végrehajtására támaszkodik. Ha folyamatos módban ír adatokat, használjaforeach()
helyette.
Egy üres adatkeret meghívható, foreachBatch()
és a felhasználói kódnak rugalmasnak kell lennie a megfelelő működéshez. Íme egy példa:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
A Databricks Runtime 14.0 viselkedési változásai foreachBatch
A Databricks Runtime 14.0-s és újabb verziókban a megosztott hozzáférési móddal konfigurált számításon a következő viselkedésváltozások érvényesek:
-
print()
parancsok írnak kimenetet az illesztőprogram-naplókba. - A függvényen belüli almodul nem érhető el
dbutils.widgets
. - A függvényben hivatkozott fájloknak, moduloknak és objektumoknak szerializálhatónak kell lenniük, és elérhetőnek kell lenniük a Sparkban.
Meglévő kötegelt adatforrások újrafelhasználása
A használatával foreachBatch()
meglévő kötegelt adatírókat használhat olyan adatgyűjtőkhöz, amelyek nem rendelkeznek strukturált streamelési támogatással. Íme néhány példa:
Számos más kötegelt adatforrás is használható a forrásból foreachBatch()
. Lásd: Csatlakozás adatforrásokhoz.
Írás több helyre
Ha egy streamelési lekérdezés kimenetét több helyre kell írnia, a Databricks több strukturált streamíró használatát javasolja a legjobb párhuzamosság és átviteli sebesség érdekében.
Ha foreachBatch
több fogadóba ír, szerializálja a streamelési írások végrehajtását, ami növelheti az egyes mikrokötegek késését.
Ha valóban használja a foreachBatch
-t több Delta-táblába való írásra, akkor tekintse meg a Idempotens táblaírásokat a ForeachBatch-ben.