Az első strukturált streamelési számítási feladat futtatása
Ez a cikk az első strukturált streamelési lekérdezések Azure Databricksen való futtatásához szükséges alapfogalmak kód példáit és magyarázatát tartalmazza. A strukturált streamelést közel valós idejű és növekményes feldolgozási számítási feladatokhoz használhatja.
A strukturált streamelés az egyik olyan technológia, amely a DLT-ben üzemelteti a streamelési táblákat. A Databricks a DLT használatát javasolja az összes új ETL-, betöltési és strukturált streamelési számítási feladathoz. Lásd Mi az a DLT?.
Feljegyzés
Bár a DLT kissé módosított szintaxist biztosít a streamelési táblák deklarálásához, a streamelési olvasások és átalakítások konfigurálásának általános szintaxisa az Azure Databricks összes streamelési használati esetére vonatkozik. A DLT az állapotinformációk, metaadatok és számos konfiguráció kezelésével is leegyszerűsíti a streamelést.
Az Auto Loader használatával streamelési adatokat olvashatunk az objektumtárolóból.
Az alábbi példa a JSON-adatok automatikus betöltővel való betöltését mutatja be, amely cloudFiles
formátumot és beállításokat jelöl. A schemaLocation
lehetőség lehetővé teszi a séma következtetését és fejlődését. Illessze be a következő kódot egy Databricks-jegyzetfüzetcellába, és futtassa a cellát egy streamelési adatkeret raw_df
létrehozásához:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Az Azure Databricks más olvasási műveleteihez hasonlóan a streamelt olvasás konfigurálása nem tölt be adatokat. A stream megkezdése előtt aktiválnia kell egy műveletet az adatokon.
Feljegyzés
Egy streamelt DataFrame display()
hívása elindít egy streamelési feladatot. A legtöbb strukturált streamelési használati esetben a streamet aktiváló műveletnek adatokat kell írnia egy fogadóba. Lásd a strukturált streameléssel kapcsolatos gyártási megfontolások.
Végezze el a stream-átalakítást
A strukturált streamelés az Azure Databricksben és a Spark SQL-ben elérhető legtöbb átalakítást támogatja. Az MLflow-modelleket akárUDF-ként is betöltheti, és transzformációként streamelési előrejelzéseket készíthet.
Az alábbi példakód egy egyszerű átalakítást hajt végre a betöltött JSON-adatok további információval való gazdagításához a Spark SQL-függvények használatával:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Az eredmény lekérdezési transformed_df
utasításokat tartalmaz az adatforrásba érkező egyes rekordok betöltésére és átalakítására.
Feljegyzés
A strukturált streamelés kötetlen vagy végtelen adathalmazként kezeli az adatforrásokat. Ezért egyes átalakítások nem támogatottak a strukturált streamelési számítási feladatokban, mert végtelen számú elem rendezésére lenne szükség.
A legtöbb összesítéshez és számos illesztéshez vízjelekkel, ablakokkal és kimeneti móddal kell kezelni az állapotinformációkat. Lásd: Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához.
Növekményes kötegírás végrehajtása a Delta Lake-be
Az alábbi példa egy megadott fájlútvonallal és ellenőrzőponttal ír a Delta Lake-be.
Fontos
Mindig adjon meg egy egyedi ellenőrzőpont-helyet minden konfigurált streamíróhoz. Az ellenőrzőpont egyedi identitást biztosít a streamhez, nyomon követve a streamlekérdezéshez tartozó összes feldolgozott rekordot és állapotinformációt.
Az availableNow
eseményindító beállítása arra utasítja a strukturált streamelést, hogy dolgozza fel a forrásadatkészlet összes korábban feldolgozatlan rekordját, majd állítsa le azt, hogy biztonságosan végrehajthassa a következő kódot anélkül, hogy a stream futtatásával kellene foglalkoznia:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Ebben a példában nem érkeznek új rekordok az adatforrásba, ezért a kód ismételt végrehajtása nem fogad be új rekordokat.
Figyelmeztetés
A strukturált streamelés végrehajtása megakadályozhatja, hogy az automatikus leállítás leállítsa a számítási erőforrásokat. A váratlan költségek elkerülése érdekében mindenképpen állítsa le a streamelési lekérdezéseket.
Adatok olvasása a Delta Lake-ből, átalakítás és írás a Delta Lake-be
A Delta Lake széles körben támogatja a strukturált streamelést forrásként és fogadóként is. Lásd: Delta táblák streaming olvasását és írását.
Az alábbi példa szintaxisa azt mutatja be, hogy egy Delta-tábla összes új rekordja növekményesen betölthető, összekapcsolható egy másik Delta-tábla pillanatképével, és egy Delta-táblába írható:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
A forrástáblák olvasásához és a céltáblákba és a megadott ellenőrzőpont-helyre való íráshoz megfelelő engedélyekkel kell rendelkeznie. Töltse ki az éles zárójelekkel (<>
) ellátott összes paramétert az adatforrásokra és fogadókra vonatkozó releváns értékekkel.
Feljegyzés
A DLT teljes mértékben deklaratív szintaxist biztosít a Delta Lake-folyamatok létrehozásához, és automatikusan kezeli az olyan tulajdonságokat, mint az eseményindítók és az ellenőrzőpontok. Lásd Mi az a DLT?.
Adatok olvasása a Kafkából, átalakítás és írás a Kafkába
Az Apache Kafka és más üzenetkezelési buszok biztosítják a nagy adathalmazok számára elérhető legkisebb késést. Az Azure Databricks használatával átalakításokat alkalmazhat a Kafkából betöltött adatokra, majd adatokat írhat vissza a Kafkába.
Feljegyzés
Az adatok felhőobjektum-tárolóba történő írása további késési többletterhelést eredményez. Ha adatokat kíván tárolni egy üzenetkezelő buszból a Delta Lake-ben, és a lehető legkisebb késleltetést akarja elérni a streaming munkaterhelések során, a Databricks azt javasolja, hogy külön streaming feladatokat konfiguráljon az adatok tóházba történő betöltéséhez, és majdnem valós idejű átalakításokat alkalmazzon az alsóbb rétegbeli üzenetkezelő busz fogadói számára.
Az alábbi példakód egy egyszerű mintát mutat be a Kafkából származó adatok bővítéséhez egy Delta-táblában lévő adatokkal való összekapcsolással, majd a Kafkába való visszaírással:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
A Kafka szolgáltatáshoz való hozzáféréshez megfelelő engedélyekkel kell rendelkeznie. Töltse ki a hegyes zárójelekkel (<>
) ellátott összes paramétert az adatforrások és fogadók megfelelő értékeinek használatával. Tekintse meg az Apache Kafka és az Azure Databricks streamfeldolgozását.