Megosztás a következőn keresztül:


Rekordok streamelése külső szolgáltatásokba DLT-fogadókkal

Fontos

A DLT sink API nyilvános előzetes verziójú.

Ez a cikk ismerteti a DLT sink API-t, valamint azt, hogy hogyan használható DLT-folyamatokkal olyan rekordok írására, amelyeket egy folyamat átalakít egy külső adatgyűjtőbe, például a Unity Catalog által felügyelt és külső táblákba, a Hive metaadattár-táblákba és az olyan eseménystreamelési szolgáltatásokba, mint az Apache Kafka vagy az Azure Event Hubs.

Mik azok a DLT-végpontok?

A DLT-fogadók lehetővé teszik átalakított adatok írását olyan célokra, mint az eseménystreamelési szolgáltatások, például az Apache Kafka vagy az Azure Event Hubs, valamint a Unity Catalog vagy a Hive metaadattár által felügyelt külső táblák. Korábban a DLT-folyamatban létrehozott streamtáblák és materializált nézetek csak az Azure Databricks által felügyelt Delta-táblákban tárolhatók. A tárolók használatával mostantól több lehetősége van a DLT-folyamatok kimenetének megőrzésére.

Mikor érdemes DLT-mosdókat használni?

A Databricks a DLT-fogadók használatát javasolja, ha szükséges:

  • Olyan üzemeltetési használati eseteket hozhat létre, mint a csalásészlelés, a valós idejű elemzések és az ügyféljavaslatok. A működési használati esetek általában egy üzenetbuszból, például egy Apache Kafka-témakörből olvasnak be adatokat, majd alacsony késéssel dolgozzák fel az adatokat, és visszaírják a feldolgozott rekordokat egy üzenetbuszba. Ez a megközelítés lehetővé teszi, hogy alacsonyabb késést érjen el, ha nem ír vagy olvas a felhőbeli tárolóból.
  • Átalakított adatok írása a DLT-folyamatokból egy külső Delta-példány által felügyelt táblákba, beleértve a Unity Katalógus által felügyelt táblákat, a külső táblákat és a Hive metaadattár táblákat.
  • Hajtsa végre a fordított nyerés-átalakítás-terhelést (ETL) a Databricksen kívüli célpontokba, például az Apache Kafka témákba. Ezzel a módszerrel hatékonyan támogathatja azokat az eseteket, amikor az adatokat a Unity Catalog-táblákon vagy más Databricks által felügyelt tárolón kívül kell olvasni vagy használni.

Hogyan használhatom a DLT-fogadókat?

Jegyzet

  • Csak spark.readStream és dlt.read_stream használó streamlekérdezések támogatottak. A Batch-lekérdezések nem támogatottak.
  • A fogadókba csak append_flow lehet írni. Más folyamatok, mint a apply_changes, nincsenek támogatva.
  • A teljes frissítés futtatása nem törli a korábban kiszámított eredmények adatait a tárolókban. Ez azt jelenti, hogy az újrafeldolgozott adatok hozzá lesznek fűzve a tárolóhoz, és a meglévő adatok nem módosulnak.

Mivel az eseményadatok streamelési forrásból kerülnek a DLT-folyamatba, ezeket az adatokat DLT-funkciókkal dolgozhatja fel és finomíthatja, majd a hozzáfűzési folyamat feldolgozásával streamelheti az átalakított adatrekordokat egy DLT-fogadóba. Ezt a nyelőt a create_sink() függvénnyel hozhatja létre. További információért a create_sink függvény használatával kapcsolatban lásd a fogadó API-referenciát.

DLT-fogadó implementálásához kövesse az alábbi lépéseket:

  1. Állítson be egy DLT-folyamatot a streamelési eseményadatok feldolgozásához, és készítse elő az adatrekordokat a DLT-fogadóba való íráshoz.
  2. Konfigurálja és hozza létre a DLT-fogadót az előnyben részesített fogadóformátum használatára.
  3. Egy hozzáfűző mechanizmus használatával írja az előkészített rekordokat az adatfogadóba.

Ezeket a lépéseket a témakör többi része ismerteti.

DLT-folyamat beállítása a rekordok fogadóba való írásához való előkészítéséhez

Az első lépés egy DLT csővezeték beállítása, amely átalakítja a nyers eseményfolyam adatait a célhelyre írandó előkészített adatokká.

A folyamat jobb megértéséhez kövesse az alábbi példát egy olyan DLT-folyamatra, amely a Databricks wikipedia-datasets mintaadataiból dolgozza fel a clickstream eseményadatokat. Ez a folyamat elemzi a nyers adatkészletet, hogy azonosítsa az Apache Spark dokumentációs lapjára hivatkozó Wikipedia-lapokat, és fokozatosan finomítja ezeket az adatokat csak azokra a táblázatsorokra, ahol a hivatkozó hivatkozás Apache_Spark.

Ebben a példában a DLT-folyamat a medallion architektúrahasználatával van felépítve, amely különböző rétegekbe rendezi az adatokat a minőség és a feldolgozási hatékonyság növelése érdekében.

Első lépésként töltse be a nyers JSON-rekordokat az adathalmazból a bronz rétegbe Automatikus betöltőhasználatával. Ez a Python-kód bemutatja, hogyan hozhat létre egy clickstream_rawnevű streamelési táblát, amely a forrásból származó nyers, feldolgozatlan adatokat tartalmazza:

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

A kód futtatása után az adatok a Medallion-architektúra "bronz" (vagy "nyers adatok") szintjén lesznek, ezért törölni kell őket. A következő lépés az adatokat az "ezüst" szintre pontosítja, ami magában foglalja az adattípusok és az oszlopnevek eltávolítását, valamint a DLT-elvárások használatát az adatintegritás biztosítása érdekében.

Az alábbi kód bemutatja, hogyan teheti ezt meg a bronzréteg adatainak a clickstream_clean ezüsttáblába való megtisztításával és érvényesítésével:

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

A folyamatstruktúra "arany" rétegének fejlesztéséhez szűrje a megtisztított kattintásfolyam-adatokat, hogy elkülönítse azokat a bejegyzéseket, ahol a hivatkozó oldal Apache_Spark. Ebben az utolsó kód példában csak a cél fogadótáblába való íráshoz szükséges oszlopokat választja ki.

Az alábbi kód bemutatja, hogyan hozhat létre egy spark_referrers nevű táblát, amely az aranyréteget jelöli:

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

Az adatelőkészítési folyamat befejezése után konfigurálnia kell a célhelyeket, amelyekbe a megtisztított adatok íródnak.

DLT-fogadó konfigurálása

A Databricks háromféle kimeneti célt támogat, amelyekbe az adatfolyamból feldolgozott rekordokat írhatja:

  • Delta asztali mosogatók
  • Apache Kafka adatfogyasztók.
  • Azure Event Hubs-kimenetek

Az alábbiakban példákat láthat a Delta, a Kafka és az Azure Event Hubs fogadóinak konfigurációira:

Delta mosogatók

Delta-fogadó létrehozása fájl elérési útja alapján:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Delta-fogadó létrehozása táblanév alapján egy teljesen meghatározott katalógus és séma útvonal használatával:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

A Kafka és az Azure Event Hubs adatfogadói

Ez a kód az Apache Kafka és az Azure Event Hubs fogadók esetében is működik.

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

Most, hogy konfigurálta a fogadót, és elkészült a DLT-folyamat, megkezdheti a feldolgozott rekordok átvitelét a fogadóba.

Írás hozzáfűző folyamattal rendelkező DLT-fogadóba

Azután, hogy a fogadó megfelelően konfigurálva van, a következő lépés a feldolgozott rekordok írása erre a célra úgy, hogy a hozzáfűzési adatfolyam kimenete a fogadóra irányuljon. Ehhez úgy kell megadnia a fogadót, hogy az a target értéket viselje a append_flow dekorátorban.

  • A Unity Catalog által felügyelt és külső táblák esetében használja a delta formátumot, és adja meg az elérési utat vagy a tábla nevét a beállítások között. A DLT-folyamatokat konfigurálni kell a Unity Catalog használatára.
  • Apache Kafka-témakörök esetén használja a kafka formátumot, és adja meg a témakör nevét, kapcsolati adatait és hitelesítési adatait a beállítások között. Ezek ugyanazok a lehetőségek, melyeket a Spark strukturált streamelési Kafka-fogadó támogat. Lásd: A Kafka strukturált streamelési írójának konfigurálása.
  • Az Azure Event Hubs esetében használja a kafka formátumot, és adja meg az Event Hubs nevét, kapcsolati adatait és hitelesítési adatait a beállítások között. Ezek ugyanazokat a lehetőségeket támogatják a Spark strukturált streamelési eseményközpontok fogadójában, amelyek a Kafka-felületet használják. Lásd: Szolgáltatásnév-hitelesítés a Microsoft Entra-azonosítóval és az Azure Event Hubs.
  • Hive metaadattártáblák esetén használja a delta formátumot, és adja meg az elérési utat vagy a tábla nevét a beállítások között. A DLT-folyamatokat úgy kell konfigurálni, hogy a Hive metaadattárat használják.

Az alábbiakban példákat láthat arra, hogyan állíthat be folyamatokat a Delta, a Kafka és az Azure Event Hubs fogadóiba való íráshoz a DLT-folyamat által feldolgozott rekordokkal.

Delta mosogató

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

A Kafka és az Azure Event Hubs végpontok

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

A value paraméter kötelező egy Azure Event Hubs-fogadó esetében. További paraméterek, például key, partition, headersés topic nem kötelezőek.

A append_flow dekoratőrrel kapcsolatos további részletekért lásd: A hozzáfűzési folyamat használata több forrásstreamből származó streamek streamelési táblába való írásához.

korlátozások

  • Csak a Python API támogatott. Az SQL nem támogatott.

  • Csak spark.readStream és dlt.read_stream használó streamlekérdezések támogatottak. A Batch-lekérdezések nem támogatottak.

  • Csak a append_flow használható adatfogadó írására. Egyéb adatfolyamok, például a apply_changes, nem támogatottak, és DLT-adathalmaz-definíciókban nem használhat fogadót. A következők például nem támogatottak:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • Delta sinkek esetén a tábla nevét teljes körűen megadva kell használni. A Unity Catalog által felügyelt külső táblák esetében a tábla nevének <catalog>.<schema>.<table>formában kell lennie. A Hive metastore-ja <schema>.<table>formájában kell lennie.

  • A FullRefresh futtatása nem törli a korábban kiszámított eredmények adatait a fogadókban. Ez azt jelenti, hogy az újrafeldolgozott adatok hozzá lesznek adva az adattárolóhoz, és a meglévő adatok nem módosulnak.

  • A DLT-elvárások nincsnek támogatva.

Erőforrások