Megosztás a következőn keresztül:


Streamfeldolgozás az Apache Kafka és az Azure Databricks használatával

Ez a cikk azt ismerteti, hogyan használhatja az Apache Kafkát forrásként vagy fogadóként, ha strukturált streamelési számítási feladatokat futtat az Azure Databricksen.

További Kafka-információkért tekintse meg a Kafka dokumentációját.

Adatok olvasása a Kafkából

Az alábbiakban egy példa látható a Kafkából származó streamelési olvasásra:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Az Azure Databricks a Kafka-adatforrások kötegelt olvasási szemantikáját is támogatja, ahogyan az a következő példában is látható:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Növekményes kötegbetöltés esetén a Databricks a Kafka használatát javasolja.Trigger.AvailableNow Lásd a növekményes kötegfeldolgozás konfigurálását.

A Databricks Runtime 13.3 LTS-ben és újabb verziókban az Azure Databricks egy SQL-függvényt biztosít a Kafka-adatok olvasásához. Az SQL-sel való streamelés csak DLT-ben vagy a Databricks SQL streamtábláiban támogatott. Lásd read_kafka táblaértékelt függvény.

Kafka strukturált streamolvasó konfigurálása

Az Azure Databricks adatformátumként biztosítja a kafka kulcsszót a Kafka 0.10+-kapcsolatok konfigurálásához.

A Kafka leggyakoribb konfigurációi a következők:

Többféleképpen is megadhatja, hogy mely témakörökre szeretne feliratkozni. Csak az egyik paramétert kell megadnia:

Lehetőség Érték Leírás
feliratkozás Témakörök vesszővel tagolt listája. A feliratkozni kívánt témakörlista.
feliratkozási minta Java regex karakterlánc. A témakör(ek)re való feliratkozáshoz használt minta.
kijelöl JSON string {"topicA":[0,1],"topic":[2,4]}. Fogyasztandó konkrét témapartíciók.

Egyéb figyelemre méltó konfigurációk:

Lehetőség Érték Alapértelmezett érték Leírás
kafka.bootstrap.servers A gazdagép:port megadására szolgáló vesszővel elválasztott lista. üres [Kötelező] A Kafka-konfigurációbootstrap.servers. Ha úgy találja, hogy a Kafka nem tartalmaz adatokat, először ellenőrizze a közvetítő címlistáját. Ha a közvetítő címlistája helytelen, előfordulhat, hogy nem történt hiba. Ennek az az oka, hogy a Kafka-kliens feltételezi, hogy a közvetítők végül elérhetővé válnak, és hálózati hibák esetén végtelenségig próbálkozik.
hibaAdatvesztéskor true vagy false. true [Nem kötelező] A lekérdezés meghiúsulása, ha lehetséges, hogy az adatok elvesznek. A lekérdezések különféle okokból, például törölt témák vagy a témák feldolgozás előtti csonkolása miatt véglegesen nem tudnak adatokat beolvasni a Kafkából. Megpróbáljuk konzervatívan megbecsülni, hogy az adatok esetleg elvesztek-e vagy sem. Néha ez hamis riasztásokat okozhat. Állítsa be ezt a beállítást false, ha az nem a várt módon működik, vagy azt szeretné, hogy a lekérdezés az adatvesztés ellenére is feldolgozható legyen.
minPartitions Egész szám >= 0, 0 = letiltva. 0 (kikapcsolva) [Nem kötelező] A Kafkából beolvasandó partíciók minimális száma. A Spark konfigurálható úgy, hogy tetszőleges minimális partíciót használjon a Kafkából való olvasáshoz a minPartitions beállítás használatával. A Spark általában 1-1 leképezést alkalmaz a Kafka témapartíciók és a Kafka-ból fogyasztó Spark partíciók között. Ha a minPartitions beállítást a Kafka topicPartitionsnál nagyobb értékre állítja be, a Spark kisebb darabokra osztja a nagy Kafka-partíciókat. Ez a beállítás beállítható csúcsterhelések, adateltérés és a stream elmaradása esetén a feldolgozási sebesség növelése érdekében. Ennek költsége a Kafka-fogyasztók inicializálása minden egyes indításkor, ami negatívan befolyásolhatja a teljesítményt, ha SSL-t használ a Kafkához való csatlakozáskor.
kafka.group.id Kafka fogyasztói csoport azonosítója. nincs beállítva [Nem kötelező] A Kafkából való olvasáshoz használandó csoportazonosító. Óvatosan használja ezt. Alapértelmezés szerint minden lekérdezés egyedi csoportazonosítót hoz létre az adatok olvasásához. Ez biztosítja, hogy minden lekérdezés saját fogyasztói csoportokkal rendelkezik, amelyek nem ütköznek más felhasználók beavatkozásával, és ezért elolvashatják az előfizetett témakörök összes partícióját. Bizonyos esetekben (például Kafka-csoportalapú engedélyezés) érdemes lehet adott engedélyezett csoportazonosítókat használni az adatok olvasásához. Igény szerint beállíthatja a csoportazonosítót. Ezt azonban rendkívül körültekintően végezze el, mivel ez váratlan viselkedést okozhat.
  • Az azonos csoportazonosítóval egyidejűleg futó lekérdezések (kötegelt és streameléses) valószínűleg zavarják egymást, ezért az egyes lekérdezések csak az adatok egy részét olvassák be.
  • Ez akkor is előfordulhat, ha a lekérdezések gyors egymásutánban indulnak el/indulnak újra. Az ilyen problémák minimalizálása érdekében állítsa a Kafka fogyasztói konfigurációs session.timeout.ms nagyon kicsire.
startingOffsets legkorábbi , legkésőbbi legújabb [Nem kötelező] A lekérdezés indításának kezdőpontja lehet "legkorábbi," ami a legkorábbi eltolásoktól van, vagy egy JSON-sztring, amely megadja az egyes TopicPartition-k számára a kezdő eltolást. A jsonban -2 eltolásként használható a legkorábbi, -1 a legújabbra való hivatkozáshoz. Megjegyzés: Kötegelt lekérdezések esetén a legújabb adat lekérdezése (akár implicit módon, vagy a JSON-ban a -1 értékkel) nem engedélyezett. Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes, és az újrakezdés mindig onnan indul el, ahol a lekérdezés abbahagyta. A lekérdezés során újonnan felfedezett partíciók feldolgozása a lehető legkorábbi időpontban kezdődik.

További választható konfigurációkért lásd a strukturált streamelési Kafka-integrációs útmutatót .

Kafka-rekordok sémája

A Kafka-rekordok sémája a következő:

Oszlop Típus
kulcs bináris
Érték bináris
téma húr
partíció egész
eltolás hosszú
időbélyeg hosszú
időbélyegtípus egész

A key és a value mindig ByteArrayDeserializer szerinti bájttömbként van deszerializálva. DataFrame-műveletek (például cast("string")) használatával explicit módon deszerializálhatja a kulcsokat és az értékeket.

Adatok írása a Kafkába

Az alábbiakban egy példa látható a Kafkára történő adatfolyam-írásra:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Az Azure Databricks a Kafka-adatgyűjtőkbe történő kötegelt írási szemantikát is támogatja, ahogyan az a következő példában is látható:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

A Kafka strukturált streamelési író konfigurálása

Fontos

A Databricks Runtime 13.3 LTS és újabb verziója tartalmazza a kódtár újabb verzióját, amely alapértelmezés szerint lehetővé teszi az kafka-clients idempotens írást. Ha egy Kafka-fogadó a 2.8.0-s vagy újabb verziót használja a konfigurált ACL-ekkel, de IDEMPOTENT_WRITE nincs engedélyezve, az írás a hibaüzenettel org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error statemeghiúsul.

A hiba megoldásához frissítsen a Kafka 2.8.0-s vagy újabb verziójára, vagy állítsa be .option(“kafka.enable.idempotence”, “false”) a strukturált streamelési író konfigurálását.

A DataStreamWriter számára biztosított séma a Kafka-célponttal kommunikál. A következő mezőket használhatja:

Oszlop neve Kötelező vagy választható Típus
key választható STRING vagy BINARY
value kötelező STRING vagy BINARY
headers választható ARRAY
topic nem kötelező (figyelmen kívül hagyva, ha topic írói opcióként van beállítva) STRING
partition választható INT

A Kafkába való írás során a következő gyakori beállítások vannak beállítva:

Lehetőség Érték Alapértelmezett érték Leírás
kafka.boostrap.servers A <host:port> vesszővel tagolt listája Nincs [Kötelező] A Kafka-konfiguráció bootstrap.servers .
topic STRING nincs beállítva [Nem kötelező] Beállítja a témakört az összes megírandó sorhoz. Ez a beállítás felülírja az adatokban található témaköroszlopokat.
includeHeaders BOOLEAN false A Kafka-fejlécek belefoglalása a sorba [nem kötelező].

További választható konfigurációkért lásd a strukturált streamelési Kafka-integrációs útmutatót .

Kafka-metrikák lekérése

A streamelési lekérdezés által a legutóbbi elérhető eltoláshoz képest lemaradt eltolások számának átlagát, minimumát és maximumát az összes előfizetett témakör esetében a avgOffsetsBehindLatest, a maxOffsetsBehindLatest, és a minOffsetsBehindLatest metrikákkal lekérheti. Lásd: A metrikák interaktív olvasása.

Feljegyzés

A Databricks Runtime 9.1-ben és újabb verziókban érhető el.

A estimatedTotalBytesBehindLatestértékének vizsgálatával lekérheti a lekérdezési folyamat által nem felhasznált bájtok becsült teljes számát az előfizetett témakörökből. Ez a becslés az elmúlt 300 másodpercben feldolgozott kötegeken alapul. A becslés alapjául megadott időkeret módosítható úgy, hogy a beállítást bytesEstimateWindowLength egy másik értékre állítja. Ha például 10 percre szeretné beállítani:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Ha egy jegyzetfüzetben futtatja a streamet, a streamelési lekérdezés folyamatának irányítópultján, a Nyers adatok lapon láthatja ezeket a metrikákat:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Az Azure Databricks és a Kafka csatlakoztatása SSL használatával

A Kafka SSL-kapcsolatok bekapcsolásához kövesse a Confluent dokumentációban található utasításokat a SSL-alapú titkosítás és hitelesítésrésznél. Megadhatja az ott leírt konfigurációkat, előtaggal kafka., lehetőségként. Például megadhatja a bizalmi tár helyét a tulajdonságban kafka.ssl.truststore.location.

A Databricks a következőket javasolja:

  • A tanúsítványokat a felhőobjektum-tárolóban tárolhatja. A tanúsítványokhoz való hozzáférést korlátozhatja csak azokra a fürtökre, amelyek hozzáférhetnek a Kafkához. Lásd: Adatkezelés Unity Cataloggal.
  • Tárolja a tanúsítványjelszavakat titokként egy titkosskálán.

Az alábbi példa objektumtárolási helyeket és Databricks-titkos kulcsokat használ az SSL-kapcsolat engedélyezéséhez:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

A Kafka csatlakoztatása a HDInsighton az Azure Databrickshez

  1. HDInsight Kafka-fürt létrehozása.

    Útmutatásért lásd : Csatlakozás a HDInsighton futó Kafkához egy Azure-beli virtuális hálózaton keresztül.

  2. Konfigurálja a Kafka-közvetítőket a megfelelő cím meghirdetéséhez.

    Kövesse a Kafka IP-hirdetésekhez való konfigurálását ismertető útmutatót. Ha saját maga kezeli a Kafkát az Azure-beli virtuális gépeken, győződjön meg arról, hogy a brókerek advertised.listeners konfigurációja a gazdagépek belső IP-címére van állítva.

  3. Hozzon létre egy Azure Databricks-fürtöt.

  4. Csatlakoztassa a Kafka-fürtöt az Azure Databricks-fürthöz.

    Kövesse a Peer virtuális hálózatok utasításait.

Szolgáltatási főazonosító hitelesítés Microsoft Entra ID és Azure Event Hubs használatával

Az Azure Databricks támogatja a Spark-feladatok hitelesítését az Event Hubs-szolgáltatásokkal. Ez a hitelesítés OAuth-on keresztül történik a Microsoft Entra-azonosítóval.

AAD-hitelesítési diagram

Az Azure Databricks az alábbi számítási környezetekben támogatja a Microsoft Entra ID-hitelesítést ügyfél-azonosítóval és titkos kóddal:

  • Databricks Runtime 12.2 LTS és újabb verziók dedikált hozzáférési móddal (korábban egyfelhasználós hozzáférési mód) konfigurált számítási egységen.
  • Databricks Runtime 14.3 LTS és újabb verziók standard hozzáférési móddal (korábban megosztott hozzáférési móddal) konfigurált számítási erőforrásokon.
  • A Unity Catalog nélkül konfigurált DLT-folyamatok.

Az Azure Databricks nem támogatja a Microsoft Entra ID hitelesítést egy tanúsítvánnyal bármely számítási környezetben, vagy a Unity Catalogtal konfigurált DLT-folyamatokban.

Ez a hitelesítés nem működik a standard hozzáférési módú számításon vagy a Unity Catalog DLT-n.

A strukturált streamelési Kafka-összekötő konfigurálása

A Microsoft Entra-azonosítóval végzett hitelesítéshez a következő értékekre lesz szüksége:

  • Bérlőazonosító. Ezt a Microsoft Entra ID-szolgáltatások lapján találja.

  • Ügyfélazonosító (más néven alkalmazásazonosító).

  • Egy titkos ügyfél. Ezt követően titkos adatként kell hozzáadnia a Databricks-munkaterülethez. A titkos kód hozzáadásáról a Titkos kódok kezelése című témakörben olvashat.

  • Egy EventHubs-témakör. A témakörök listáját a Event Hubs szakaszban találja, ami az Entitások szakasz alatt van egy adott Event Hubs-névtér oldalon. Több témakör használatához beállíthatja az IAM-szerepkört az Event Hubs szintjén.

  • EventHubs-kiszolgáló. Ezt az adott Event Hubs-névtér áttekintési oldalán találja:

    Eseményközpontok névtér

Az Entra ID használatához meg kell adnunk a Kafkának, hogy használja az OAuth SASL mechanizmust (a SASL egy általános protokoll, az OAuth pedig a SASL "mechanizmus" típusa):

  • kafka.security.protocol legyen SASL_SSL
  • kafka.sasl.mechanism legyen OAUTHBEARER
  • kafka.sasl.login.callback.handler.class legyen a Java-osztály teljesen kvalifikált neve, amelynek értéke kafkashaded a mi árnyékolt Kafka-osztályunk bejelentkezési visszahívás-kezelőjénél. A pontos osztályhoz lásd az alábbi példát.

Példa

Következő lépésként tekintsünk meg egy futó példát:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Lehetséges hibák kezelése

  • A streamelési lehetőségek nem támogatottak.

    Ha ezt a hitelesítési mechanizmust egy Unity-katalógussal konfigurált DLT-folyamatban próbálja használni, a következő hibaüzenet jelenhet meg:

    Nem támogatott streamelési hiba

    A hiba megoldásához használjon támogatott számítási konfigurációt. Lásd: Szolgáltatásfelelős-hitelesítés a Microsoft Entra-azonosítóval és az Azure Event Hubs-szel.

  • Nem sikerült egy új KafkaAdminClient létrehozni.

    Ez egy belső hiba, amelyet a Kafka jelez, ha az alábbi hitelesítési lehetőségek közül bármelyik helytelen:

    • Ügyfélazonosító (más néven alkalmazásazonosító)
    • Bérlőazonosító
    • EventHubs-kiszolgáló

    A hiba megoldásához ellenőrizze, hogy az értékek helyesek-e ezekhez a beállításokhoz.

    Emellett ez a hiba akkor is előfordulhat, ha módosítja a példában alapértelmezés szerint megadott konfigurációs beállításokat (amelyeket a rendszer arra kért, hogy ne módosítsa), például kafka.security.protocol.

  • Nincsenek visszaadott rekordok

    Ha megkísérli megjeleníteni vagy feldolgozni a DataFrame-et, de nem kap eredményeket, az alábbiakat fogja látni a felhasználói felületen.

    Nincs találati üzenet

    Ez az üzenet azt jelenti, hogy a hitelesítés sikeres volt, de az EventHubs nem adott vissza adatokat. Néhány lehetséges (bár egyáltalán nem teljes) ok:

    • Helytelen EventHubs témát adott meg.
    • Az alapértelmezett Kafka-konfigurációs beállítás a startingOffsets, és egyelőre nem érkezik adat a témán keresztül. Beállítható, hogy startingOffsetstoearliest a Kafka legkorábbi eltolásaitól kezdve kezdje el olvasni az adatokat.