Rekordok streamelése külső szolgáltatásokba DLT-fogadókkal
Fontos
A DLT sink
API nyilvános előzetes verziójú.
Ez a cikk ismerteti a DLT sink
API-t, valamint azt, hogy hogyan használható DLT-folyamatokkal olyan rekordok írására, amelyeket egy folyamat átalakít egy külső adatgyűjtőbe, például a Unity Catalog által felügyelt és külső táblákba, a Hive metaadattár-táblákba és az olyan eseménystreamelési szolgáltatásokba, mint az Apache Kafka vagy az Azure Event Hubs.
Mik azok a DLT-végpontok?
A DLT-fogadók lehetővé teszik átalakított adatok írását olyan célokra, mint az eseménystreamelési szolgáltatások, például az Apache Kafka vagy az Azure Event Hubs, valamint a Unity Catalog vagy a Hive metaadattár által felügyelt külső táblák. Korábban a DLT-folyamatban létrehozott streamtáblák és materializált nézetek csak az Azure Databricks által felügyelt Delta-táblákban tárolhatók. A tárolók használatával mostantól több lehetősége van a DLT-folyamatok kimenetének megőrzésére.
Mikor érdemes DLT-mosdókat használni?
A Databricks a DLT-fogadók használatát javasolja, ha szükséges:
- Olyan üzemeltetési használati eseteket hozhat létre, mint a csalásészlelés, a valós idejű elemzések és az ügyféljavaslatok. A működési használati esetek általában egy üzenetbuszból, például egy Apache Kafka-témakörből olvasnak be adatokat, majd alacsony késéssel dolgozzák fel az adatokat, és visszaírják a feldolgozott rekordokat egy üzenetbuszba. Ez a megközelítés lehetővé teszi, hogy alacsonyabb késést érjen el, ha nem ír vagy olvas a felhőbeli tárolóból.
- Átalakított adatok írása a DLT-folyamatokból egy külső Delta-példány által felügyelt táblákba, beleértve a Unity Katalógus által felügyelt táblákat, a külső táblákat és a Hive metaadattár táblákat.
- Hajtsa végre a fordított nyerés-átalakítás-terhelést (ETL) a Databricksen kívüli célpontokba, például az Apache Kafka témákba. Ezzel a módszerrel hatékonyan támogathatja azokat az eseteket, amikor az adatokat a Unity Catalog-táblákon vagy más Databricks által felügyelt tárolón kívül kell olvasni vagy használni.
Hogyan használhatom a DLT-fogadókat?
Jegyzet
- Csak
spark.readStream
ésdlt.read_stream
használó streamlekérdezések támogatottak. A Batch-lekérdezések nem támogatottak. - A fogadókba csak
append_flow
lehet írni. Más folyamatok, mint aapply_changes
, nincsenek támogatva. - A teljes frissítés futtatása nem törli a korábban kiszámított eredmények adatait a tárolókban. Ez azt jelenti, hogy az újrafeldolgozott adatok hozzá lesznek fűzve a tárolóhoz, és a meglévő adatok nem módosulnak.
Mivel az eseményadatok streamelési forrásból kerülnek a DLT-folyamatba, ezeket az adatokat DLT-funkciókkal dolgozhatja fel és finomíthatja, majd a hozzáfűzési folyamat feldolgozásával streamelheti az átalakított adatrekordokat egy DLT-fogadóba. Ezt a nyelőt a create_sink()
függvénnyel hozhatja létre. További információért a create_sink
függvény használatával kapcsolatban lásd a fogadó API-referenciát.
DLT-fogadó implementálásához kövesse az alábbi lépéseket:
- Állítson be egy DLT-folyamatot a streamelési eseményadatok feldolgozásához, és készítse elő az adatrekordokat a DLT-fogadóba való íráshoz.
- Konfigurálja és hozza létre a DLT-fogadót az előnyben részesített fogadóformátum használatára.
- Egy hozzáfűző mechanizmus használatával írja az előkészített rekordokat az adatfogadóba.
Ezeket a lépéseket a témakör többi része ismerteti.
DLT-folyamat beállítása a rekordok fogadóba való írásához való előkészítéséhez
Az első lépés egy DLT csővezeték beállítása, amely átalakítja a nyers eseményfolyam adatait a célhelyre írandó előkészített adatokká.
A folyamat jobb megértéséhez kövesse az alábbi példát egy olyan DLT-folyamatra, amely a Databricks wikipedia-datasets
mintaadataiból dolgozza fel a clickstream eseményadatokat. Ez a folyamat elemzi a nyers adatkészletet, hogy azonosítsa az Apache Spark dokumentációs lapjára hivatkozó Wikipedia-lapokat, és fokozatosan finomítja ezeket az adatokat csak azokra a táblázatsorokra, ahol a hivatkozó hivatkozás Apache_Spark.
Ebben a példában a DLT-folyamat a medallion architektúrahasználatával van felépítve, amely különböző rétegekbe rendezi az adatokat a minőség és a feldolgozási hatékonyság növelése érdekében.
Első lépésként töltse be a nyers JSON-rekordokat az adathalmazból a bronz rétegbe Automatikus betöltőhasználatával. Ez a Python-kód bemutatja, hogyan hozhat létre egy clickstream_raw
nevű streamelési táblát, amely a forrásból származó nyers, feldolgozatlan adatokat tartalmazza:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
A kód futtatása után az adatok a Medallion-architektúra "bronz" (vagy "nyers adatok") szintjén lesznek, ezért törölni kell őket. A következő lépés az adatokat az "ezüst" szintre pontosítja, ami magában foglalja az adattípusok és az oszlopnevek eltávolítását, valamint a DLT-elvárások használatát az adatintegritás biztosítása érdekében.
Az alábbi kód bemutatja, hogyan teheti ezt meg a bronzréteg adatainak a clickstream_clean
ezüsttáblába való megtisztításával és érvényesítésével:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
A folyamatstruktúra "arany" rétegének fejlesztéséhez szűrje a megtisztított kattintásfolyam-adatokat, hogy elkülönítse azokat a bejegyzéseket, ahol a hivatkozó oldal Apache_Spark
. Ebben az utolsó kód példában csak a cél fogadótáblába való íráshoz szükséges oszlopokat választja ki.
Az alábbi kód bemutatja, hogyan hozhat létre egy spark_referrers
nevű táblát, amely az aranyréteget jelöli:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Az adatelőkészítési folyamat befejezése után konfigurálnia kell a célhelyeket, amelyekbe a megtisztított adatok íródnak.
DLT-fogadó konfigurálása
A Databricks háromféle kimeneti célt támogat, amelyekbe az adatfolyamból feldolgozott rekordokat írhatja:
- Delta asztali mosogatók
- Apache Kafka adatfogyasztók.
- Azure Event Hubs-kimenetek
Az alábbiakban példákat láthat a Delta, a Kafka és az Azure Event Hubs fogadóinak konfigurációira:
Delta mosogatók
Delta-fogadó létrehozása fájl elérési útja alapján:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Delta-fogadó létrehozása táblanév alapján egy teljesen meghatározott katalógus és séma útvonal használatával:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
A Kafka és az Azure Event Hubs adatfogadói
Ez a kód az Apache Kafka és az Azure Event Hubs fogadók esetében is működik.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Most, hogy konfigurálta a fogadót, és elkészült a DLT-folyamat, megkezdheti a feldolgozott rekordok átvitelét a fogadóba.
Írás hozzáfűző folyamattal rendelkező DLT-fogadóba
Azután, hogy a fogadó megfelelően konfigurálva van, a következő lépés a feldolgozott rekordok írása erre a célra úgy, hogy a hozzáfűzési adatfolyam kimenete a fogadóra irányuljon. Ehhez úgy kell megadnia a fogadót, hogy az a target
értéket viselje a append_flow
dekorátorban.
- A Unity Catalog által felügyelt és külső táblák esetében használja a
delta
formátumot, és adja meg az elérési utat vagy a tábla nevét a beállítások között. A DLT-folyamatokat konfigurálni kell a Unity Catalog használatára. - Apache Kafka-témakörök esetén használja a
kafka
formátumot, és adja meg a témakör nevét, kapcsolati adatait és hitelesítési adatait a beállítások között. Ezek ugyanazok a lehetőségek, melyeket a Spark strukturált streamelési Kafka-fogadó támogat. Lásd: A Kafka strukturált streamelési írójának konfigurálása. - Az Azure Event Hubs esetében használja a
kafka
formátumot, és adja meg az Event Hubs nevét, kapcsolati adatait és hitelesítési adatait a beállítások között. Ezek ugyanazokat a lehetőségeket támogatják a Spark strukturált streamelési eseményközpontok fogadójában, amelyek a Kafka-felületet használják. Lásd: Szolgáltatásnév-hitelesítés a Microsoft Entra-azonosítóval és az Azure Event Hubs. - Hive metaadattártáblák esetén használja a
delta
formátumot, és adja meg az elérési utat vagy a tábla nevét a beállítások között. A DLT-folyamatokat úgy kell konfigurálni, hogy a Hive metaadattárat használják.
Az alábbiakban példákat láthat arra, hogyan állíthat be folyamatokat a Delta, a Kafka és az Azure Event Hubs fogadóiba való íráshoz a DLT-folyamat által feldolgozott rekordokkal.
Delta mosogató
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
A Kafka és az Azure Event Hubs végpontok
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
A value
paraméter kötelező egy Azure Event Hubs-fogadó esetében. További paraméterek, például key
, partition
, headers
és topic
nem kötelezőek.
A append_flow
dekoratőrrel kapcsolatos további részletekért lásd: A hozzáfűzési folyamat használata több forrásstreamből származó streamek streamelési táblába való írásához.
korlátozások
Csak a Python API támogatott. Az SQL nem támogatott.
Csak
spark.readStream
ésdlt.read_stream
használó streamlekérdezések támogatottak. A Batch-lekérdezések nem támogatottak.Csak a
append_flow
használható adatfogadó írására. Egyéb adatfolyamok, például aapply_changes
, nem támogatottak, és DLT-adathalmaz-definíciókban nem használhat fogadót. A következők például nem támogatottak:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Delta sinkek esetén a tábla nevét teljes körűen megadva kell használni. A Unity Catalog által felügyelt külső táblák esetében a tábla nevének
<catalog>.<schema>.<table>
formában kell lennie. A Hive metastore-ja<schema>.<table>
formájában kell lennie.A
FullRefresh
futtatása nem törli a korábban kiszámított eredmények adatait a fogadókban. Ez azt jelenti, hogy az újrafeldolgozott adatok hozzá lesznek adva az adattárolóhoz, és a meglévő adatok nem módosulnak.A DLT-elvárások nincsnek támogatva.