Dela via


Välj ett utdataläge för strukturerad direktuppspelning

I den här artikeln beskrivs hur du väljer ett utdataläge för tillståndskänslig strömning. Endast tillståndskänsliga strömmar som innehåller aggregeringar kräver en konfiguration av utdataläget.

Kopplingar stöder endast tilläggsutdataläget och utdataläget påverkar inte dedupliceringen. De godtyckliga tillståndskänsliga operatorerna mapGroupsWithState och flatMapGroupsWithState genererar poster med hjälp av sin egen anpassade logik, så att strömmens utdataläge inte påverkar deras beteende.

För tillståndslös direktuppspelning fungerar alla utdatalägen på samma sätt.

Om du vill konfigurera utdataläget korrekt måste du förstå tillståndskänslig strömning, vattenstämplar och utlösare. Mer information finns i följande artiklar:

Vad är utdataläge?

Utdataläget för en strukturerad direktuppspelningsfråga avgör vilka poster frågans operatorer genererar under varje utlösare. De tre typerna av poster som kan genereras är:

  • Registrerar att framtida bearbetning inte ändras.
  • De poster som har ändrats sedan den senaste utlösaren.
  • Alla poster i tillståndstabellen.

Att veta vilka typer av poster som ska genereras är viktigt för tillståndskänsliga operatorer eftersom en viss rad som skapas av en tillståndskänslig operator kan ändras från utlösare till utlösare. När en strömningsaggregeringsoperator till exempel tar emot fler rader för ett visst fönster kan det fönstrets aggregeringsvärden ändras mellan utlösare.

För tillståndslösa operatorer påverkar skillnaden mellan posttyper inte operatorns beteende. Posterna som en tillståndslös operator genererar under en utlösare är alltid källposterna som bearbetas under utlösaren.

Tillgängliga utdatalägen

Det finns tre utdatalägen som talar om för en operator vilka poster som ska genereras under en viss utlösare:

Utdataläge beskrivning
Tilläggsläge (standard) Som standard körs strömmande frågor i tilläggsläge. I det här läget genererar operatorer endast rader som inte ändras i framtida utlösare. Tillståndsbevarande operatorer använder vattenmärket för att avgöra när detta inträffar.
Uppdateringsläge I uppdateringsläge sänder operatorerna ut alla rader som har ändrats under triggaren, även om posten som genereras kan ändras i en efterföljande trigger.
Fullständigt läge Fullständigt läge fungerar endast med strömningsaggregeringar. I fullständigt läge genereras alla resulterande rader som genereras av operatorn nedströms.

Produktionsöverväganden

För många tillståndskänsliga strömningsåtgärder måste du välja mellan tilläggs- och uppdateringslägen. I följande avsnitt beskrivs överväganden som kan ligga till hjälp för ditt beslut.

Kommentar

Komplett läge har vissa program, men kan fungera dåligt när data skalar. Databricks rekommenderar att du använder materialiserade vyer för att få semantiska garantier associerade med fullständigt läge för inkrementell bearbetning för många tillståndsberoende operationer. Se Använd materialiserade vyer i Databricks SQL.

Programsemantik

Programsemantik beskriver hur underordnade program använder strömmande data.

Om underordnade tjänster behöver vidta en enda åtgärd för varje nedströmsskrivning använder du tilläggsläget i de flesta fall. Om du till exempel har en underordnad meddelandetjänst som skickar meddelanden för varje ny post som skrivs till mottagaren, ser tilläggsläget till att varje post bara skrivs en gång. Uppdateringsläget skriver posten varje gång som tillståndsinformationen ändras, vilket skulle resultera i många uppdateringar.

Om underordnade tjänster behöver nya resultat säkerställer uppdateringsläget att mottagaren förblir så up-to-datum som möjligt. Exempel är en maskininlärningsmodell som läser funktioner i realtid eller en instrumentpanel för analys som spårar aggregeringar i realtid.

Operator- och mottagarkompatibilitet

Strukturerad direktuppspelning stöder inte alla åtgärder som är tillgängliga i Apache Spark och vissa strömningsåtgärder stöds inte i alla utdatalägen. Mer information om operatorbegränsningar finns i OSS-strömningsdokumenten.

Alla mottagare stöder inte alla utdatalägen. Både Delta Lake, som stöder alla hanterade Unity Catalog-tabeller, och Kafka stöder alla utdatalägen. Mer information om mottagarkompatibilitet finns i OSS-strömningsdokumenten.

Svarstid och kostnad

Utdataläget påverkar hur lång tid som måste förflutit innan en post skrivs, och frekvensen och mängden data som skrivs kan påverka kostnaderna för strömmande pipelines.

Tilläggsläget tvingar tillståndskänsliga operatorer att generera resultat först när tillståndskänsliga resultat har slutförts, vilket är minst lika länge som vattenstämpelfördröjningen. En vattenstämpelfördröjning på 1 hour i utdataläget för tillägg innebär att posterna har minst en timmes fördröjning innan de skickas nedströms.

Uppdateringsläget resulterar i en skrivning per utlösare per aggregeringsvärde. Om din mottagare debiteras per skrivning per enskild post kan det vara dyrt om posterna uppdateras många gånger innan vattenstämpelfördröjningen har passerat.

Exempel på konfigurationer

Följande kodexempel visar hur du konfigurerar utdataläge för direktuppspelningsuppdateringar till Unity Catalog-tabeller:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Se OSS-dokument för PySpark DataStreamWriter.outputMode eller Scala DataStreamWriter.outputMode.

Exempel på tillståndskänsliga strömnings- och utdatalägen

Följande exempel är avsett att hjälpa dig att resonera genom hur utdataläget interagerar med vattenstämplar för tillståndskänslig strömning.

Överväg en strömmande aggregering som beräknar de totala intäkterna som genereras varje timme i en butik med en vattenstämpelfördröjning på 15 minuter. Den första mikrobatchen bearbetar följande poster:

  • $15 kl. 14:40
  • $10 kl. 14:30
  • $30 kl. 15:10

Nu är motorns vattenstämpel 14:55 eftersom den subtraherar 15 minuter (fördröjningen) från den maximala tid som visas (15:10). Strömningsaggregeringsoperatorn har följande status:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $30

I följande tabell beskrivs vad som skulle hända i varje utdataläge:

Utdataläge Resultat och orsak
Lägga till Strömningsaggregeringsoperatorn genererar inget nedströms. Detta beror på att båda dessa fönster kan ändras när nya värden visas med en efterföljande utlösare: vattenmärket vid 14:55 anger att poster efter 14:55 fortfarande kan komma, och dessa poster kan falla i antingen [2pm, 3pm]-fönstret eller [3pm, 4pm]-fönstret.
Uppdatera Operatorn genererar båda posterna eftersom båda posterna tog emot uppdateringar.
Klart Operatorn genererar alla poster.

Anta nu att strömmen tar emot ytterligare en post:

  • $20 kl. 15:20

Vattenstämpeln uppdateras till 15:05 eftersom motorn subtraherar 15 minuter från 15:20. Nu har strömningsaggregeringsoperatorn följande tillstånd:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $50

I följande tabell beskrivs vad som skulle hända i varje utdataläge:

Utdataläge Resultat och orsak
Lägga till Strömningsaggregeringsoperatorn observerar att vattenstämpeln på 15:05 är större än slutet av [2pm, 3pm]-fönstret. Enligt definitionen av vattenstämpeln kan det fönstret inte längre ändras, så det avger fönstret [2pm, 3pm].
Uppdatera Strömningsaggregeringsoperatorn genererar [3pm, 4pm]-fönstret eftersom tillståndsvärdet har ändrats från 30 USD till 50 USD.
Klart Operatorn genererar alla poster.

Följande sammanfattar hur tillståndskänsliga operatorer beter sig i varje tilläggsläge:

  • I tilläggsläge skriver du poster en gång efter vattenstämpelfördröjningen.
  • I uppdateringsläge, skriv poster som har ändrats sedan föregående utlösare.
  • Skriv alla poster som skapats av den tillståndskänsliga operatorn i fullständigt läge.