Sdílet prostřednictvím


Kontrolní body strukturovaného streamování

Kontrolní body a protokoly s předstihem pro zápis spolupracují a poskytují záruky zpracování pro úlohy strukturovaného streamování. Kontrolní bod sleduje informace, které identifikují dotaz, včetně informací o stavu a zpracovaných záznamů. Když odstraníte soubory v adresáři kontrolních bodů nebo změníte na nové umístění kontrolního bodu, začne další spuštění dotazu nové.

Každý dotaz musí mít jiné umístění kontrolního bodu. Více dotazů by nikdy nemělo sdílet stejné umístění.

Povolení vytváření kontrolních bodů pro dotazy strukturovaného streamování

Před spuštěním streamovacího dotazu musíte zadat checkpointLocation možnost, jak je znázorněno v následujícím příkladu:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Poznámka:

Některé jímky, například výstup display() poznámkových bloků a memory jímky, automaticky vygenerují dočasné umístění kontrolního bodu, pokud tuto možnost vynecháte. Tato dočasná umístění kontrolních bodů nezajistí žádnou odolnost proti chybám ani záruky konzistence dat a nemusí se správně vyčistit. Databricks doporučuje vždy zadat umístění kontrolního bodu pro tyto jímky.

Obnovení po změnách v dotazu strukturovaného streamování

Existují omezení toho, jaké změny v dotazu streamování jsou povolené mezi restartováními ze stejného umístění kontrolního bodu. Tady je několik změn, které buď nejsou povolené, nebo efekt změny není dobře definovaný. Pro všechny:

  • Povolený termín znamená, že zadanou změnu můžete provést, ale to, jestli je sémantika jejího účinku dobře definovaná, závisí na dotazu a změně.
  • Nepovolený termín znamená, že byste neměli provést zadanou změnu, protože restartovaný dotaz pravděpodobně selže s nepředvídatelnými chybami.
  • sdf představuje streamovaný datový rámec nebo datovou sadu vygenerovanou pomocí sparkSession.readStream.

Typy změn v dotazech strukturovaného streamování

  • Změny čísla nebo typu (tj. jiného zdroje) vstupních zdrojů: To není povoleno.
  • Změny parametrů vstupních zdrojů: Jestli je to povoleno a zda jsou sémantika změny dobře definovaná, závisí na zdroji a dotazu. Tady je pár příkladů.
    • Přidání, odstranění a úprava omezení četnosti je povolené:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      na

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Změny odebíraných článků a souborů se obecně nepovolují, protože výsledky jsou nepředvídatelné: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Změny v intervalu aktivační události: Mezi přírůstkovými dávkami a časovými intervaly můžete změnit triggery. Viz Změna intervalů aktivačních událostí mezi spuštěními.
  • Změny typu výstupní jímky: Změny mezi několika konkrétními kombinacemi jímek jsou povolené. To je potřeba ověřit na základě případu. Tady je pár příkladů.
    • Je povolená jímka souborů do jímky Kafka. Kafka uvidí jenom nová data.
    • Jímka Kafka do jímky souborů není povolená.
    • Je povolená jímka Kafka na foreach nebo naopak.
  • Změny parametrů výstupní jímky: Jestli je tato možnost povolená a jestli jsou sémantika změny dobře definovaná, závisí na jímce a dotazu. Tady je pár příkladů.
    • Změny výstupního adresáře jímky souborů nejsou povoleny: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Změny v tématu výstupu jsou povolené: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Změny uživatelem definované jímky foreach (tj ForeachWriter . kódu) jsou povolené, ale sémantika změny závisí na kódu.
  • Změny v projekci / filtru / operacích podobných mapování: Některé případy jsou povolené. Příklad:
    • Přidání nebo odstranění filtrů je povoleno: sdf.selectExpr("a") do sdf.where(...).selectExpr("a").filter(...).
    • Změny v projekcích se stejným schématem výstupu jsou povoleny: sdf.selectExpr("stringColumn AS json").writeStream do sdf.select(to_json(...).as("json")).writeStream.
    • Změny v projekcích s různým výstupním schématem jsou podmíněně povoleny: sdf.selectExpr("a").writeStream je sdf.selectExpr("b").writeStream povoleno pouze v případě, že výstupní jímka umožňuje změnu schématu z "a" na "b".
  • Změny ve stavových operacích: Některé operace v dotazech streamování musí udržovat stavová data, aby bylo možné průběžně aktualizovat výsledek. Strukturované streamování automaticky kontroluje stavová data do úložiště odolného proti chybám (například DBFS, Azure Blob Storage) a po restartování je obnoví. Předpokládá se však, že schéma stavových dat zůstává v restartech stejné. To znamená, že mezi restartováními nejsou povolené všechny změny (tj. přidání, odstranění nebo úpravy schématu) stavových operací streamovacího dotazu. Tady je seznam stavových operací, jejichž schéma by se nemělo mezi restartováními měnit, aby se zajistilo obnovení stavu:
    • Agregace streamování: Například sdf.groupBy("a").agg(...). Jakákoli změna počtu nebo typů klíčů nebo agregací není povolena.
    • Odstranění duplicitních dat streamování: Například sdf.dropDuplicates("a"). Jakákoli změna počtu nebo typů klíčů nebo agregací není povolena.
    • Spojení datových proudů: Například sdf1.join(sdf2, ...) (tj. oba vstupy se generují pomocí sparkSession.readStream). Změny ve schématu nebo sloupcích spojování koňovitých nejsou povoleny. Změny typu spojení (vnější nebo vnitřní) nejsou povoleny. Jiné změny v podmínce spojení jsou špatně definované.
    • Libovolná stavová operace: Například sdf.groupByKey(...).mapGroupsWithState(...) nebo sdf.groupByKey(...).flatMapGroupsWithState(...). Jakákoli změna schématu uživatelem definovaného stavu a typ časového limitu není povolený. Všechny změny v rámci uživatelem definované funkce mapování stavu jsou povolené, ale sémantický účinek změny závisí na uživatelsky definované logice. Pokud opravdu chcete podporovat změny schématu stavu, můžete explicitně zakódovat nebo dekódovat složité stavové datové struktury do bajtů pomocí schématu kódování/dekódování, které podporuje migraci schématu. Pokud například uložíte stav jako bajty s kódováním Avro, můžete změnit schéma stavu Avro mezi restartováním dotazu, protože tím se obnoví binární stav.