Condividi tramite


Ottimizzare l'elaborazione con stato in DLT con filigrane

Per gestire i dati mantenuti nello stato in modo efficace, utilizzare i watermark durante l'elaborazione del flusso con stato in DLT, comprese aggregazioni, join e deduplicazione. Questo articolo descrive come usare filigrane nelle query DLT e include esempi delle operazioni consigliate.

Nota

Per garantire che le query che eseguono aggregazioni vengano elaborate in modo incrementale e non completamente ricalcolate con ogni aggiornamento, è necessario usare filigrane.

Che cos'è una filigrana?

Nell'elaborazione del flusso, una filigrana è una funzionalità di Apache Spark che può definire una soglia basata sul tempo per l'elaborazione dei dati durante l'esecuzione di operazioni con stato, ad esempio le aggregazioni. I dati in arrivo vengono elaborati fino a quando non viene raggiunta la soglia, al momento in cui l'intervallo di tempo definito dalla soglia viene chiuso. Le filigrane possono essere usate per evitare problemi durante l'elaborazione delle query, principalmente durante l'elaborazione di set di dati di dimensioni maggiori o l'elaborazione a esecuzione prolungata. Questi problemi possono includere un'elevata latenza nella produzione dei risultati e anche errori di esaurimento della memoria (OOM) a causa della quantità di dati mantenuti nella memoria durante l'elaborazione. Poiché i dati di streaming sono intrinsecamente non ordinati, le filigrane supportano anche il calcolo corretto di operazioni come le aggregazioni della finestra temporale.

Per ulteriori informazioni sull'uso dei watermark nell'elaborazione del flusso, vedere Watermarking in Apache Spark Structured Streaming e Applicare i watermark per controllare le soglie di elaborazione dei dati.

Come si definisce una filigrana?

È possibile definire una filigrana specificando un campo timestamp e un valore che rappresenta la soglia di tempo per dati in ritardo all'arrivo. I dati sono considerati in ritardo se arrivano dopo la soglia temporale definita. Ad esempio, se la soglia è definita come 10 minuti, i record che arrivano dopo la soglia di 10 minuti potrebbero essere eliminati.

Poiché i record che arrivano dopo la soglia definita potrebbero essere eliminati, è importante selezionare una soglia che soddisfi i requisiti di latenza e correttezza. La scelta di una soglia più piccola comporta l'emissione dei record prima, ma significa anche che è più probabile che i record in ritardo vengano eliminati. Una soglia maggiore indica un'attesa più lunga, ma probabilmente più completa dei dati. A causa delle dimensioni dello stato maggiori, una soglia maggiore potrebbe richiedere anche risorse di calcolo aggiuntive. Poiché il valore soglia dipende dai requisiti di dati e elaborazione, il test e il monitoraggio dell'elaborazione sono importanti per determinare una soglia ottimale.

Si utilizza la funzione withWatermark() in Python per definire una filigrana. In SQL usare la clausola WATERMARK per definire una filigrana:

Pitone

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Usare filigrane con join di flusso

Per i join tra flussi, è necessario definire una filigrana su entrambi i lati del join e una clausola di intervallo di tempo. Poiché ogni origine join ha una visualizzazione incompleta dei dati, la clausola intervallo di tempo è necessaria per indicare al motore di streaming quando non è possibile fare ulteriori corrispondenze. La clausola intervallo di tempo deve utilizzare gli stessi campi usati per definire le filigrane.

Poiché potrebbero verificarsi momenti in cui ogni flusso richiede soglie diverse per le filigrane, i flussi non devono avere le stesse soglie. Per evitare dati mancanti, il motore di streaming mantiene una filigrana globale basata sul flusso più lento.

L'esempio seguente unisce un flusso di impression pubblicitarie e un flusso di clic degli utenti sugli annunci. In questo esempio, un clic deve verificarsi entro 3 minuti dall'impressione. Dopo il passaggio dell'intervallo di tempo di 3 minuti, le righe dello stato che non possono più essere confrontate vengono eliminate.

Pitone

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Eseguire aggregazioni finestrate con filigrane

Un'operazione con stato comune sui dati di streaming è un'aggregazione con finestra. Le aggregazioni con finestra sono simili alle aggregazioni raggruppate, ad eccezione del fatto che per il set di righe che fanno parte della finestra definita vengono restituiti valori aggregati.

Una finestra può essere definita come una determinata lunghezza e un'operazione di aggregazione può essere eseguita su tutte le righe che fanno parte di tale finestra. Spark Streaming supporta tre tipi di finestre:

  • finestre a scorrimento (fisse): una serie di intervalli di tempo fissi, non sovrapposti e contigui. Un record di input appartiene a una sola finestra.
  • finestre scorrevoli: Analogamente alle finestre a scorrimento, le finestre scorrevoli sono di dimensioni fisse, ma le finestre possono sovrapporsi e un record può appartenere a più finestre.

Quando i dati arrivano oltre la fine della finestra più la lunghezza della filigrana, non vengono accettati nuovi dati per la finestra, il risultato dell'aggregazione viene generato e lo stato per la finestra viene eliminato.

Nell'esempio seguente si calcola il totale delle impression ogni 5 minuti usando una finestra fissa. In questo esempio, la clausola select usa l'alias impressions_windowe quindi la finestra stessa viene definita come parte della clausola GROUP BY. La finestra deve essere basata sulla stessa colonna timestamp della filigrana, la colonna clickTimestamp in questo esempio.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Un esempio simile in Python per calcolare il profitto su finestre fisse orarie:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplicare i record di streaming

Structured Streaming offre una garanzia di elaborazione una sola volta, ma non deduplica automaticamente i record dalle fonti di dati. Ad esempio, poiché molte code di messaggio offrono garanzie almeno una volta, ci si devono aspettare record duplicati durante la lettura da una di queste code di messaggio. È possibile usare la funzione dropDuplicatesWithinWatermark() per deduplicare i record in qualsiasi campo specificato, rimuovendo i duplicati da un flusso anche se alcuni campi differiscono , ad esempio l'ora dell'evento o l'ora di arrivo. È necessario specificare una filigrana per usare la funzione dropDuplicatesWithinWatermark(). Tutti i dati duplicati che arrivano entro l'intervallo di tempo specificato dalla filigrana vengono eliminati.

I dati ordinati sono importanti perché i dati non ordinati causano un salto errato del valore della filigrana. Quindi, quando arrivano dati meno recenti, vengono considerati in ritardo e eliminati. Usare l'opzione withEventTimeOrder per elaborare lo snapshot iniziale in base al timestamp specificato nella filigrana. L'opzione withEventTimeOrder può essere dichiarata nel codice che definisce il set di dati o nelle impostazioni della pipeline usando spark.databricks.delta.withEventTimeOrder.enabled. Per esempio:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Nota

L'opzione withEventTimeOrder è supportata solo con Python.

Nell'esempio seguente, i dati vengono elaborati in base a clickTimestampe i record che arrivano entro 5 secondi di distanza l'uno dall'altro e che contengono colonne duplicate userId e clickAdId vengono eliminati.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Ottimizzare la configurazione della pipeline per l'elaborazione con stato

Per evitare problemi di produzione e una latenza eccessiva, Databricks consiglia di abilitare la gestione dello stato basata su RocksDB per l'elaborazione del flusso con stato, in particolare se l'elaborazione richiede un notevole risparmio di una grande quantità di stato intermedio.

Le pipeline senza server gestiscono automaticamente le configurazioni dell'archiviazione dello stato.

È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente prima di distribuire una pipeline:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Per altre informazioni sull'archivio stati di RocksDB, incluse le raccomandazioni di configurazione per RocksDB, vedere Configurare l'archivio stati RocksDB in Azure Databricks.