مشاركة عبر


الاشتراك في Google Pub/Sub

يوفر Azure Databricks موصلا مضمنا للاشتراك في Google Pub/Sub في Databricks Runtime 13.3 LTS وما فوق. يوفر هذا الموصل دلالات معالجة مرة واحدة بالضبط للسجلات من المشترك.

إشعار

قد ينشر Pub/Sub سجلات مكررة، وقد تصل السجلات إلى المشترك خارج الترتيب. يجب عليك كتابة التعليمات البرمجية ل Azure Databricks لمعالجة السجلات المكررة وغير المطلوبة.

مثال على بناء الجملة

يوضح مثال التعليمات البرمجية التالي بناء الجملة الأساسي لتكوين قراءة Structured Streaming من Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

لمزيد من خيارات التكوين، راجع تكوين خيارات قراءة دفق النشر/الفرعي.

تكوين الوصول إلى Pub/Sub

توصي Databricks باستخدام الأسرار عند توفير خيارات التخويل. الخيارات التالية مطلوبة لتخويل اتصال:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

يصف الجدول التالي الأدوار المطلوبة لبيانات الاعتماد المكونة:

الأدوار مطلوب أو اختياري كيفية الاستخدام
roles/pubsub.viewer أو roles/viewer المطلوب تحقق مما إذا كان الاشتراك موجودا والحصول على الاشتراك
roles/pubsub.subscriber المطلوب إحضار البيانات من اشتراك
roles/pubsub.editor أو roles/editor اختياري تمكين إنشاء اشتراك إذا لم يكن موجودا ويمكن أيضا استخدام deleteSubscriptionOnStreamStop لحذف الاشتراكات عند إنهاء الدفق

مخطط Pub/Sub

يطابق مخطط الدفق السجلات التي تم جلبها من Pub/Sub، كما هو موضح في الجدول التالي:

الحقل نوع
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

تكوين خيارات لقراءة دفق النشر/الفرعي

يصف الجدول التالي الخيارات المعتمدة ل Pub/Sub. يتم تكوين جميع الخيارات كجزء من قراءة Structured Streaming باستخدام .option("<optionName>", "<optionValue>") بناء الجملة.

إشعار

تستخدم بعض خيارات تكوين Pub/Sub مفهوم الجلب بدلا من الدفعات الصغيرة. يعكس هذا تفاصيل التنفيذ الداخلية، وتعمل الخيارات بشكل مشابه للمسجلات في موصلات Structured Streaming الأخرى، باستثناء أنه يتم إحضار السجلات ثم معالجتها.

خيار القيمة الافتراضية ‏‏الوصف
numFetchPartitions تعيين إلى نصف عدد المنفذين الموجودين في تهيئة الدفق. عدد مهام Spark المتوازية التي تجلب السجلات من اشتراك.
deleteSubscriptionOnStreamStop false إذا true، يتم حذف الاشتراك الذي تم تمريره إلى الدفق عند انتهاء مهمة الدفق.
maxBytesPerTrigger لا شيء حد بسيط لحجم الدفعة المراد معالجته أثناء كل دفعة صغيرة يتم تشغيلها.
maxRecordsPerFetch 1000 عدد السجلات التي يجب إحضارها لكل مهمة قبل معالجة السجلات.
maxFetchPeriod 10 ثوان المدة الزمنية لكل مهمة لإحضارها قبل معالجة السجلات. توصي Databricks باستخدام القيمة الافتراضية.

دلالات معالجة الدفعات المتزايدة ل Pub/Sub

يمكنك استخدام Trigger.AvailableNow لاستهلاك السجلات المتوفرة من مصادر Pub/Sub دفعة تزايدية.

يسجل Azure Databricks الطابع الزمني عند بدء القراءة مع Trigger.AvailableNow الإعداد. تتضمن السجلات التي تتم معالجتها بواسطة الدفعة جميع البيانات التي تم إحضارها مسبقا وأي سجلات منشورة حديثا مع طابع زمني أقل من الطابع الزمني لبدء الدفق المسجل.

راجع تكوين معالجة الدفعات التزايدية.

مراقبة مقاييس الدفق

تبلغ مقاييس تقدم الدفق المنظم عن عدد السجلات التي تم جلبها وجاهزة للمعالجة، وحجم السجلات التي تم جلبها وجاهزة للمعالجة، وعدد التكرارات التي شوهدت منذ بدء الدفق. فيما يلي مثال على هذه المقاييس:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

القيود

التنفيذ التخميني (spark.speculation) غير مدعوم مع Pub/Sub.