مشاركة عبر


نقاط التحقق المتدفقة المنظمة

تعمل نقاط التحقق وسجلات الكتابة المسبقة معا لتوفير ضمانات المعالجة لأحمال عمل Structured Streaming. تتعقب نقطة التحقق المعلومات التي تعرف الاستعلام، بما في ذلك معلومات الحالة والسجلات المعالجة. عند حذف الملفات في دليل نقطة تحقق أو التغيير إلى موقع نقطة تحقق جديد، يبدأ التشغيل التالي للاستعلام من جديد.

يجب أن يكون لكل استعلام موقع نقطة تحقق مختلف. يجب ألا تشترك استعلامات متعددة في نفس الموقع.

تمكين نقاط التحقق من استعلامات Structured Streaming

يجب تحديد checkpointLocation الخيار قبل تشغيل استعلام دفق، كما في المثال التالي:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

إشعار

تقوم بعض المتلقيات، مثل إخراج display() في دفاتر الملاحظات والمتلقي memory ، تلقائيا بإنشاء موقع نقطة تفتيش مؤقت إذا حذفت هذا الخيار. لا تضمن مواقع نقاط التفتيش المؤقتة هذه أي التسامح مع الخطأ أو ضمانات تناسق البيانات وقد لا يتم تنظيفها بشكل صحيح. توصي Databricks دائما بتحديد موقع نقطة تفتيش لهذه المتلقيات.

الاسترداد بعد التغييرات في استعلام Structured Streaming

هناك قيود على التغييرات في استعلام الدفق المسموح بها بين عمليات إعادة التشغيل من نفس موقع نقطة التحقق. فيما يلي بعض التغييرات غير المسموح بها أو أن تأثير التغيير غير محدد جيدا. بالنسبة لهم جميعا:

  • يعني المصطلح المسموح به أنه يمكنك إجراء التغيير المحدد ولكن ما إذا كانت دلالات تأثيره محددة جيدا يعتمد على الاستعلام والتغيير.
  • يعني المصطلح غير المسموح به أنه يجب عدم إجراء التغيير المحدد حيث من المحتمل أن يفشل الاستعلام الذي تمت إعادة تشغيله مع وجود أخطاء غير متوقعة.
  • sdf يمثل تدفق DataFrame/Dataset تم إنشاؤه باستخدام sparkSession.readStream.

أنواع التغييرات في استعلامات الدفق المنظم

  • التغييرات في الرقم أو النوع (أي المصدر المختلف) لمصادر الإدخال: هذا غير مسموح به.
  • التغييرات في معلمات مصادر الإدخال: ما إذا كان هذا مسموحا به وما إذا كانت دلالات التغيير محددة جيدا يعتمد على المصدر والاستعلام. إليك بعض الأمثلة.
    • يسمح بإضافة حدود المعدل وحذفها وتعديلها:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • لا يسمح عموما بإجراء تغييرات على المقالات والملفات المشتركة لأن النتائج غير متوقعة: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • التغييرات في الفاصل الزمني للمشغل: يمكنك تغيير المشغلات بين الدفعات التزايدية والفواصل الزمنية. راجع تغيير فواصل المشغل بين عمليات التشغيل.
  • التغييرات في نوع متلقي الإخراج: يسمح بالتغييرات بين مجموعات معينة قليلة من المتلقيات. ويجب التحقق من ذلك على أساس كل حالة على حدة. إليك بعض الأمثلة.
    • يسمح بمتلقي الملفات إلى متلقي Kafka. سيشاهد Kafka البيانات الجديدة فقط.
    • لا يسمح بمتلقي Kafka إلى متلقي الملفات.
    • تم تغيير متلقي Kafka إلى foreach، أو يسمح بالعكس.
  • التغييرات في معلمات متلقي الإخراج: ما إذا كان هذا مسموحا به وما إذا كانت دلالات التغيير محددة جيدا يعتمد على المتلقي والاستعلام. إليك بعض الأمثلة.
    • لا يسمح بإجراء تغييرات على دليل الإخراج لمتلقي الملفات: sdf.writeStream.format("parquet").option("path", "/somePath") إلى sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • يسمح بإجراء تغييرات على موضوع الإخراج: sdf.writeStream.format("kafka").option("topic", "topic1") إلى sdf.writeStream.format("kafka").option("topic", "topic2")
    • يسمح بإجراء تغييرات على متلقي foreach المعرف من قبل المستخدم (أي التعليمات ForeachWriter البرمجية)، ولكن دلالات التغيير تعتمد على التعليمات البرمجية.
  • التغييرات في الإسقاط / التصفية / عمليات تشبه الخريطة: يسمح ببعض الحالات. على سبيل المثال:
    • يسمح بإضافة / حذف عوامل التصفية: sdf.selectExpr("a") إلى sdf.where(...).selectExpr("a").filter(...).
    • يسمح بالتغييرات في الإسقاطات ذات مخطط الإخراج نفسه: sdf.selectExpr("stringColumn AS json").writeStream إلى sdf.select(to_json(...).as("json")).writeStream.
    • يسمح بالتغييرات في الإسقاطات ذات مخطط إخراج مختلف بشكل مشروط: sdf.selectExpr("a").writeStream sdf.selectExpr("b").writeStream يسمح به فقط إذا كان مصدر الإخراج يسمح بتغيير المخطط من "a" إلى "b".
  • التغييرات في العمليات ذات الحالة: تحتاج بعض العمليات في استعلامات الدفق إلى الاحتفاظ ببيانات الحالة من أجل تحديث النتيجة باستمرار. يقوم الدفق المنظم تلقائيا بنقاط التحقق من بيانات الحالة إلى تخزين متسامح مع الأخطاء (على سبيل المثال، DBFS، تخزين Azure Blob) ويستعيدها بعد إعادة التشغيل. ومع ذلك، يفترض هذا أن مخطط بيانات الحالة يظل كما هو عبر عمليات إعادة التشغيل. وهذا يعني أنه لا يسمح بأي تغييرات (أي الإضافات أو عمليات الحذف أو تعديلات المخطط) على العمليات ذات الحالة الخاصة لاستعلام الدفق بين عمليات إعادة التشغيل. فيما يلي قائمة العمليات ذات الحالة التي يجب عدم تغيير مخططها بين عمليات إعادة التشغيل لضمان استرداد الحالة:
    • تجميع الدفق: على سبيل المثال، sdf.groupBy("a").agg(...). لا يسمح بأي تغيير في عدد أو نوع مفاتيح التجميع أو التجميعات.
    • إلغاء تكرار الدفق: على سبيل المثال، sdf.dropDuplicates("a"). لا يسمح بأي تغيير في عدد أو نوع مفاتيح التجميع أو التجميعات.
    • ربط دفق الدفق: على سبيل المثال، sdf1.join(sdf2, ...) (أي يتم إنشاء كلا الإدخالين باستخدام sparkSession.readStream). لا يسمح بالتغييرات في المخطط أو أعمدة الربط المتساوي. التغييرات في نوع الصلة (الخارجي أو الداخلي) غير مسموح بها. التغييرات الأخرى في حالة الصلة غير محددة.
    • عملية إجبارية ذات حالة: على سبيل المثال، sdf.groupByKey(...).mapGroupsWithState(...) أو sdf.groupByKey(...).flatMapGroupsWithState(...). لا يسمح بإجراء أي تغيير على مخطط الحالة المعرفة من قبل المستخدم ونوع المهلة. يسمح بأي تغيير داخل دالة تعيين الحالة المعرفة من قبل المستخدم، ولكن التأثير الدلالي للتغيير يعتمد على المنطق المعرف من قبل المستخدم. إذا كنت تريد حقا دعم تغييرات مخطط الحالة، يمكنك ترميز/فك ترميز بنيات بيانات الحالة المعقدة إلى وحدات بايت باستخدام نظام ترميز/فك ترميز يدعم ترحيل المخطط. على سبيل المثال، إذا قمت بحفظ حالتك كوحدات بايت مرمزة بواسطة Avro، فيمكنك تغيير مخطط حالة Avro بين إعادة تشغيل الاستعلام لأن هذا يستعيد الحالة الثنائية.