الاشتراك في Google Pub/Sub
يوفر Azure Databricks موصلا مضمنا للاشتراك في Google Pub/Sub في Databricks Runtime 13.3 LTS وما فوق. يوفر هذا الموصل دلالات معالجة مرة واحدة بالضبط للسجلات من المشترك.
إشعار
قد ينشر Pub/Sub سجلات مكررة، وقد تصل السجلات إلى المشترك خارج الترتيب. يجب عليك كتابة التعليمات البرمجية ل Azure Databricks لمعالجة السجلات المكررة وغير المطلوبة.
مثال على بناء الجملة
يوضح مثال التعليمات البرمجية التالي بناء الجملة الأساسي لتكوين قراءة Structured Streaming من Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
لمزيد من خيارات التكوين، راجع تكوين خيارات قراءة دفق النشر/الفرعي.
تكوين الوصول إلى Pub/Sub
توصي Databricks باستخدام الأسرار عند توفير خيارات التخويل. الخيارات التالية مطلوبة لتخويل اتصال:
clientEmail
clientId
privateKey
privateKeyId
يصف الجدول التالي الأدوار المطلوبة لبيانات الاعتماد المكونة:
الأدوار | مطلوب أو اختياري | كيفية الاستخدام |
---|---|---|
roles/pubsub.viewer أو roles/viewer |
المطلوب | تحقق مما إذا كان الاشتراك موجودا والحصول على الاشتراك |
roles/pubsub.subscriber |
المطلوب | إحضار البيانات من اشتراك |
roles/pubsub.editor أو roles/editor |
اختياري | تمكين إنشاء اشتراك إذا لم يكن موجودا ويمكن أيضا استخدام deleteSubscriptionOnStreamStop لحذف الاشتراكات عند إنهاء الدفق |
مخطط Pub/Sub
يطابق مخطط الدفق السجلات التي تم جلبها من Pub/Sub، كما هو موضح في الجدول التالي:
الحقل | نوع |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
تكوين خيارات لقراءة دفق النشر/الفرعي
يصف الجدول التالي الخيارات المعتمدة ل Pub/Sub. يتم تكوين جميع الخيارات كجزء من قراءة Structured Streaming باستخدام .option("<optionName>", "<optionValue>")
بناء الجملة.
إشعار
تستخدم بعض خيارات تكوين Pub/Sub مفهوم الجلب بدلا من الدفعات الصغيرة. يعكس هذا تفاصيل التنفيذ الداخلية، وتعمل الخيارات بشكل مشابه للمسجلات في موصلات Structured Streaming الأخرى، باستثناء أنه يتم إحضار السجلات ثم معالجتها.
خيار | القيمة الافتراضية | الوصف |
---|---|---|
numFetchPartitions |
تعيين إلى نصف عدد المنفذين الموجودين في تهيئة الدفق. | عدد مهام Spark المتوازية التي تجلب السجلات من اشتراك. |
deleteSubscriptionOnStreamStop |
false |
إذا true ، يتم حذف الاشتراك الذي تم تمريره إلى الدفق عند انتهاء مهمة الدفق. |
maxBytesPerTrigger |
لا شيء | حد بسيط لحجم الدفعة المراد معالجته أثناء كل دفعة صغيرة يتم تشغيلها. |
maxRecordsPerFetch |
1000 | عدد السجلات التي يجب إحضارها لكل مهمة قبل معالجة السجلات. |
maxFetchPeriod |
10 ثوان | المدة الزمنية لكل مهمة لإحضارها قبل معالجة السجلات. توصي Databricks باستخدام القيمة الافتراضية. |
دلالات معالجة الدفعات المتزايدة ل Pub/Sub
يمكنك استخدام Trigger.AvailableNow
لاستهلاك السجلات المتوفرة من مصادر Pub/Sub دفعة تزايدية.
يسجل Azure Databricks الطابع الزمني عند بدء القراءة مع Trigger.AvailableNow
الإعداد. تتضمن السجلات التي تتم معالجتها بواسطة الدفعة جميع البيانات التي تم إحضارها مسبقا وأي سجلات منشورة حديثا مع طابع زمني أقل من الطابع الزمني لبدء الدفق المسجل.
راجع تكوين معالجة الدفعات التزايدية.
مراقبة مقاييس الدفق
تبلغ مقاييس تقدم الدفق المنظم عن عدد السجلات التي تم جلبها وجاهزة للمعالجة، وحجم السجلات التي تم جلبها وجاهزة للمعالجة، وعدد التكرارات التي شوهدت منذ بدء الدفق. فيما يلي مثال على هذه المقاييس:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
القيود
التنفيذ التخميني (spark.speculation
) غير مدعوم مع Pub/Sub.