Állapotalapú feldolgozás optimalizálása a DLT-ben vízjelekkel
Az állapotban tárolt adatok hatékony kezeléséhez használjon vízjeleket az állapotalapú adatfolyamok DLT-ben történő feldolgozásakor, beleértve az összesítéseket, illesztéseket és deduplikációkat. Ez a cikk bemutatja, hogyan használhat vízjeleket a DLT-lekérdezésekben, és példákat tartalmaz az ajánlott műveletekre.
Jegyzet
Annak érdekében, hogy az aggregációkat végrehajtó lekérdezések növekményesen legyenek feldolgozva, és ne legyenek teljesen újrafordítve az egyes frissítésekkel, vízjeleket kell használnia.
Mi az a vízjel?
A streamfeldolgozásban a vízjeles egy Apache Spark-szolgáltatás, amely az állapotalapú műveletek, például összesítések végrehajtásakor meghatározhat egy időalapú küszöbértéket az adatok feldolgozásához. Az érkező adatok feldolgozása addig történik, amíg el nem éri a küszöbértéket, és ekkor a küszöbérték által meghatározott időablak bezárul. A vízjelekkel elkerülhetők a lekérdezések feldolgozása során felmerülő problémák, főként nagyobb adathalmazok vagy hosszú ideig futó feldolgozás esetén. Előfordulhat, hogy ezek a problémák közé tartozik az eredmények előállításának jelentős késése, sőt akár memóriahiányos (OOM) hibák is, a feldolgozás során az állapotban felhalmozódott adatmennyiség miatt. Mivel a streamelési adatok eredendően rendezetlenek, a vízjelek támogatják az olyan műveletek helyes kiszámítását is, mint az időablak-összesítések.
Ha többet szeretne megtudni a vízjelek streamfeldolgozásban való használatáról, tekintse meg Vízjelezés az Apache Spark strukturált leképezésű adatfolyamokban és Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozása érdekében.
Hogyan definiálhat vízjelet?
A vízjel meghatározásához meg kell adnia egy időbélyegmezőt és egy olyan értéket, amely késői adatok érkezésének időküszöbét adja meg. Az adatok késésnek minősülnek, ha a megadott időküszöb után érkeznek. Ha például a küszöbérték 10 perc, a 10 perces küszöbérték után érkező rekordok elvethetők.
Mivel a megadott küszöbérték után érkező rekordok elvethetők, fontos a késési és a helyességi követelményeknek megfelelő küszöbérték kiválasztása. Ha kisebb küszöbértéket választ, a rekordok hamarabb lesznek kibocsátva, de azt is jelenti, hogy a késői rekordok nagyobb valószínűséggel lesznek elvetve. A nagyobb küszöbérték hosszabb várakozást, de az adatok teljességét is jelentheti. A nagyobb állapotméret miatt a nagyobb küszöbértékek további számítási erőforrásokat is igényelhetnek. Mivel a küszöbérték az adatoktól és a feldolgozási követelményektől függ, a feldolgozás tesztelése és monitorozása fontos az optimális küszöbérték meghatározásához.
A Python withWatermark()
függvényével definiálhat vízjelet. Az SQL-ben a WATERMARK
záradék használatával definiáljon vízjelet:
Piton
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Vízjelek használata stream-stream csatlakoztatáskor
Stream-stream illesztések esetén meg kell adnia az illesztés mindkét oldalán egy vízjelet, valamint egy időintervallum-kifejezést. Mivel minden illesztési forrás hiányosan tekinti meg az adatokat, az időintervallum záradék szükséges ahhoz, hogy tájékoztassa a streamelési motort, ha nem lehet további egyezéseket létrehozni. Az időintervallum záradéknak ugyanazokat a mezőket kell használnia, mint a vízjelek definiálásához.
Mivel előfordulhat, hogy az egyes streamek különböző küszöbértékeket igényelnek a vízjelekhez, a streameknek nem kell azonos küszöbértékekkel rendelkezniük. A hiányzó adatok elkerülése érdekében a streamelési motor egy globális vízjelet tart fenn a leglassabb stream alapján.
Az alábbi példa egy hirdetésmegjelenések streamjéhez kapcsolódik, valamint a felhasználói kattintások streamjéhez a hirdetésekre. Ebben a példában a kattintásnak a megjelenést követő 3 percen belül kell történnie. A 3 perces időintervallum leteltét követően a már nem egyeztethető állapot sorait a rendszer elveti.
Piton
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Ablakos összesítések végrehajtása vízjelekkel
A streamelési adatok gyakori állapotalapú művelete egy ablakos összesítés. Az ablakos aggregációk hasonlóak a csoportosított aggregációkhoz, azzal a kivételrel, hogy a megadott ablak részét képező sorok összesítő értékei lesznek visszaadva.
Egy ablak definiálható egy adott hosszként, és az összesítési művelet az adott ablak részét képező összes sorban is végrehajtható. A Spark Streaming három ablaktípust támogat:
- Rögzített ablakok: Rögzített méretű, nem átfedésben lévő és folyamatos időintervallumok sorozata. Egy bemeneti rekord csak egyetlen ablakhoz tartozik.
- Tolóablakok: A tolóablakokhoz hasonlóan a tolóablakok rögzített méretűek, de az ablakok átfedésben lehetnek, és egy rekord több ablakba is eshet.
Amikor az adatok az ablak végén és a vízjel hosszán túl érkeznek, az ablakhoz nem fogadnak el új adatokat, az összesítés eredményét kibocsátják, és az ablak állapotát elvetik.
Az alábbi példa egy rögzített ablak használatával 5 percenként kiszámítja a megjelenítések összegét. Ebben a példában a kijelölési záradék az aliast impressions_window
használja, majd magát az ablakot a GROUP BY
záradék részeként definiálja. Az ablaknak ugyanazon időbélyegoszlopon kell alapulnia, mint a vízjelnek, a példában szereplő clickTimestamp
oszlopnak.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Hasonló példa a Pythonban az óránkénti egységes időablakok nyereségének kiszámítására.
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Streamelési rekordok deduplikálása
A strukturált streamelés pontosan egyszeri feldolgozási garanciával rendelkezik, de nem törli automatikusan az adatforrásokból származó rekordok duplikálását. Mivel például számos üzenetsor legalább egyszer garantálja a garanciát, ismétlődő rekordokra kell számítani az üzenetsorok egyikéből való olvasáskor. A dropDuplicatesWithinWatermark()
függvénnyel bármely megadott mező rekordjait eltávolíthatja, és eltávolíthatja az ismétlődéseket a streamből, még akkor is, ha egyes mezők eltérnek (például az esemény ideje vagy az érkezési idő). Meg kell adnia egy vízjelet a dropDuplicatesWithinWatermark()
függvény használatához. A vízjel által megadott időtartományon belül érkező összes duplikált adat elvetve lesz.
A rendezett adatok azért fontosak, mert a sorrenden kívüli adatok miatt a vízjel értéke helytelenül előreszalad. Ezután, amikor régebbi adatok érkeznek, későinek és elvetettnek minősülnek. A withEventTimeOrder
beállítással a kezdeti pillanatképet a vízjelben megadott időbélyeg alapján, sorrendben dolgozhatja fel. A withEventTimeOrder
lehetőség deklarálható az adathalmazt meghatározó kódban vagy a folyamatbeállításokbanspark.databricks.delta.withEventTimeOrder.enabled
használatával. Például:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Jegyzet
A withEventTimeOrder
beállítás csak Python esetén támogatott.
Az alábbi példában az adatokat a clickTimestamp
dolgozza fel, és az egymástól 5 másodpercen belül érkező, ismétlődő userId
és clickAdId
oszlopokat tartalmazó rekordok törlődnek.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Folyamatkonfiguráció optimalizálása állapotalapú feldolgozáshoz
A gyártási problémák és a túlzott késés elkerülése érdekében a Databricks azt javasolja, hogy engedélyezze a RocksDB-alapú állapotkezelést az állapotalapú streamfeldolgozáshoz, különösen akkor, ha a feldolgozáshoz nagy mennyiségű köztes állapot tárolására van szükség.
A megszakítás nélküli folyamatok automatikusan kezelik az állapottároló konfigurációit.
A RocksDB-alapú állapotkezelés engedélyezéséhez állítsa be a következő konfigurációt a folyamat üzembe helyezése előtt:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Ha többet szeretne megtudni a RocksDB állapottárolójáról, beleértve a RocksDB konfigurációs javaslatait, tekintse meg A RocksDB állapottároló konfigurálása az Azure Databricks.