Stream az Apache Pulsarból
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
A Databricks Runtime 14.1-ben és újabb verziókban a strukturált streamelés használatával streamelheti az adatokat az Apache Pulsarból az Azure Databricksen.
A strukturált streamelés pontosan egyszeri feldolgozási szemantikát biztosít a Pulsar-forrásokból beolvasott adatokhoz.
Syntax-példa
Az alábbiakban egy egyszerű példa látható a Strukturált streamelés pulsarból való olvasására:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
A témakörök megadásához mindig meg kell adnia a service.url
következő lehetőségek egyikét:
topic
topics
topicsPattern
A lehetőségek teljes list lásd: A Pulsar streamelési olvasásibeállításainak konfigurálása.
Hitelesítés Pulsarban
Az Azure Databricks támogatja a truststore és a keystore hitelesítést a Pulsar számára. A Databricks a konfiguráció részleteinek tárolásakor titkos kulcsok használatát javasolja.
A streamkonfiguráció során a következő beállításokat set:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Ha a stream használ egy PulsarAdmin
-t, vedd figyelembe a következőket is: set.
pulsar.admin.authPluginClassName
pulsar.admin.authParams
Az alábbi példa a hitelesítési beállítások konfigurálását mutatja be:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar schema
Pulsarból beolvasott rekordok schema azon múlik, hogy a témakörök sémái hogyan vannak kódolva.
- Az Avro- vagy JSON-schematémakörök esetében a mezőnevek és a mezőtípusok megmaradnak az eredményül kapott Spark DataFrame-ben.
- Azoknál a témaköröknél, amelyek nem rendelkeznek schema-val, vagy egyszerű adattípussal bírnak a Pulsarban, a hasznos terhelés betöltődik egy
value
column-ba. - Ha az olvasó több különböző sémával rendelkező témakör olvasására van konfigurálva, set
allowDifferentTopicSchemas
betölteni a nyers tartalmat egyvalue
column.
A Pulsar-rekordok a következő metaadatmezőket tartalmaznak:
Column | Típus |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
A Pulsar streamelési olvasási beállításainak konfigurálása
Minden beállítás konfigurálva van egy strukturált streamelési olvasás részeként szintaxis használatával .option("<optionName>", "<optionValue>")
. A hitelesítést a beállítások használatával is konfigurálhatja. Lásd: Hitelesítés Pulsarban.
Az alábbi table a Pulsar szükséges konfigurációit ismerteti. A beállítások topic
közül csak egyet kell megadnia, topics
vagy topicsPattern
.
Lehetőség | Alapértelmezett érték | Leírás |
---|---|---|
service.url |
Nincs | A Pulsar szolgáltatás Pulsar serviceUrl konfigurációja. |
topic |
Nincs | A használni kívánt témakör névsztringje. |
topics |
Nincs | A felhasználandó témakörök vesszővel tagolt list. |
topicsPattern |
Nincs | Egy Java regex sztring, amely megfelel a felhasználandó témaköröknek. |
Az alábbi table a Pulsar által támogatott egyéb lehetőségeket ismerteti:
Lehetőség | Alapértelmezett érték | Leírás |
---|---|---|
predefinedSubscription |
Nincs | Az összekötő által a Spark-alkalmazások előrehaladásának nyomon követéséhez használt előre definiált előfizetésnév. |
subscriptionPrefix |
Nincs | Egy előtag, amelyet az összekötő használ egy véletlenszerű előfizetés generate a Spark-alkalmazások előrehaladásának nyomon követéséhez. |
pollTimeoutMs |
120 000 | A Pulsar üzeneteinek ezredmásodpercben történő olvasásának időtúllépése. |
waitingForNonExistedTopic |
false |
Várjon-e az összekötő, amíg létrejönnek a kívánt témakörök. |
failOnDataLoss |
true |
Azt határozza meg, hogy az adatok elveszésekor meghiúsuljon-e a lekérdezés (például a témakörök törlődnek, vagy az üzenetek megőrzési szabályzat miatt törlődnek). |
allowDifferentTopicSchemas |
false |
Ha több különböző sémával rendelkező témakört olvas, ezzel a paraméterrel kikapcsolhatja az automatikus schema-alapú témakörérték-deszerializálást. A rendszer csak a nyers values adja vissza, ha ez true . |
startingOffsets |
latest |
Ha latest , az olvasó a futtatás megkezdése után felolvassa a legújabb rekordokat. Ha earliest , az olvasó a legkorábbi offset-ből olvas. A felhasználó megadhat egy JSON-sztringet is, amely egy adott offsetad meg. |
maxBytesPerTrigger |
Nincs | A mikrobatchenként feldolgozni kívánt bájtok számának finom korlátja, limit. Ha ez meg van adva, admin.url azt is meg kell adni. |
admin.url |
Nincs | A Pulsar serviceHttpUrl konfigurációja. Csak akkor szükséges, ha maxBytesPerTrigger meg van adva. |
A Pulsar-ügyfél-, rendszergazda- és olvasókonfigurációkat az alábbi minták használatával is megadhatja:
Minta | Hivatkozás a konfigurálási beállításokra |
---|---|
pulsar.client.* |
Pulsar-ügyfélkonfiguráció |
pulsar.admin.* |
Pulsar rendszergazdai konfiguráció |
pulsar.reader.* |
Pulsar-olvasó konfigurációja |
Kezdő eltolások létrehozása JSON
Manuálisan létrehozhat egy üzenetazonosítót egy adott offset megadásához, és ezt JSON-ként továbbíthatja a startingOffsets
beállításnak. Az alábbi példakód ezt a szintaxist mutatja be:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()