Optymalizowanie przetwarzania stanowego w technologii DLT przy użyciu znaków wodnych
Aby skutecznie zarządzać danymi przechowywanymi w stanie, użyj znaków wodnych podczas wykonywania stanowego przetwarzania strumieniowego w DLT, w tym agregacji, łączeń i deduplikacji. W tym artykule opisano sposób używania znaków wodnych w zapytaniach DLT, wraz z przykładami zalecanych operacji.
Notatka
Aby zapewnić, że zapytania wykonujące agregacje są przetwarzane przyrostowo i nie są w pełni ponownie skompilowane przy każdej aktualizacji, należy użyć znaków wodnych.
Co to jest znak wodny?
W przetwarzaniu strumienia jest funkcją platformy Apache Spark, która może zdefiniować próg czasu przetwarzania danych podczas wykonywania operacji stanowych, takich jak agregacje. Dane przychodzące są przetwarzane do momentu osiągnięcia progu, w którym przedział czasu zdefiniowany przez próg zostanie zamknięty. Znaki wodne mogą służyć do unikania problemów podczas przetwarzania zapytań, głównie podczas przetwarzania większych zestawów danych lub długotrwałego przetwarzania. Te problemy mogą obejmować duże opóźnienie w generowaniu wyników, a nawet błędy braku pamięci (OOM) z powodu ilości danych przechowywanych w stanie podczas przetwarzania. Ponieważ dane przesyłane strumieniowo są naturalnie nieurządowane, watermarki obsługują prawidłowe obliczanie operacji, takich jak agregacje okien czasowych.
Aby dowiedzieć się więcej na temat używania znaków wodnych w strumieniowym przetwarzaniu danych, zobacz Znakowanie wodne w Apache Spark Structured Streaming oraz Stosowanie znaków wodnych do kontrolowania progów przetwarzania danych.
Jak zdefiniować znak wodny?
Należy zdefiniować znak wodny, określając pole znacznika czasu i wartość reprezentującą próg czasu dla późnych danych nadejścia. Dane są uznawane za opóźnione, jeśli docierają po zdefiniowanym progu czasu. Jeśli na przykład próg jest zdefiniowany jako 10 minut, rekordy przychodzące po progu 10 minut mogą zostać usunięte.
Ponieważ rekordy dostarczane po zdefiniowanym progu mogą zostać porzucone, wybranie progu spełniającego wymagania dotyczące opóźnień i poprawności jest ważne. Wybranie mniejszego progu powoduje, że rekordy są emitowane wcześniej, ale jednocześnie zwiększa prawdopodobieństwo, że późne rekordy zostaną porzucone. Większy próg oznacza dłuższe oczekiwanie, ale prawdopodobnie większą kompletność danych. Ze względu na większy rozmiar stanu większy próg może również wymagać dodatkowych zasobów obliczeniowych. Ponieważ wartość progowa zależy od wymagań dotyczących danych i przetwarzania, testowanie i monitorowanie przetwarzania jest ważne, aby określić optymalny próg.
Funkcja withWatermark()
w języku Python służy do definiowania znaku wodnego. W języku SQL użyj klauzuli WATERMARK
, aby zdefiniować znak wodny:
Pyton
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Używanie znaków wodnych z łączeniami strumieniowymi
W przypadku połączeń strumień-strumień należy zdefiniować znak wodny po obu stronach połączenia oraz klauzulę interwału czasu. Ponieważ każde źródło sprzężenia ma niekompletny widok danych, klauzula interwału czasu jest wymagana, aby poinformować silnik przesyłania strumieniowego, kiedy nie można już dokonać dalszych dopasowań. Klauzula interwału czasu musi używać tych samych pól używanych do definiowania znaków wodnych.
Ze względu na to, że każdy strumień wymaga różnych progów dla znaków wodnych, strumienie nie muszą mieć tych samych progów. Aby uniknąć brakujących danych, aparat przesyłania strumieniowego utrzymuje jeden globalny znak wodny na podstawie najwolniejszego strumienia.
Poniższy przykład łączy strumień wyświetleń reklam i strumień kliknięć użytkowników na reklamach. W tym przykładzie kliknięcie musi nastąpić w ciągu 3 minut od wyświetlenia. Po upływie 3-minutowego interwału wiersze ze stanu, którego nie można już dopasować, zostaną usunięte.
Pyton
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Wykonywanie agregacji okienkowych z sygnaturami czasowymi
Typową operacją stanową na danych przesyłanych strumieniowo jest agregacja okienna. Agregacje okienne są podobne do agregacji pogrupowanych, z tą różnicą, że wartości agregujące są zwracane dla zestawu wierszy będących częścią zdefiniowanego okna.
Okno można zdefiniować jako określoną długość, a operację agregacji można wykonać na wszystkich wierszach, które są częścią tego okna. Obsługa Spark Streaming obejmuje trzy typy okien:
- Okna kaskadowe (stałe): seria interwałów czasowych o stałym rozmiarze, nienakładających się i ciągłych. Rekord wejściowy należy tylko do jednego okna.
- Okna przesuwne: Podobnie jak okna przesuwające się bez nakładania, okna przesuwne są o stałym rozmiarze, ale mogą się nakładać, a rekord może znajdować się w wielu oknach.
Gdy dane docierają po zakończeniu okna oraz długości znaku wodnego, żadne nowe dane nie są akceptowane, wynik agregacji jest emitowany, a stan okna jest usuwany.
Poniższy przykład oblicza sumę wyświetleń co 5 minut przy użyciu stałego okna. W tym przykładzie klauzula select używa aliasu impressions_window
, a następnie samo okno jest definiowane jako część klauzuli GROUP BY
. Okno musi być oparte na tej samej kolumnie znacznika czasu co znak wodny, czyli kolumna clickTimestamp
w tym przykładzie.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Podobny przykład na przykładzie Pythona do obliczenia zysku w stałych godzinnych przedziałach czasowych.
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Deduplikacja rekordów przesyłania strumieniowego
Structured Streaming ma dokładnie jednokrotne gwarancje przetwarzania, ale nie deduplikuje automatycznie rekordów z źródeł danych. Na przykład ponieważ wiele kolejek komunikatów ma co najmniej raz gwarancje, podczas odczytu z jednej z tych kolejek komunikatów należy oczekiwać zduplikowanych rekordów. Za pomocą funkcji dropDuplicatesWithinWatermark()
można deduplikować rekordy w dowolnym określonym polu, usuwając duplikaty ze strumienia, nawet jeśli niektóre pola różnią się (np. czas zdarzenia lub godzina przybycia). Aby móc użyć funkcji dropDuplicatesWithinWatermark()
, musisz określić znak wodny. Wszystkie zduplikowane dane, które docierają do zakresu czasu określonego przez znacznik, są odrzucane.
Upórządkowane dane są ważne, ponieważ dane w niewłaściwej kolejności powodują nieprawidłowe przesunięcie wartości punktu odniesienia do przodu. Następnie, gdy pojawią się starsze dane, są uznawane za opóźnione i odrzucone. Użyj opcji withEventTimeOrder
, aby przetworzyć początkową migawkę w kolejności na podstawie znacznika czasu określonego w znaku wodnym. Opcję withEventTimeOrder
można zadeklarować w kodzie definiującym zestaw danych lub w ustawieniach potoku przy użyciu spark.databricks.delta.withEventTimeOrder.enabled
. Na przykład:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Notatka
Opcja withEventTimeOrder
jest obsługiwana tylko w języku Python.
W poniższym przykładzie dane są przetwarzane w porządku według clickTimestamp
, a rekordy, które docierają w odstępie 5 sekund i zawierają zduplikowane kolumny userId
i clickAdId
, są odrzucane.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optymalizowanie konfiguracji potoku na potrzeby przetwarzania stanowego
Aby zapobiec problemom produkcyjnym i nadmiernemu opóźnieniu, usługa Databricks zaleca włączenie zarządzania stanem opartym na bazie bazy danych RocksDB na potrzeby przetwarzania strumienia stanowego, szczególnie jeśli przetwarzanie wymaga zaoszczędzenia dużej ilości stanu pośredniego.
Potoki bezserwerowe automatycznie zarządzają konfiguracjami magazynu stanów.
Zarządzanie stanem oparte na bazie danych RocksDB włączysz, ustawiając następującą konfigurację przed wdrożeniem potoku:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Aby dowiedzieć się więcej na temat magazynu stanów bazy danych RocksDB, w tym zaleceń dotyczących konfiguracji bazy danych RocksDB, zobacz Configure RocksDB state store on Azure Databricks.