استخدام foreachBatch للكتابة إلى متلقي البيانات العشوائية
تتناول هذه المقالة استخدام foreachBatch
مع Structured Streaming لكتابة إخراج استعلام دفق إلى مصادر البيانات التي لا تحتوي على مصدر تدفق موجود.
يسمح لك نمط streamingDF.writeStream.foreachBatch(...)
التعليمات البرمجية بتطبيق وظائف الدفعة على بيانات الإخراج لكل دفعة صغيرة من استعلام الدفق. الدوال المستخدمة مع foreachBatch
أخذ معلمتين:
- DataFrame يحتوي على بيانات إخراج دفعة صغيرة.
- المعرف الفريد للدفعة الصغيرة.
يجب استخدام foreachBatch
لعمليات دمج Delta Lake في Structured Streaming. راجع Upsert من استعلامات الدفق باستخدام foreachBatch.
تطبيق عمليات DataFrame إضافية
لا يتم دعم العديد من عمليات DataFrame وDataset في دفق DataFrames لأن Spark لا يدعم إنشاء خطط تزايدية في هذه الحالات. foreachBatch()
يمكنك استخدام تطبيق بعض هذه العمليات على كل إخراج دفعة صغيرة. على سبيل المثال، يمكنك استخدام foreachBath()
عملية SQL MERGE INTO
وكتابة إخراج تجميعات الدفق في جدول Delta في وضع التحديث. راجع المزيد من التفاصيل في MERGE INTO.
هام
foreachBatch()
يوفر ضمانات كتابة مرة واحدة على الأقل فقط. ومع ذلك، يمكنك استخدامbatchId
المقدمة إلى الدالة كطريقة لإلغاء تكرار الإخراج والحصول على ضمان مرة واحدة بالضبط. في كلتا الحالتين، سيكون لديك سبب حول دلالات من طرف إلى طرف بنفسك.foreachBatch()
لا يعمل مع وضع المعالجة المستمرة لأنه يعتمد بشكل أساسي على تنفيذ الدفعة الصغيرة لاستعلام دفق. إذا كتبت البيانات في الوضع المستمر، فاستخدمforeach()
بدلا من ذلك.
يمكن استدعاء إطار بيانات فارغ مع foreachBatch()
ويجب أن تكون التعليمات البرمجية للمستخدم مرنة للسماح بالعملية المناسبة. يظهر مثال هنا:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
تغييرات السلوك في foreachBatch
Databricks Runtime 14.0
في Databricks Runtime 14.0 وما فوق على الحساب المكون مع وضع الوصول المشترك، يتم تطبيق تغييرات السلوك التالية:
print()
تكتب الأوامر الإخراج إلى سجلات برنامج التشغيل.- لا يمكنك الوصول إلى
dbutils.widgets
النموذج الفرعي داخل الدالة . - يجب أن تكون أي ملفات أو وحدات أو كائنات مشار إليها في الدالة قابلة للتسلسل ومتاحة على Spark.
إعادة استخدام مصادر بيانات الدفعات الموجودة
باستخدام foreachBatch()
، يمكنك استخدام كتاب بيانات الدفعات الحاليين لمتلقي البيانات التي قد لا تحتوي على دعم Structured Streaming. فيما يلي بعض الأمثلة على ذلك:
يمكن استخدام العديد من مصادر بيانات الدفعات الأخرى من foreachBatch()
. راجع الاتصال بمصادر البيانات.
الكتابة إلى مواقع متعددة
إذا كنت بحاجة إلى كتابة إخراج استعلام دفق إلى مواقع متعددة، توصي Databricks باستخدام العديد من كتاب Structured Streaming للحصول على أفضل توازي ومعدل نقل.
يؤدي استخدام foreachBatch
الكتابة إلى متلقيات متعددة إلى تسلسل تنفيذ عمليات الكتابة المتدفقة، ما يمكن أن يزيد من زمن الانتقال لكل دفعة صغيرة.
إذا كنت تستخدم foreachBatch
للكتابة إلى جداول Delta متعددة، فشاهد كتابة الجدول المتكرر في foreachBatch.