Delen via


Stateful verwerking in DLT optimaliseren met watermerken

Gebruik watermerken bij het uitvoeren van stateful streamverwerking in DLT, inclusief aggregaties, joins en deduplicatie, om de gegevens die in de toestand bewaard worden effectief te beheren. In dit artikel wordt beschreven hoe u watermerken gebruikt in uw DLT-query's en voorbeelden bevat van de aanbevolen bewerkingen.

Notitie

Om ervoor te zorgen dat query's die aggregaties uitvoeren incrementeel worden verwerkt en niet volledig opnieuw worden aangevuld met elke update, moet u watermerken gebruiken.

Wat is een watermerk?

In stroomverwerking is een watermerk een Apache Spark-functie die een drempelwaarde op basis van tijd kan definiëren voor het verwerken van gegevens bij het uitvoeren van stateful bewerkingen, zoals aggregaties. Gegevens die binnenkomen, worden verwerkt totdat de drempelwaarde is bereikt, waarna het tijdvenster dat door de drempelwaarde is gedefinieerd, wordt gesloten. Watermerken kunnen worden gebruikt om problemen tijdens het verwerken van query's te voorkomen, voornamelijk bij het verwerken van grotere gegevenssets of langdurige verwerking. Deze problemen kunnen hoge latentie omvatten bij het produceren van resultaten en zelfs out-of-memory (OOM) fouten veroorzaken vanwege de hoeveelheid gegevens die tijdens de verwerking in het geheugen wordt gehouden. Omdat streaminggegevens inherent ongeordeld zijn, bieden watermerken ook ondersteuning voor het correct berekenen van bewerkingen zoals tijdvensteraggregaties.

Zie Watermerken in Apache Spark Structured Streaming en Watermerken toepassen om dataverwerkingsdrempels te beherenvoor meer informatie over het gebruik van watermerken in stroomverwerking.

Hoe definieert u een watermerk?

U definieert een watermerk door een tijdstempelveld en een waarde op te geven die de tijdsdrempel aangeeft voor late gegevens die binnenkomen. Gegevens worden te laat beschouwd als ze na de gedefinieerde tijdsdrempel binnenkomen. Als de drempelwaarde bijvoorbeeld is gedefinieerd als 10 minuten, kunnen records die na de drempelwaarde van 10 minuten binnenkomen, worden verwijderd.

Omdat records die na de gedefinieerde drempelwaarde binnenkomen, mogelijk worden verwijderd, is het selecteren van een drempelwaarde die voldoet aan uw latentie versus de vereisten voor juistheid belangrijk. Als u een kleinere drempelwaarde kiest, worden records sneller verzonden, maar betekent dit ook dat latere records waarschijnlijker worden verwijderd. Een grotere drempelwaarde betekent een langere wachttijd, maar mogelijk meer volledigheid van gegevens. Vanwege de grotere statusgrootte kan een grotere drempelwaarde ook extra rekenresources vereisen. Omdat de drempelwaarde afhankelijk is van uw vereisten voor gegevens en verwerking, is testen en bewaken van uw verwerking belangrijk om een optimale drempelwaarde te bepalen.

U gebruikt de functie withWatermark() in Python om een watermerk te definiëren. Gebruik in SQL de WATERMARK-component om een watermerk te definiëren:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Watermerken gebruiken met stream-stream-joins

Voor stream-stream joins moet u een watermerk aan beide zijden van de join en een tijdsintervalcomponent definiëren. Omdat elke joinbron een onvolledige weergave van de gegevens heeft, is de tijdsintervalcomponent vereist om de streaming-engine te laten weten wanneer er geen verdere overeenkomsten kunnen worden gemaakt. De tijdsintervalcomponent moet dezelfde velden gebruiken die worden gebruikt om de watermerken te definiëren.

Omdat voor elke stroom mogelijk verschillende drempelwaarden voor watermerken zijn vereist, hoeven de streams niet dezelfde drempelwaarden te hebben. Om ontbrekende gegevens te voorkomen, onderhoudt de streaming-engine één globaal watermerk op basis van de langzaamste stroom.

In het volgende voorbeeld wordt een stream met advertentie-indrukken samengevoegd en wordt er een stroom gebruikersklikken op advertenties weergegeven. In dit voorbeeld moet er binnen 3 minuten na de indruk een klik plaatsvinden. Nadat het tijdsinterval van 3 minuten is verstreken, worden rijen uit de status verwijderd die niet meer overeenkomen.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

vensteraggregaties uitvoeren met watermerken

Een algemene stateful bewerking voor streaminggegevens is een gevensterde aggregatie. Gevensterde aggregaties zijn vergelijkbaar met gegroepeerde aggregaties, behalve dat geaggregeerde waarden worden geretourneerd voor de set rijen die deel uitmaken van het gedefinieerde venster.

Een venster kan worden gedefinieerd als een bepaalde lengte en een aggregatiebewerking kan worden uitgevoerd op alle rijen die deel uitmaken van dat venster. Spark Streaming ondersteunt drie typen vensters:

  • Vaste tumblingvensters: een reeks niet-overlappende en aaneengesloten tijdsintervallen met vaste grootte. Een invoerrecord behoort tot slechts één venster.
  • schuifvensters: vergelijkbaar met tumblingvensters zijn schuifvensters vast formaat, maar vensters kunnen overlappen en een record kan in meerdere vensters vallen.

Wanneer gegevens voorbij het einde van het venster binnenkomen plus de lengte van het watermerk, worden er geen nieuwe gegevens geaccepteerd voor het venster, wordt het resultaat van de aggregatie verzonden en wordt de status voor het venster verwijderd.

In het volgende voorbeeld wordt elke 5 minuten een som van de weergaven berekend met behulp van een vast venster. In dit voorbeeld gebruikt de select-component de alias impressions_windowen wordt het venster zelf gedefinieerd als onderdeel van de GROUP BY-component. Het venster moet zijn gebaseerd op dezelfde tijdstempelkolom als het watermerk, de clickTimestamp kolom in dit voorbeeld.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Een vergelijkbaar voorbeeld in Python om de winst te berekenen in vaste tijdvensters van een uur.

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Streaminggegevens ontdubbelen

Structured Streaming heeft exactly-once verwerkingswaarborg, maar ontdubbelt niet automatisch records uit gegevensbronnen. Omdat veel berichtenwachtrijen bijvoorbeeld ten minste eenmaal garanties hebben, moeten dubbele records worden verwacht bij het lezen van een van deze berichtenwachtrijen. U kunt de functie dropDuplicatesWithinWatermark() gebruiken om records in elk opgegeven veld te dedupliceren, zodat dubbele waarden uit een stroom worden verwijderd, zelfs als sommige velden verschillen (zoals de tijd van de gebeurtenis of de aankomsttijd). U moet een watermerk opgeven om de functie dropDuplicatesWithinWatermark() te gebruiken. Alle dubbele gegevens die binnen het door het watermerk opgegeven tijdsbereik binnenkomen, worden verwijderd.

Geordende gegevens zijn belangrijk omdat out-of-ordergegevens ervoor kunnen zorgen dat de waarde van de watermerk verkeerd vooruitgaat. Wanneer oudere gegevens binnenkomen, wordt dit beschouwd als laat en verwijderd. Gebruik de optie withEventTimeOrder om de eerste momentopname op volgorde te verwerken op basis van de tijdstempel die is opgegeven in het watermerk. De optie withEventTimeOrder kan worden gedeclareerd in de code die de gegevensset definieert of in de pijplijninstellingen met behulp van spark.databricks.delta.withEventTimeOrder.enabled. Bijvoorbeeld:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Notitie

De optie withEventTimeOrder wordt alleen ondersteund met Python.

In het volgende voorbeeld worden gegevens op volgorde van clickTimestampverwerkt en worden records die binnen 5 seconden na elkaar binnenkomen die dubbele userId bevatten en clickAdId kolommen worden verwijderd.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Pijplijnconfiguratie optimaliseren voor toestandafhankelijke verwerking

Om productieproblemen en overmatige latentie te voorkomen, raadt Databricks aan om RocksDB-gebaseerd statusbeheer in te schakelen voor je stateful stroomverwerking, vooral als je verwerking een grote hoeveelheid tussenstatus vereist.

Serverloze pijplijnen beheren configuraties voor statusopslag automatisch.

U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in te stellen voordat u een pijplijn implementeert:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Zie Configure RocksDB state store on Azure Databricksvoor meer informatie over het RocksDB-statusarchief, inclusief configuratieaanaanvelingen voor RocksDB.