دفق من Apache Pulsar
في Databricks Runtime 14.1 وما فوق، يمكنك استخدام Structured Streaming لدفق البيانات من Apache Pulsar على Azure Databricks.
يوفر الدفق المنظم دلالات معالجة مرة واحدة بالضبط للبيانات المقروءة من مصادر Pulsar.
مثال على بناء الجملة
فيما يلي مثال أساسي لاستخدام Structured Streaming للقراءة من Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
يجب عليك دائما توفير service.url
و أحد الخيارات التالية لتحديد الموضوعات:
topic
topics
topicsPattern
للحصول على قائمة كاملة بالخيارات، راجع تكوين خيارات قراءة تدفق Pulsar.
المصادقة على Pulsar
يدعم Azure Databricks مصادقة مخزن الثقة ومخزن المفاتيح إلى Pulsar. توصي Databricks باستخدام الأسرار عند تخزين تفاصيل التكوين.
يمكنك تعيين الخيارات التالية أثناء تكوين الدفق:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
إذا كان الدفق يستخدم PulsarAdmin
، فقم أيضا بتعيين ما يلي:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
يوضح المثال التالي تكوين خيارات المصادقة:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
مخطط Pulsar
يعتمد مخطط السجلات المقروءة من Pulsar على كيفية ترميز الموضوعات لمخططاتها.
- بالنسبة للمواضيع ذات مخطط Avro أو JSON، يتم الاحتفاظ بأسماء الحقول وأنواع الحقول في Spark DataFrame الناتج.
- بالنسبة للمواضيع التي لا تحتوي على مخطط أو بنوع بيانات بسيط في Pulsar، يتم تحميل الحمولة إلى
value
عمود. - إذا تم تكوين القارئ لقراءة مواضيع متعددة بمخططات مختلفة، فقم بتعيين
allowDifferentTopicSchemas
لتحميل المحتوى الخام إلىvalue
عمود.
تحتوي سجلات Pulsar على حقول بيانات التعريف التالية:
Column | النوع |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
تكوين خيارات لقراءة تدفق Pulsar
يتم تكوين جميع الخيارات كجزء من قراءة Structured Streaming باستخدام .option("<optionName>", "<optionValue>")
بناء الجملة. يمكنك أيضا تكوين المصادقة باستخدام الخيارات. راجع المصادقة على Pulsar.
يصف الجدول التالي التكوينات المطلوبة ل Pulsar. يجب تحديد خيار واحد فقط أو topic
topics
topicsPattern
.
خيار | القيمة الافتراضية | الوصف |
---|---|---|
service.url |
لا شيء | تكوين Pulsar serviceUrl لخدمة Pulsar. |
topic |
لا شيء | سلسلة اسم موضوع ليستهلكها الموضوع. |
topics |
لا شيء | قائمة مفصولة بفواصل للمواضيع التي يجب استهلاكها. |
topicsPattern |
لا شيء | سلسلة Java regex لتتطابق مع الموضوعات التي يجب استهلاكها. |
يصف الجدول التالي الخيارات الأخرى المدعومة ل Pulsar:
خيار | القيمة الافتراضية | الوصف |
---|---|---|
predefinedSubscription |
لا شيء | اسم الاشتراك المحدد مسبقا المستخدم من قبل الموصل لتتبع تقدم تطبيق spark. |
subscriptionPrefix |
لا شيء | بادئة يستخدمها الموصل لإنشاء اشتراك عشوائي لتتبع تقدم تطبيق spark. |
pollTimeoutMs |
120000 | مهلة قراءة الرسائل من Pulsar بالمللي ثانية. |
waitingForNonExistedTopic |
false |
ما إذا كان يجب أن ينتظر الموصل حتى يتم إنشاء الموضوعات المطلوبة. |
failOnDataLoss |
true |
يتحكم في فشل استعلام عند فقدان البيانات (على سبيل المثال، يتم حذف الموضوعات أو حذف الرسائل بسبب نهج الاستبقاء). |
allowDifferentTopicSchemas |
false |
إذا تم قراءة مواضيع متعددة ذات مخططات مختلفة، فاستخدم هذه المعلمة لإيقاف تشغيل إلغاء التسلسل التلقائي لقيمة الموضوع المستندة إلى المخطط. يتم إرجاع القيم الأولية فقط عندما يكون هذا هو true . |
startingOffsets |
latest |
إذا latest ، يقرأ القارئ أحدث السجلات بعد بدء تشغيله. إذا earliest ، يقرأ القارئ من أقرب إزاحة. يمكن للمستخدم أيضا تحديد سلسلة JSON التي تحدد إزاحة معينة. |
maxBytesPerTrigger |
لا شيء | حد بسيط للحد الأقصى لعدد وحدات البايت التي نريد معالجتها لكل ميكروباتش. إذا تم تحديد ذلك، admin.url يجب أيضا تحديده. |
admin.url |
لا شيء | تكوين Pulsar serviceHttpUrl . مطلوب فقط عند maxBytesPerTrigger تحديد. |
يمكنك أيضا تحديد أي تكوينات عميل ومسؤول وقارئ Pulsar باستخدام الأنماط التالية:
النمط | ارتباط بخيارات التكوين |
---|---|
pulsar.client.* |
تكوين عميل Pulsar |
pulsar.admin.* |
تكوين مسؤول Pulsar |
pulsar.reader.* |
تكوين قارئ Pulsar |
إنشاء إزاحات بدء تشغيل JSON
يمكنك إنشاء معرف رسالة يدويا لتحديد إزاحة معينة وتمرير هذا ك JSON إلى startingOffsets
الخيار . يوضح مثال التعليمات البرمجية التالي بناء الجملة هذا:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()