Optimalizace stavových zpracování v DLT pomocí vodoznaků
Pokud chcete efektivně spravovat data uložená ve stavu, použijte vodoznaky při provádění stavového zpracování datových proudů v DLT, včetně agregací, spojení a odstranění duplicitních dat. Tento článek popisuje, jak používat vodoznaky v dotazech DLT a obsahuje příklady doporučených operací.
Poznámka
Aby se zajistilo, že dotazy, které provádějí agregace, se zpracovávají přírůstkově a nejsou plně přepočítané při každé aktualizaci, musíte použít vodoznaky.
Co je vodoznak?
Při zpracování datových proudů je vodoznak funkcí Apache Sparku, která může definovat prahovou hodnotu založenou na čase pro zpracování dat při provádění stavových operací, jako jsou agregace. Příchozí data se zpracovávají, dokud nedosáhnete prahové hodnoty. V tomto okamžiku je časové období definované prahovou hodnotou uzavřeno. Vodoznaky se dají použít k zabránění problémům při zpracování dotazů, zejména při zpracování větších datových sad nebo dlouhotrvajícího zpracování. Tyto problémy můžou zahrnovat vysokou latenci při vytváření výsledků a dokonce i chyby typu nedostatek paměti (OOM) kvůli množství dat, která se během zpracování uchovávají ve stavu. Vzhledem k tomu, že streamovaná data jsou ze své podstaty neuspořádaná, vodoznaky také podporují správné výpočty operací, jako jsou agregace časových intervalů.
Další informace o používání vodoznaků při zpracování datových proudů najdete v tématu Vodoznaky ve strukturovaném streamování Apache Sparku a Použití vodoznaků k řízení prahových hodnot zpracování dat.
Jak definujete vodoznak?
Vodoznak definujete tak, že zadáte pole časového razítka a hodnotu představující časový práh pro příjem pozdních dat. Data se považují za opožděná, pokud dorazí po definované prahové hodnotě času. Pokud je například prahová hodnota definována jako 10 minut, můžou se záznamy přicházející po 10minutové prahové hodnotě vynechat.
Vzhledem k tomu, že záznamy, které přicházejí po definované prahové hodnotě, mohou být vyřazeny, je důležité vybrat prahovou hodnotu, která splňuje vaše latence a požadavky na správnost. Pokud zvolíte menší prahovou hodnotu, vygenerují se záznamy dříve, ale také to znamená, že opožděné záznamy budou pravděpodobně vynechány. Větší prahová hodnota znamená delší čekání, ale možná větší úplnost dat. Kvůli větší velikosti stavu může větší prahová hodnota vyžadovat také další výpočetní prostředky. Vzhledem k tomu, že prahová hodnota závisí na požadavcích na data a zpracování, je důležité určit optimální prahovou hodnotu testováním a monitorováním zpracování.
K definování vodoznaku použijete funkci withWatermark()
v Pythonu. V SQL použijte klauzuli WATERMARK
k definování vodoznaku:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Použijte vodoznaky pro spojování datových toků-streamů
U spojení stream-stream musíte definovat vodoznak na obou stranách spojení a klauzuli časového intervalu. Vzhledem k tomu, že každý zdroj spojení obsahuje neúplné zobrazení dat, je nutné, aby klauzule časového intervalu řekla modulu streamování, když není možné provést žádné další shody. Klauzule časového intervalu musí používat stejná pole, která slouží k definování vodoznaků.
Vzhledem k tomu, že každý datový proud vyžaduje pro vodoznaky různé prahové hodnoty, nemusí mít datové proudy stejné prahové hodnoty. Aby se zabránilo chybějícím datům, modul streamování udržuje jeden globální vodoznak založený na nejpomalejším datovém proudu.
Následující příklad spojí stream reklamních impresí a stream uživatelských kliknutí na reklamy. V tomto příkladu musí kliknutí dojít do 3 minut po zobrazení. Po uplynutí 3minutového časového intervalu se řádky, které již nelze spárovat, odstraní.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Provádění agregací oken s vodoznaky
Běžnou stavovou operací streamovaných dat je oknová agregace. Agregace s okny jsou podobné seskupeným agregacím s tím rozdílem, že agregační hodnoty se vrátí pro sadu řádků, které jsou součástí definovaného okna.
Okno lze definovat jako určitou délku a operaci agregace lze provést na všech řádcích, které jsou součástí tohoto okna. Streamování Sparku podporuje tři typy oken:
- Roluící (pevná) okna: řada pevných, nepřekrývajících se a souvislých časových intervalů. Vstupní záznam patří pouze do jednoho okna.
- posuvná okna: Podobně jako klouzavá okna mají posuvná okna pevnou velikost, ale okna se mohou překrývat a záznam může spadat do více oken.
Když data dorazí po uplynutí okna a délky vodoznaku, nebudou pro okno přijata žádná nová data, vygeneruje se výsledek agregace a stav okna se vymaže.
Následující příklad vypočítá součet impresí každých 5 minut pomocí pevného okna. V tomto příkladu klauzule select používá alias impressions_window
a pak samotné okno je definováno jako součást klauzule GROUP BY
. Okno musí být založeno na stejném sloupci s časovým razítkem jako je sloupec s vodoznakem, sloupce clickTimestamp
v tomto příkladu.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Podobný příklad v Pythonu pro výpočet zisku v pevně stanovených hodinových intervalech.
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Odstranění duplicitních streamovaných záznamů
Strukturované streamování má záruky zpracování přesně jednou, ale automaticky nededuplikuje záznamy ze zdrojů dat. Protože například mnoho front zpráv má alespoň jednou záruku, měly by se při čtení z jedné z těchto front zpráv očekávat duplicitní záznamy. Funkci dropDuplicatesWithinWatermark()
můžete použít k odstranění duplicitních záznamů u libovolného zadaného pole, odebrání duplicit ze streamu i v případě, že se některá pole liší (například čas události nebo čas příjezdu). Chcete-li použít funkci dropDuplicatesWithinWatermark()
, musíte zadat vodoznak. Všechna duplicitní data, která přicházejí v časovém rozsahu určeném vodoznakem, se zahodí.
Seřazená data jsou důležitá, protože neuspořádaná data způsobují nesprávné posunování hodnoty časové značky. Když přijdou starší data, považují se za opožděná a vyřazená. Pomocí možnosti withEventTimeOrder
můžete zpracovat počáteční snímek v pořadí podle časového razítka zadaného ve vodoznaku. Možnost withEventTimeOrder
lze deklarovat v kódu definujícím datovou sadu nebo v nastavení kanálu pomocí spark.databricks.delta.withEventTimeOrder.enabled
. Například:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Poznámka
Možnost withEventTimeOrder
se podporuje jenom v Pythonu.
V následujícím příkladu se data zpracovávají podle clickTimestamp
a záznamy, které přicházejí v rozmezí 5 sekund od sebe a obsahují duplicitní sloupce userId
a clickAdId
, se zahodí.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimalizace konfigurace potrubí pro stavové zpracování
Aby se zabránilo problémům v produkčním prostředí a nadměrné latenci, doporučuje Databricks povolit správu stavu založené na RocksDB pro zpracování stavových datových proudů, zejména pokud zpracování vyžaduje úsporu velkého zprostředkujícího stavu.
Kanály bez serverů automaticky spravují konfigurace stavového úložiště.
Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace před nasazením kanálu:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Další informace o úložišti stavů RocksDB, včetně doporučení konfigurace pro RocksDB, najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.