Megosztás a következőn keresztül:


Állapotalapú feldolgozás optimalizálása a DLT-ben vízjelekkel

Az állapotban tárolt adatok hatékony kezeléséhez használjon vízjeleket az állapotalapú adatfolyamok DLT-ben történő feldolgozásakor, beleértve az összesítéseket, illesztéseket és deduplikációkat. Ez a cikk bemutatja, hogyan használhat vízjeleket a DLT-lekérdezésekben, és példákat tartalmaz az ajánlott műveletekre.

Jegyzet

Annak érdekében, hogy az aggregációkat végrehajtó lekérdezések növekményesen legyenek feldolgozva, és ne legyenek teljesen újrafordítve az egyes frissítésekkel, vízjeleket kell használnia.

Mi az a vízjel?

A streamfeldolgozásban a vízjeles egy Apache Spark-szolgáltatás, amely az állapotalapú műveletek, például összesítések végrehajtásakor meghatározhat egy időalapú küszöbértéket az adatok feldolgozásához. Az érkező adatok feldolgozása addig történik, amíg el nem éri a küszöbértéket, és ekkor a küszöbérték által meghatározott időablak bezárul. A vízjelekkel elkerülhetők a lekérdezések feldolgozása során felmerülő problémák, főként nagyobb adathalmazok vagy hosszú ideig futó feldolgozás esetén. Előfordulhat, hogy ezek a problémák közé tartozik az eredmények előállításának jelentős késése, sőt akár memóriahiányos (OOM) hibák is, a feldolgozás során az állapotban felhalmozódott adatmennyiség miatt. Mivel a streamelési adatok eredendően rendezetlenek, a vízjelek támogatják az olyan műveletek helyes kiszámítását is, mint az időablak-összesítések.

Ha többet szeretne megtudni a vízjelek streamfeldolgozásban való használatáról, tekintse meg Vízjelezés az Apache Spark strukturált leképezésű adatfolyamokban és Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozása érdekében.

Hogyan definiálhat vízjelet?

A vízjel meghatározásához meg kell adnia egy időbélyegmezőt és egy olyan értéket, amely késői adatok érkezésének időküszöbét adja meg. Az adatok késésnek minősülnek, ha a megadott időküszöb után érkeznek. Ha például a küszöbérték 10 perc, a 10 perces küszöbérték után érkező rekordok elvethetők.

Mivel a megadott küszöbérték után érkező rekordok elvethetők, fontos a késési és a helyességi követelményeknek megfelelő küszöbérték kiválasztása. Ha kisebb küszöbértéket választ, a rekordok hamarabb lesznek kibocsátva, de azt is jelenti, hogy a késői rekordok nagyobb valószínűséggel lesznek elvetve. A nagyobb küszöbérték hosszabb várakozást, de az adatok teljességét is jelentheti. A nagyobb állapotméret miatt a nagyobb küszöbértékek további számítási erőforrásokat is igényelhetnek. Mivel a küszöbérték az adatoktól és a feldolgozási követelményektől függ, a feldolgozás tesztelése és monitorozása fontos az optimális küszöbérték meghatározásához.

A Python withWatermark() függvényével definiálhat vízjelet. Az SQL-ben a WATERMARK záradék használatával definiáljon vízjelet:

Piton

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Vízjelek használata stream-stream csatlakoztatáskor

Stream-stream illesztések esetén meg kell adnia az illesztés mindkét oldalán egy vízjelet, valamint egy időintervallum-kifejezést. Mivel minden illesztési forrás hiányosan tekinti meg az adatokat, az időintervallum záradék szükséges ahhoz, hogy tájékoztassa a streamelési motort, ha nem lehet további egyezéseket létrehozni. Az időintervallum záradéknak ugyanazokat a mezőket kell használnia, mint a vízjelek definiálásához.

Mivel előfordulhat, hogy az egyes streamek különböző küszöbértékeket igényelnek a vízjelekhez, a streameknek nem kell azonos küszöbértékekkel rendelkezniük. A hiányzó adatok elkerülése érdekében a streamelési motor egy globális vízjelet tart fenn a leglassabb stream alapján.

Az alábbi példa egy hirdetésmegjelenések streamjéhez kapcsolódik, valamint a felhasználói kattintások streamjéhez a hirdetésekre. Ebben a példában a kattintásnak a megjelenést követő 3 percen belül kell történnie. A 3 perces időintervallum leteltét követően a már nem egyeztethető állapot sorait a rendszer elveti.

Piton

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Ablakos összesítések végrehajtása vízjelekkel

A streamelési adatok gyakori állapotalapú művelete egy ablakos összesítés. Az ablakos aggregációk hasonlóak a csoportosított aggregációkhoz, azzal a kivételrel, hogy a megadott ablak részét képező sorok összesítő értékei lesznek visszaadva.

Egy ablak definiálható egy adott hosszként, és az összesítési művelet az adott ablak részét képező összes sorban is végrehajtható. A Spark Streaming három ablaktípust támogat:

  • Rögzített ablakok: Rögzített méretű, nem átfedésben lévő és folyamatos időintervallumok sorozata. Egy bemeneti rekord csak egyetlen ablakhoz tartozik.
  • Tolóablakok: A tolóablakokhoz hasonlóan a tolóablakok rögzített méretűek, de az ablakok átfedésben lehetnek, és egy rekord több ablakba is eshet.

Amikor az adatok az ablak végén és a vízjel hosszán túl érkeznek, az ablakhoz nem fogadnak el új adatokat, az összesítés eredményét kibocsátják, és az ablak állapotát elvetik.

Az alábbi példa egy rögzített ablak használatával 5 percenként kiszámítja a megjelenítések összegét. Ebben a példában a kijelölési záradék az aliast impressions_windowhasználja, majd magát az ablakot a GROUP BY záradék részeként definiálja. Az ablaknak ugyanazon időbélyegoszlopon kell alapulnia, mint a vízjelnek, a példában szereplő clickTimestamp oszlopnak.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Hasonló példa a Pythonban az óránkénti egységes időablakok nyereségének kiszámítására.

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Streamelési rekordok deduplikálása

A strukturált streamelés pontosan egyszeri feldolgozási garanciával rendelkezik, de nem törli automatikusan az adatforrásokból származó rekordok duplikálását. Mivel például számos üzenetsor legalább egyszer garantálja a garanciát, ismétlődő rekordokra kell számítani az üzenetsorok egyikéből való olvasáskor. A dropDuplicatesWithinWatermark() függvénnyel bármely megadott mező rekordjait eltávolíthatja, és eltávolíthatja az ismétlődéseket a streamből, még akkor is, ha egyes mezők eltérnek (például az esemény ideje vagy az érkezési idő). Meg kell adnia egy vízjelet a dropDuplicatesWithinWatermark() függvény használatához. A vízjel által megadott időtartományon belül érkező összes duplikált adat elvetve lesz.

A rendezett adatok azért fontosak, mert a sorrenden kívüli adatok miatt a vízjel értéke helytelenül előreszalad. Ezután, amikor régebbi adatok érkeznek, későinek és elvetettnek minősülnek. A withEventTimeOrder beállítással a kezdeti pillanatképet a vízjelben megadott időbélyeg alapján, sorrendben dolgozhatja fel. A withEventTimeOrder lehetőség deklarálható az adathalmazt meghatározó kódban vagy a folyamatbeállításokbanspark.databricks.delta.withEventTimeOrder.enabledhasználatával. Például:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Jegyzet

A withEventTimeOrder beállítás csak Python esetén támogatott.

Az alábbi példában az adatokat a clickTimestampdolgozza fel, és az egymástól 5 másodpercen belül érkező, ismétlődő userId és clickAdId oszlopokat tartalmazó rekordok törlődnek.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Folyamatkonfiguráció optimalizálása állapotalapú feldolgozáshoz

A gyártási problémák és a túlzott késés elkerülése érdekében a Databricks azt javasolja, hogy engedélyezze a RocksDB-alapú állapotkezelést az állapotalapú streamfeldolgozáshoz, különösen akkor, ha a feldolgozáshoz nagy mennyiségű köztes állapot tárolására van szükség.

A megszakítás nélküli folyamatok automatikusan kezelik az állapottároló konfigurációit.

A RocksDB-alapú állapotkezelés engedélyezéséhez állítsa be a következő konfigurációt a folyamat üzembe helyezése előtt:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Ha többet szeretne megtudni a RocksDB állapottárolójáról, beleértve a RocksDB konfigurációs javaslatait, tekintse meg A RocksDB állapottároló konfigurálása az Azure Databricks.