Megosztás a következőn keresztül:


Kimeneti mód kiválasztása strukturált streameléshez

Ez a cikk az állapotalapú streamelési kimeneti mód kiválasztását ismerteti. Csak az aggregációkat tartalmazó állapotalapú streamekhez van szükség kimeneti mód konfigurálására.

Az illesztések csak a hozzáfűző kimeneti módot támogatják, és a kimeneti mód nem befolyásolja a deduplikációt. Az tetszőleges állapotalapú operátorok mapGroupsWithState saját flatMapGroupsWithState egyéni logikájukkal bocsátanak ki rekordokat, így a stream kimeneti módja nem befolyásolja a működésüket.

Állapot nélküli streamelés esetén minden kimeneti mód ugyanúgy viselkedik.

A kimeneti mód helyes konfigurálásához ismernie kell az állapotalapú streamelést, a vízjeleket és az eseményindítókat. Tekintse meg az alábbi cikkeket:

Mi az a kimeneti mód?

A strukturált streamelési lekérdezés kimeneti módja határozza meg, hogy mely rekordokból bocsátanak ki a lekérdezés operátorai az egyes eseményindítók során. A kibocsátható rekordok három típusa:

  • A későbbi feldolgozás során nem módosuló rekordok.
  • Az utolsó eseményindító óta módosult rekordok.
  • Az állapottábla összes rekordja.

Az állapotalapú operátorok számára fontos tudni, hogy milyen típusú rekordokat kell kibocsátani, mert egy állapotalapú operátor által létrehozott sor eseményindítóról triggerre válthat. Ha például egy streamelési aggregációs operátor több sort kap egy adott ablakhoz, az adott ablak összesítési értékei az eseményindítók között változhatnak.

Az állapot nélküli operátorok esetében a rekordtípusok megkülönböztetése nem befolyásolja az operátor viselkedését. Az állapot nélküli operátor által az eseményindító során kibocsátott rekordok mindig az adott eseményindító során feldolgozott forrásrekordok.

Elérhető kimeneti módok

Három kimeneti mód van, amelyek egy adott eseményindító során kibocsátandó rekordokat jeleznek egy operátornak:

Kimeneti mód Leírás
Hozzáfűzési mód (alapértelmezett) Alapértelmezés szerint a streamelési lekérdezések hozzáfűzési módban futnak. Ebben a módban az operátorok csak olyan sorokat bocsátanak ki, amelyek nem változnak a jövőbeli eseményindítókban. Az állapotalapú műveletek a vízjel segítségével határozzák meg, hogy mikor történik ez.
Frissítési mód Frissítési módban az operátorok kibocsátják az eseményindító során módosult összes sort, még akkor is, ha a kibocsátott rekord megváltozhat egy későbbi eseményindítóban.
Kész mód A teljes mód csak streamelési aggregációkkal működik. Teljes módban az operátor által valaha létrehozott összes eredménysor az alsóbb rétegben lesz kibocsátva.

Gyártási szempontok

Számos állapotalapú streamelési művelet esetén választania kell a hozzáfűzési és a frissítési mód között. Az alábbi szakaszok olyan szempontokat vázolnak fel, amelyek tájékoztathatják a döntést.

Feljegyzés

A teljes mód néhány alkalmazásból áll, de az adatskálázáskor rosszul működik. A Databricks a materializált nézetek használatát javasolja a teljes módhoz és a növekményes feldolgozáshoz kapcsolódó szemantikai garanciák lekéréséhez számos állapotalapú művelet esetében. Tekintse meg a materializált nézeteket a Databricks SQL-ben.

Alkalmazás szemantikája

Az alkalmazás szemantikája leírja, hogyan használják az alsóbb rétegbeli alkalmazások a streamelési adatokat.

Ha az alsóbb rétegbeli szolgáltatásoknak minden alsóbb rétegbeli íráshoz egyetlen műveletet kell elvégezniük, a legtöbb esetben használjon hozzáfűzési módot. Ha például egy alsóbb rétegbeli értesítési szolgáltatás értesítést küld a fogadóba írt összes új rekordról, a hozzáfűzési mód biztosítja, hogy minden rekord csak egyszer legyen megírva. A frissítési mód minden alkalommal megírja a rekordot, amikor az állapotinformáció megváltozik, ami számos frissítést eredményezne.

Ha az utólagos szolgáltatásoknak friss eredményekre van szükségük, a frissítési mód biztosítja, hogy a kimeneti szolgáltatás a lehető legfrissebb maradjon. Ilyen például egy olyan gépi tanulási modell, amely valós időben olvassa be a funkciókat, vagy egy valós idejű aggregátumokat nyomon követő elemzési irányítópultot.

Operátor- és fogadókompatibilitás

A strukturált streamelés nem támogatja az Apache Sparkban elérhető összes műveletet, és egyes streamelési műveletek nem támogatottak minden kimeneti módban. Az operátorokra vonatkozó korlátozásokról további információt az OSS streamelési dokumentációban talál.

Nem minden fogadó támogatja az összes kimeneti módot. A Unity Catalog által felügyelt táblákat támogató Delta Lake és a Kafka is támogatja az összes kimeneti módot. A szinkompatibilitással kapcsolatos további információkért tekintse meg az OSS streamelési dokumentációt.

Késés és költség

A kimeneti mód hatással van arra, hogy mennyi időnek kell eltelnie egy rekord írása előtt, és az írott adatok gyakorisága és mennyisége hatással lehet a streamelési folyamatok költségeire.

A hozzáfűzési mód arra kényszeríti az állapotalapú operátorokat, hogy csak az állapotalapú eredmények véglegesítése után bocsássanak ki eredményeket, ami legalább a vízjel késleltetésének idejéig tart. A hozzáfűzési kimeneti üzemmódban a 1 hour vízjel késleltetése azt jelenti, hogy a rekordok legalább 1 órás késleltetéssel rendelkeznek, mielőtt lefelé terjedő irányba kerülnének.

A frissítési mód triggerenként egy írási műveletet eredményez minden összesítési értékre. Ha a fogadó írásonkénti és rekordonkénti díjat számít fel, ez költségessé válhat, ha a rekordokat többször frissítik, mielőtt a vízjelkésleltetés letelik.

Példák konfigurációra

Az alábbi példakódok a kimeneti mód konfigurálását mutatják be a Unity Catalog-táblákba irányuló frissítések streameléséhez:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Lásd az OSS dokumentációt a PySpark DataStreamWriter.outputMode vagy a Scala DataStreamWriter.outputMode esetében.

Példa állapotalapú streamelési és kimeneti módokra

Az alábbi példa azt szolgálja, hogy segítsen megérteni, hogyan lép kölcsönhatásba a kimeneti mód a vízjelekkel állapotalapú streamelés esetén.

Fontolja meg a folyamatos adatáramlás aggregátumát, amely kiszámítja az óránként generált teljes bevételt egy áruházban 15 perces vízjelkésleltetéssel. Az első mikrobatch a következő rekordokat dolgozza fel:

  • 15 USD 14:40-kor
  • $10, 14:30
  • $30 15:10-kor

Ezen a ponton a motor vízjele 14:55, mert kivonja a 15 percet (a késést) a látott maximális időből (15:10). A streamelési aggregációs operátor állapota a következő:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 30 USD

Az alábbi táblázat azt ismerteti, hogy mi történne az egyes kimeneti módban:

Kimeneti mód Eredmény és ok
Hozzáfűzés A streamelési aggregációs operátor nem bocsát ki semmit a lefelé irányuló folyamat során. Ennek az az oka, hogy mindkét ablak megváltozhat, amikor az új értékek megjelennek egy későbbi eseményindítóval: a 2:55-ös vízjel azt jelzi, hogy a 14:55 utáni rekordok továbbra is megérkezhetnek, és ezek a rekordok a [2pm, 3pm] ablakba vagy a [3pm, 4pm] ablakba eshetnek.
Frissít Az operátor mindkét rekordot kibocsátja, mert mindkét rekord frissítéseket kapott.
Kész Az operátor az összes rekordot kibocsátja.

Tegyük fel, hogy a stream még egy rekordot kap:

  • $20 15:20-kor

A vízjel 15:05-re frissül, mert a motor 15 percet von le a 15:20-ból. Ezen a ponton a streamelési aggregációs operátor állapota a következő:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 50 USD

Az alábbi táblázat azt ismerteti, hogy mi történne az egyes kimeneti módban:

Kimeneti mód Eredmény és ok
Hozzáfűzés Az adatfolyam-aggregációs operátor megfigyeli, hogy a 15:05-ös időbélyeg nagyobb, mint a [2pm, 3pm] ablak vége. A vízjel definíciója szerint az ablak már nem változik, ezért kibocsátja a [2pm, 3pm] ablakot.
Frissít A streamelési összesítési operátor kibocsátja a [3pm, 4pm] ablakot, mert az állapot értéke 30 usd-ről 50 usd-re módosult.
Kész Az operátor az összes rekordot kibocsátja.

Az alábbiakban összefoglaljuk, hogyan viselkednek az állapotalapú operátorok az egyes hozzáfűzési módban:

  • Hozzáfűzési módban a vízjel késleltetése után egyszerre írja meg a rekordokat.
  • Frissítési módban írjon olyan rekordokat, amelyek az előző eseményindító óta megváltoztak.
  • Teljes módban írja meg az állapotalapú operátor által valaha létrehozott összes rekordot.