Megosztás a következőn keresztül:


Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához

Ez a cikk bemutatja a vízjelezés alapvető fogalmait, és javaslatokat nyújt a vízjelek általános állapotalapú streamelési műveletekben való használatára. Vízjeleket kell alkalmaznia az állapotalapú streamelési műveletekre, hogy elkerülje az állapotban tárolt adatok végtelen bővítését, ami memóriaproblémákat okozhat, és növelheti a feldolgozási késéseket a hosszan futó streamelési műveletek során.

Mi az a watermark?

A strukturált streamelés vízjelekkel szabályozza a küszöbértéket, hogy mennyi ideig dolgozza fel a frissítéseket egy adott állapotentitáshoz. Az állapotentitások gyakori példái a következők:

  • Időbeli aggregációk window.
  • Egyedi kulcsok egy join-ban két stream között.

Amikor egy watermark-t deklarál, meg kell adnia egy időbélyegmezőt és egy watermark küszöbértéket egy folyamatban lévő adatkereten. Az új adatok érkezésekor az állapotkezelő nyomon követi a megadott mező legutóbbi időbélyegét, és feldolgozza a késési küszöbértéken belüli összes rekordot.

Az alábbi példa egy 10 perces watermark küszöbértéket alkalmaz egy ablakos számlálóra:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Ebben a példában:

  • A event_timecolumn 10 perces watermark és 5 perces windowdefiniálására szolgál.
  • A rendszer minden megfigyelt, nem átfedésben lévő 5 perces ablakhoz id számokat gyűjt.
  • Az állapotinformációk az egyes számokra vonatkozóan mindaddig megmaradnak, amíg a window vége 10 perccel régebbi, mint a legutóbbi megfigyelt event_time.

Fontos

Watermark küszöbértékek garantálják, hogy a megadott küszöbértéken belül érkező rekordok feldolgozása a megadott lekérdezés szemantikája szerint történik. Előfordulhat, hogy a megadott küszöbértéken kívülre érkező késői rekordok lekérdezési metrikákkal dolgozhatók fel, de ez nem garantált.

Hogyan befolyásolják a vízjelek a feldolgozási időt és az átviteli sebességet?

A vízjelek a kimeneti módokkal együttműködve szabályozják az adatok fogadóba írásakor. Mivel a vízjelek csökkentik a feldolgozandó állapotinformációk teljes mennyiségét, a vízjelek hatékony használata elengedhetetlen a hatékony állapotalapú streamelési átviteli sebességhez.

Feljegyzés

Nem minden kimeneti mód támogatott minden állapotalapú művelethez.

Vízjelek és kimeneti mód ablakos összesítésekhez

Az alábbi table részletezi a watermark definiált időbélyeg alapján történő összesítő lekérdezések feldolgozását.

Kimeneti mód Működés
Hozzáfűzés A sorokat a cél table-ba írják, miután a watermark küszöbérték átlépése megtörtént. A késési küszöbérték alapján minden írás késik. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve.
Update A sorok a cél table-ba íródnak, amikor az eredményeket kiszámítják, és az új adatok érkezésekor frissíthetők és felülírhatók. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve.
Kész Az összesítési állapot nincs elvetve. A cél table minden triggerrel újraíródik.

Vízjelek és kimenet stream-stream illesztésekhez

A több stream közötti illesztések csak a hozzáfűzési módot támogatják, és a megfeleltetett rekordok minden felderített kötegben meg vannak írva. Belső illesztések esetén a Databricks azt javasolja, hogy minden streamelési adatforráson watermark küszöbértéket állítson be. Ez lehetővé teszi az állapotinformációk elvetét a régi rekordok esetében. Vízjelek nélkül a strukturált streamelés megpróbálja join minden kulcsot a join mindkét oldaláról az egyes triggerekkel.

A strukturált streamelés speciális szemantikával támogatja a külső illesztéseket. A külső illesztések esetében kötelező a vízjelezés, mivel ez azt jelzi, hogy mikor kell null értékkel írni egy kulcsot, miután az nem egyezik. Vegye figyelembe, hogy bár a külső illesztések hasznosak lehetnek olyan rekordok rögzítéséhez, amelyek nem egyeznek meg az adatfeldolgozás során, mivel az illesztések csak függevény műveletként írnak a tables-ra, a hiányzó adatok csak a késési küszöbérték lejárta után lesznek rögzítve.

Késői adatküszöb szabályozása több watermark-szabályzattal a strukturált streamelésben

Ha több strukturált streambemenettel dolgozik, több vízjelet is set a késésben érkező adatok tűréshatárainak szabályozásához. A vízjelek konfigurálása lehetővé teszi az állapotinformációk szabályozását, és hatással van a késésre.

A streamelési lekérdezések több bemeneti adatfolyamot is tartalmazhatnak, amelyek egyesítve vagy egyesítve vannak. Az egyes bemeneti adatfolyamok eltérő küszöbértéket tartalmazhatnak a késői adatokhoz, amelyeket az állapotalapú műveletekhez el kell viselni. Adja meg ezeket a küszöbértékeket az egyes bemeneti adatfolyamok használatával withWatermarks("eventTime", delay) . Az alábbiakban egy példa lekérdezést láthat stream-stream illesztésekkel.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

A lekérdezés futtatása során a strukturált stream egyenként követi nyomon az egyes bemeneti adatfolyamokban megfigyelt maximális eseményidőt, kiszámítja a vízjeleket a megfelelő késleltetés alapján, és kiválaszt egyetlen globális watermark-t, amely az állapotalapú műveletekhez használható. Alapértelmezés szerint a minimumot a globális watermark választja ki, mert biztosítja, hogy véletlenül ne essen adat túl későre, ha az egyik stream a többi mögött marad (például az egyik stream leállítja az adatok fogadását a felsőbb rétegbeli hibák miatt). Más szóval a globális watermark biztonságosan mozog a leglassabb stream ütemében, és a lekérdezés kimenete ennek megfelelően késik.

Ha gyorsabb eredményeket szeretne get, set a több watermark házirendet, hogy globális watermark ként a maximális értéket válassza ki az SQL-konfigurációs spark.sql.streaming.multipleWatermarkPolicymax (alapértelmezés szerint min). Ez lehetővé teszi, hogy a globális watermark a leggyorsabb stream tempójában mozogjon. Ez a konfiguráció azonban elveti az adatokat a leglassabb streamekből. Emiatt a Databricks azt javasolja, hogy ezt a konfigurációt megfontoltan használja.

Duplikátumok eltávolítása watermark

A Databricks Runtime 13.3 LTS-ben és újabb verziókban egy watermark küszöbértéken belüli rekordokat deduplikálhat egy egyedi identifierhasználatával.

A strukturált streamelés pontosan egyszeri feldolgozási garanciákat biztosít, de nem deduplikálja automatikusan a rekordokat az adatforrásokból. Bármely megadott mezőn használható a dropDuplicatesWithinWatermark a rekordok deduplikálására, lehetővé téve, hogy remove ismétlődő elemeket egy streamből, még akkor is, ha egyes mezők eltérőek (például az érkezési idő vagy az esemény ideje).

A megadott watermark belül érkező ismétlődő rekordok garantáltan kiesnek. Ez a garancia csak egy irányban szigorú, és a megadott küszöbértéken kívülre érkező duplikált rekordok is elvethetők. A duplikált események közötti watermark késleltetési küszöbértékét set az összes ismétlődés remove.

Meg kell adnia egy watermark-át a dropDuplicatesWithinWatermark metódus használatához, ahogyan az alábbi példában is látható:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])