مشاركة عبر


دفق من Apache Pulsar

هام

هذه الميزة في المعاينة العامة.

في Databricks Runtime 14.1 وما فوق، يمكنك استخدام Structured Streaming لدفق البيانات من Apache Pulsar على Azure Databricks.

يوفر الدفق المنظم دلالات معالجة مرة واحدة بالضبط للبيانات المقروءة من مصادر Pulsar.

مثال على بناء الجملة

فيما يلي مثال أساسي لاستخدام Structured Streaming للقراءة من Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

يجب عليك دائما توفير service.url و أحد الخيارات التالية لتحديد الموضوعات:

  • topic
  • topics
  • topicsPattern

للحصول على قائمة كاملة بالخيارات، راجع تكوين خيارات قراءة تدفق Pulsar.

المصادقة على Pulsar

يدعم Azure Databricks مصادقة مخزن الثقة ومخزن المفاتيح إلى Pulsar. توصي Databricks باستخدام الأسرار عند تخزين تفاصيل التكوين.

يمكنك تعيين الخيارات التالية أثناء تكوين الدفق:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

إذا كان الدفق يستخدم PulsarAdmin، فقم أيضا بتعيين ما يلي:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

يوضح المثال التالي تكوين خيارات المصادقة:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

مخطط Pulsar

يعتمد مخطط السجلات المقروءة من Pulsar على كيفية ترميز الموضوعات لمخططاتها.

  • بالنسبة للمواضيع ذات مخطط Avro أو JSON، يتم الاحتفاظ بأسماء الحقول وأنواع الحقول في Spark DataFrame الناتج.
  • بالنسبة للمواضيع التي لا تحتوي على مخطط أو بنوع بيانات بسيط في Pulsar، يتم تحميل الحمولة إلى value عمود.
  • إذا تم تكوين القارئ لقراءة مواضيع متعددة بمخططات مختلفة، فقم بتعيين allowDifferentTopicSchemas لتحميل المحتوى الخام إلى value عمود.

تحتوي سجلات Pulsar على حقول بيانات التعريف التالية:

Column النوع
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

تكوين خيارات لقراءة تدفق Pulsar

يتم تكوين جميع الخيارات كجزء من قراءة Structured Streaming باستخدام .option("<optionName>", "<optionValue>") بناء الجملة. يمكنك أيضا تكوين المصادقة باستخدام الخيارات. راجع المصادقة على Pulsar.

يصف الجدول التالي التكوينات المطلوبة ل Pulsar. يجب تحديد خيار واحد فقط أو topictopics topicsPattern.

خيار القيمة الافتراضية الوصف
service.url لا شيء تكوين Pulsar serviceUrl لخدمة Pulsar.
topic لا شيء سلسلة اسم موضوع ليستهلكها الموضوع.
topics لا شيء قائمة مفصولة بفواصل للمواضيع التي يجب استهلاكها.
topicsPattern لا شيء سلسلة Java regex لتتطابق مع الموضوعات التي يجب استهلاكها.

يصف الجدول التالي الخيارات الأخرى المدعومة ل Pulsar:

خيار القيمة الافتراضية الوصف
predefinedSubscription لا شيء اسم الاشتراك المحدد مسبقا المستخدم من قبل الموصل لتتبع تقدم تطبيق spark.
subscriptionPrefix لا شيء بادئة يستخدمها الموصل لإنشاء اشتراك عشوائي لتتبع تقدم تطبيق spark.
pollTimeoutMs 120000 مهلة قراءة الرسائل من Pulsar بالمللي ثانية.
waitingForNonExistedTopic false ما إذا كان يجب أن ينتظر الموصل حتى يتم إنشاء الموضوعات المطلوبة.
failOnDataLoss true يتحكم في فشل استعلام عند فقدان البيانات (على سبيل المثال، يتم حذف الموضوعات أو حذف الرسائل بسبب نهج الاستبقاء).
allowDifferentTopicSchemas false إذا تم قراءة مواضيع متعددة ذات مخططات مختلفة، فاستخدم هذه المعلمة لإيقاف تشغيل إلغاء التسلسل التلقائي لقيمة الموضوع المستندة إلى المخطط. يتم إرجاع القيم الأولية فقط عندما يكون هذا هو true.
startingOffsets latest إذا latest، يقرأ القارئ أحدث السجلات بعد بدء تشغيله. إذا earliest، يقرأ القارئ من أقرب إزاحة. يمكن للمستخدم أيضا تحديد سلسلة JSON التي تحدد إزاحة معينة.
maxBytesPerTrigger لا شيء حد بسيط للحد الأقصى لعدد وحدات البايت التي نريد معالجتها لكل ميكروباتش. إذا تم تحديد ذلك، admin.url يجب أيضا تحديده.
admin.url لا شيء تكوين Pulsar serviceHttpUrl . مطلوب فقط عند maxBytesPerTrigger تحديد.

يمكنك أيضا تحديد أي تكوينات عميل ومسؤول وقارئ Pulsar باستخدام الأنماط التالية:

النمط ارتباط بخيارات التكوين
pulsar.client.* تكوين عميل Pulsar
pulsar.admin.* تكوين مسؤول Pulsar
pulsar.reader.* تكوين قارئ Pulsar

إنشاء إزاحات بدء تشغيل JSON

يمكنك إنشاء معرف رسالة يدويا لتحديد إزاحة معينة وتمرير هذا ك JSON إلى startingOffsets الخيار . يوضح مثال التعليمات البرمجية التالي بناء الجملة هذا:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()