Megosztás a következőn keresztül:


Stream az Apache Pulsarból

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

A Databricks Runtime 14.1-ben és újabb verziókban a strukturált streamelés használatával streamelheti az adatokat az Apache Pulsarból az Azure Databricksen.

A strukturált streamelés pontosan egyszeri feldolgozási szemantikát biztosít a Pulsar-forrásokból beolvasott adatokhoz.

Syntax-példa

Az alábbiakban egy egyszerű példa látható a Strukturált streamelés pulsarból való olvasására:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

A témakörök megadásához mindig meg kell adnia a service.url következő lehetőségek egyikét:

  • topic
  • topics
  • topicsPattern

A lehetőségek teljes list lásd: A Pulsar streamelési olvasásibeállításainak konfigurálása.

Hitelesítés Pulsarban

Az Azure Databricks támogatja a truststore és a keystore hitelesítést a Pulsar számára. A Databricks a konfiguráció részleteinek tárolásakor titkos kulcsok használatát javasolja.

A streamkonfiguráció során a következő beállításokat set:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Ha a stream használ egy PulsarAdmin-t, vedd figyelembe a következőket is: set.

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Az alábbi példa a hitelesítési beállítások konfigurálását mutatja be:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar schema

Pulsarból beolvasott rekordok schema azon múlik, hogy a témakörök sémái hogyan vannak kódolva.

  • Az Avro- vagy JSON-schematémakörök esetében a mezőnevek és a mezőtípusok megmaradnak az eredményül kapott Spark DataFrame-ben.
  • Azoknál a témaköröknél, amelyek nem rendelkeznek schema-val, vagy egyszerű adattípussal bírnak a Pulsarban, a hasznos terhelés betöltődik egy valuecolumn-ba.
  • Ha az olvasó több különböző sémával rendelkező témakör olvasására van konfigurálva, setallowDifferentTopicSchemas betölteni a nyers tartalmat egy valuecolumn.

A Pulsar-rekordok a következő metaadatmezőket tartalmaznak:

Column Típus
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

A Pulsar streamelési olvasási beállításainak konfigurálása

Minden beállítás konfigurálva van egy strukturált streamelési olvasás részeként szintaxis használatával .option("<optionName>", "<optionValue>") . A hitelesítést a beállítások használatával is konfigurálhatja. Lásd: Hitelesítés Pulsarban.

Az alábbi table a Pulsar szükséges konfigurációit ismerteti. A beállítások topicközül csak egyet kell megadnia, topics vagy topicsPattern.

Lehetőség Alapértelmezett érték Leírás
service.url Nincs A Pulsar szolgáltatás Pulsar serviceUrl konfigurációja.
topic Nincs A használni kívánt témakör névsztringje.
topics Nincs A felhasználandó témakörök vesszővel tagolt list.
topicsPattern Nincs Egy Java regex sztring, amely megfelel a felhasználandó témaköröknek.

Az alábbi table a Pulsar által támogatott egyéb lehetőségeket ismerteti:

Lehetőség Alapértelmezett érték Leírás
predefinedSubscription Nincs Az összekötő által a Spark-alkalmazások előrehaladásának nyomon követéséhez használt előre definiált előfizetésnév.
subscriptionPrefix Nincs Egy előtag, amelyet az összekötő használ egy véletlenszerű előfizetés generate a Spark-alkalmazások előrehaladásának nyomon követéséhez.
pollTimeoutMs 120 000 A Pulsar üzeneteinek ezredmásodpercben történő olvasásának időtúllépése.
waitingForNonExistedTopic false Várjon-e az összekötő, amíg létrejönnek a kívánt témakörök.
failOnDataLoss true Azt határozza meg, hogy az adatok elveszésekor meghiúsuljon-e a lekérdezés (például a témakörök törlődnek, vagy az üzenetek megőrzési szabályzat miatt törlődnek).
allowDifferentTopicSchemas false Ha több különböző sémával rendelkező témakört olvas, ezzel a paraméterrel kikapcsolhatja az automatikus schema-alapú témakörérték-deszerializálást. A rendszer csak a nyers values adja vissza, ha ez true.
startingOffsets latest Ha latest, az olvasó a futtatás megkezdése után felolvassa a legújabb rekordokat. Ha earliest, az olvasó a legkorábbi offset-ből olvas. A felhasználó megadhat egy JSON-sztringet is, amely egy adott offsetad meg.
maxBytesPerTrigger Nincs A mikrobatchenként feldolgozni kívánt bájtok számának finom korlátja, limit. Ha ez meg van adva, admin.url azt is meg kell adni.
admin.url Nincs A Pulsar serviceHttpUrl konfigurációja. Csak akkor szükséges, ha maxBytesPerTrigger meg van adva.

A Pulsar-ügyfél-, rendszergazda- és olvasókonfigurációkat az alábbi minták használatával is megadhatja:

Minta Hivatkozás a konfigurálási beállításokra
pulsar.client.* Pulsar-ügyfélkonfiguráció
pulsar.admin.* Pulsar rendszergazdai konfiguráció
pulsar.reader.* Pulsar-olvasó konfigurációja

Kezdő eltolások létrehozása JSON

Manuálisan létrehozhat egy üzenetazonosítót egy adott offset megadásához, és ezt JSON-ként továbbíthatja a startingOffsets beállításnak. Az alábbi példakód ezt a szintaxist mutatja be:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()