Sdílet prostřednictvím


Spuštění první úlohy strukturovaného streamování

Tento článek obsahuje příklady kódu a vysvětlení základních konceptů nezbytných ke spuštění prvních dotazů strukturovaného streamování v Azure Databricks. Strukturované streamování můžete použít pro úlohy téměř v reálném čase a přírůstkové zpracování.

Strukturované streamování je jednou z několika technologií, které pohánějí streamovací tabulky v DLT. Databricks doporučuje používat DLT pro všechny nové úlohy ETL, příjem dat a strukturovaného streamování. Podívejte se na Co je to DLT?.

Poznámka:

Zatímco DLT poskytuje mírně upravenou syntaxi pro deklarování streamovaných tabulek, obecná syntaxe konfigurace čtení a transformací streamování platí pro všechny případy použití streamování v Azure Databricks. DLT také zjednodušuje streamování tím, že spravuje informace o stavu, metadata a řadu konfigurací.

Použít automatický zavaděč pro čtení streamovaných dat z úložiště objektů

Následující příklad ukazuje načtení dat JSON pomocí Auto Loaderu, který používá cloudFiles k označení formátu a možností. Možnost schemaLocation umožňuje odvozovat a vyvíjet schéma. Do buňky poznámkového bloku Databricks vložte následující kód a spusťte buňku, aby se vytvořil datový rámec streamování s názvem raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Stejně jako jiné operace čtení v Azure Databricks se při konfiguraci streamovaného čtení ve skutečnosti nenačítají data. Před zahájením datového proudu je nutné aktivovat akci s daty.

Poznámka:

Volání display() na streamovaném datovém rámci spustí streamovací úlohu. U většiny případů použití strukturovaného streamování by akce, která aktivuje datový proud, měla zapisovat data do jímky. Viz aspekty produkce strukturovaného streamování.

Proveďte transformaci streamování

Strukturované streamování podporuje většinu transformací, které jsou k dispozici v Azure Databricks a Spark SQL. Modely MLflow můžete dokonce nahrát jako UDF a provádět predikce v reálném čase prostřednictvím transformace.

Následující příklad kódu dokončí jednoduchou transformaci pro obohacení přijatých dat JSON dalšími informacemi pomocí funkcí Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Výsledný transformed_df obsahuje dotazy k načtení a transformaci každého záznamu při jeho příchodu do zdroje dat.

Poznámka:

Strukturované streamování zpracovává zdroje dat jako nevázané nebo nekonečné datové sady. Některé transformace nejsou v úlohách strukturovaného streamování podporované, protože by vyžadovaly řazení nekonečného počtu položek.

Většina agregací a mnoho spojení vyžaduje správu informací o stavu pomocí vodoznaků, oken a výstupního režimu. Viz Použití vodoznaků pro řízení prahových hodnot zpracování dat.

Provedení přírůstkového dávkového zápisu do Delta Lake

Následující příklad zapisuje data do Delta Lake pomocí zadané cesty k souboru a kontrolního bodu.

Důležité

Vždy se ujistěte, že pro každý nakonfigurovaný zapisovač streamování zadáte jedinečné umístění kontrolního bodu. Kontrolní bod poskytuje jedinečnou identitu streamu, sledování všech zpracovaných záznamů a informací o stavu přidružených k dotazu streamování.

Nastavení availableNow triggeru dává strukturovanému streamování pokyn ke zpracování všech dříve nezpracovaných záznamů ze zdrojové datové sady a následné vypnutí, takže můžete bezpečně spustit následující kód, aniž byste se museli starat o opuštění streamu spuštěného:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

V tomto příkladu do našeho zdroje dat nepřicházejí žádné nové záznamy, takže opakované spuštění tohoto kódu neingestuje nové záznamy.

Varování

Vykonávání strukturovaného streamování může zabránit automatickému ukončení výpočetních prostředků. Aby nedošlo k neočekávaným nákladům, nezapomeňte ukončit streamovací dotazy.

Čtení dat z Delta Lake, transformace a zápisu do Delta Lake

Delta Lake má rozsáhlou podporu pro práci se strukturovaným streamováním jako zdrojem i jímkou. Viz čtení a zápisy streamovaných tabulek Delta.

Následující příklad ukazuje ukázkovou syntaxi pro přírůstkové načtení všech nových záznamů z tabulky Delta, jejich spojení se snímkem jiné tabulky Delta a jejich zápis do tabulky Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Musíte mít nakonfigurovaná správná oprávnění ke čtení zdrojových tabulek a zápisu do cílových tabulek a zadaného umístění kontrolního bodu. Vyplňte všechny parametry označené úhlovými závorkami (<>) s použitím relevantních hodnot pro zdroje dat a jímky.

Poznámka:

DLT poskytuje plně deklarativní syntaxi pro vytváření kanálů Delta Lake a spravuje vlastnosti, jako jsou triggery a kontrolní body, automaticky. Podívejte se na Co je to DLT?.

Čtení dat ze systému Kafka, transformace a zápisu do Systému Kafka

Apache Kafka a další sběrnice zasílání zpráv poskytují některé z nejnižších latencí dostupných pro velké datové sady. Azure Databricks můžete použít k aplikování transformací na data ingestované z Kafka a následnému zápisu dat zpět do Kafka.

Poznámka:

Zápis dat do cloudového úložiště objektů zvyšuje režijní náklady na latenci. Pokud chcete ukládat data ze sběrnice zasílání zpráv do Delta Lake, ale požadujete co nejnižší latenci pro streamovací úlohy, Databricks doporučuje nakonfigurovat samostatné streamovací úlohy pro ingestování dat do datového lakehouse a použití transformačních procesů téměř v reálném čase pro podřízené sběrnice zasílání zpráv.

Následující příklad kódu ukazuje jednoduchý vzor pro obohacení dat ze systému Kafka jejich spojením s daty v tabulce Delta a následným zápisem zpět do Systému Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Pro přístup ke službě Kafka musíte mít nakonfigurovaná správná oprávnění. Vyplňte všechny parametry označené úhlovými závorkami (<>) s použitím relevantních hodnot pro zdroje dat a jímky. Viz Zpracování datových proudů s využitím Apache Kafka a Azure Databricks.