Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához
Ez a cikk bemutatja a vízjelezés alapvető fogalmait, és javaslatokat nyújt a vízjelek általános állapotalapú streamelési műveletekben való használatára. Vízjeleket kell alkalmaznia az állapotalapú streamelési műveletekre, hogy elkerülje az állapotban tárolt adatok végtelen bővítését, ami memóriaproblémákat okozhat, és növelheti a feldolgozási késéseket a hosszan futó streamelési műveletek során.
Mi az a watermark?
A strukturált streamelés vízjelekkel szabályozza a küszöbértéket, hogy mennyi ideig dolgozza fel a frissítéseket egy adott állapotentitáshoz. Az állapotentitások gyakori példái a következők:
- Időbeli aggregációk window.
- Egyedi kulcsok egy join-ban két stream között.
Amikor egy watermark-t deklarál, meg kell adnia egy időbélyegmezőt és egy watermark küszöbértéket egy folyamatban lévő adatkereten. Az új adatok érkezésekor az állapotkezelő nyomon követi a megadott mező legutóbbi időbélyegét, és feldolgozza a késési küszöbértéken belüli összes rekordot.
Az alábbi példa egy 10 perces watermark küszöbértéket alkalmaz egy ablakos számlálóra:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Ebben a példában:
- A
event_time
column 10 perces watermark és 5 perces windowdefiniálására szolgál. - A rendszer minden megfigyelt, nem átfedésben lévő 5 perces ablakhoz
id
számokat gyűjt. - Az állapotinformációk az egyes számokra vonatkozóan mindaddig megmaradnak, amíg a window vége 10 perccel régebbi, mint a legutóbbi megfigyelt
event_time
.
Fontos
Watermark küszöbértékek garantálják, hogy a megadott küszöbértéken belül érkező rekordok feldolgozása a megadott lekérdezés szemantikája szerint történik. Előfordulhat, hogy a megadott küszöbértéken kívülre érkező késői rekordok lekérdezési metrikákkal dolgozhatók fel, de ez nem garantált.
Hogyan befolyásolják a vízjelek a feldolgozási időt és az átviteli sebességet?
A vízjelek a kimeneti módokkal együttműködve szabályozják az adatok fogadóba írásakor. Mivel a vízjelek csökkentik a feldolgozandó állapotinformációk teljes mennyiségét, a vízjelek hatékony használata elengedhetetlen a hatékony állapotalapú streamelési átviteli sebességhez.
Feljegyzés
Nem minden kimeneti mód támogatott minden állapotalapú művelethez.
Vízjelek és kimeneti mód ablakos összesítésekhez
Az alábbi table részletezi a watermark definiált időbélyeg alapján történő összesítő lekérdezések feldolgozását.
Kimeneti mód | Működés |
---|---|
Hozzáfűzés | A sorokat a cél table-ba írják, miután a watermark küszöbérték átlépése megtörtént. A késési küszöbérték alapján minden írás késik. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve. |
Update | A sorok a cél table-ba íródnak, amikor az eredményeket kiszámítják, és az új adatok érkezésekor frissíthetők és felülírhatók. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve. |
Kész | Az összesítési állapot nincs elvetve. A cél table minden triggerrel újraíródik. |
Vízjelek és kimenet stream-stream illesztésekhez
A több stream közötti illesztések csak a hozzáfűzési módot támogatják, és a megfeleltetett rekordok minden felderített kötegben meg vannak írva. Belső illesztések esetén a Databricks azt javasolja, hogy minden streamelési adatforráson watermark küszöbértéket állítson be. Ez lehetővé teszi az állapotinformációk elvetét a régi rekordok esetében. Vízjelek nélkül a strukturált streamelés megpróbálja join minden kulcsot a join mindkét oldaláról az egyes triggerekkel.
A strukturált streamelés speciális szemantikával támogatja a külső illesztéseket. A külső illesztések esetében kötelező a vízjelezés, mivel ez azt jelzi, hogy mikor kell null értékkel írni egy kulcsot, miután az nem egyezik. Vegye figyelembe, hogy bár a külső illesztések hasznosak lehetnek olyan rekordok rögzítéséhez, amelyek nem egyeznek meg az adatfeldolgozás során, mivel az illesztések csak függevény műveletként írnak a tables-ra, a hiányzó adatok csak a késési küszöbérték lejárta után lesznek rögzítve.
Késői adatküszöb szabályozása több watermark-szabályzattal a strukturált streamelésben
Ha több strukturált streambemenettel dolgozik, több vízjelet is set a késésben érkező adatok tűréshatárainak szabályozásához. A vízjelek konfigurálása lehetővé teszi az állapotinformációk szabályozását, és hatással van a késésre.
A streamelési lekérdezések több bemeneti adatfolyamot is tartalmazhatnak, amelyek egyesítve vagy egyesítve vannak. Az egyes bemeneti adatfolyamok eltérő küszöbértéket tartalmazhatnak a késői adatokhoz, amelyeket az állapotalapú műveletekhez el kell viselni. Adja meg ezeket a küszöbértékeket az egyes bemeneti adatfolyamok használatával withWatermarks("eventTime", delay)
. Az alábbiakban egy példa lekérdezést láthat stream-stream illesztésekkel.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
A lekérdezés futtatása során a strukturált stream egyenként követi nyomon az egyes bemeneti adatfolyamokban megfigyelt maximális eseményidőt, kiszámítja a vízjeleket a megfelelő késleltetés alapján, és kiválaszt egyetlen globális watermark-t, amely az állapotalapú műveletekhez használható. Alapértelmezés szerint a minimumot a globális watermark választja ki, mert biztosítja, hogy véletlenül ne essen adat túl későre, ha az egyik stream a többi mögött marad (például az egyik stream leállítja az adatok fogadását a felsőbb rétegbeli hibák miatt). Más szóval a globális watermark biztonságosan mozog a leglassabb stream ütemében, és a lekérdezés kimenete ennek megfelelően késik.
Ha gyorsabb eredményeket szeretne get, set a több watermark házirendet, hogy globális watermark ként a maximális értéket válassza ki az SQL-konfigurációs spark.sql.streaming.multipleWatermarkPolicy
max
(alapértelmezés szerint min
). Ez lehetővé teszi, hogy a globális watermark a leggyorsabb stream tempójában mozogjon. Ez a konfiguráció azonban elveti az adatokat a leglassabb streamekből. Emiatt a Databricks azt javasolja, hogy ezt a konfigurációt megfontoltan használja.
Duplikátumok eltávolítása watermark
A Databricks Runtime 13.3 LTS-ben és újabb verziókban egy watermark küszöbértéken belüli rekordokat deduplikálhat egy egyedi identifierhasználatával.
A strukturált streamelés pontosan egyszeri feldolgozási garanciákat biztosít, de nem deduplikálja automatikusan a rekordokat az adatforrásokból. Bármely megadott mezőn használható a dropDuplicatesWithinWatermark
a rekordok deduplikálására, lehetővé téve, hogy remove ismétlődő elemeket egy streamből, még akkor is, ha egyes mezők eltérőek (például az érkezési idő vagy az esemény ideje).
A megadott watermark belül érkező ismétlődő rekordok garantáltan kiesnek. Ez a garancia csak egy irányban szigorú, és a megadott küszöbértéken kívülre érkező duplikált rekordok is elvethetők. A duplikált események közötti watermark késleltetési küszöbértékét set az összes ismétlődés remove.
Meg kell adnia egy watermark-át a dropDuplicatesWithinWatermark
metódus használatához, ahogyan az alábbi példában is látható:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])