مشاركة عبر


استخدام موجز بيانات تغيير Delta Lake على Azure Databricks

يسمح موجز بيانات التغيير ل Azure Databricks بتعقب التغييرات على مستوى الصف بين إصدارات جدول Delta. عند تمكينها في جدول Delta، تقوم سجلات وقت التشغيل بتغيير الأحداث لجميع البيانات المكتوبة في الجدول. يتضمن ذلك بيانات الصف مع بيانات التعريف التي تشير إلى ما إذا كان الصف المحدد قد تم إدراجه أو حذفه أو تحديثه.

هام

يعمل موجز بيانات التغيير جنبا إلى جنب مع محفوظات الجدول لتوفير معلومات التغيير. نظرا لأن استنساخ جدول Delta يؤدي إلى إنشاء محفوظات منفصلة، فإن موجز بيانات التغيير على الجداول المستنسخة لا يتطابق مع محفوظات الجدول الأصلي.

معالجة بيانات التغيير بشكل متزايد

توصي Databricks باستخدام موجز بيانات التغيير بالاشتراك مع Structured Streaming لمعالجة التغييرات بشكل متزايد من جداول Delta. يجب عليك استخدام Structured Streaming ل Azure Databricks لتعقب الإصدارات تلقائيا لموجز بيانات تغيير الجدول.

إشعار

توفر Delta Live Tables وظائف لنشر بيانات التغيير بسهولة وتخزين النتائج كجداول SCD (بعد يتغير ببطء) من النوع 1 أو النوع 2. راجع واجهات برمجة تطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables.

لقراءة موجز بيانات التغيير من جدول، يجب تمكين موجز بيانات التغيير على هذا الجدول. راجع تمكين موجز بيانات التغيير.

قم بتعيين الخيار readChangeFeed إلى true عند تكوين دفق مقابل جدول لقراءة موجز بيانات التغيير، كما هو موضح في مثال بناء الجملة التالي:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

بشكل افتراضي، يقوم الدفق بإرجاع أحدث لقطة للجدول عند بدء الدفق لأول مرة كتغير مستقبلي INSERT كبيانات تغيير.

تغيير عمليات تثبيت البيانات كجزء من معاملة Delta Lake، وتصبح متاحة في نفس الوقت الذي تلتزم فيه البيانات الجديدة بالجدول.

يمكنك اختياريا تحديد إصدار بدء. راجع هل يجب تحديد إصدار بدء؟.

يدعم موجز بيانات التغيير أيضا تنفيذ الدفعة، والذي يتطلب تحديد إصدار بدء. راجع قراءة التغييرات في استعلامات الدفعة.

خيارات مثل حدود المعدل (maxFilesPerTrigger، maxBytesPerTrigger) ويتم excludeRegex دعمها أيضا عند قراءة بيانات التغيير.

يمكن أن يكون تحديد المعدل ذريا للإصدارات الأخرى غير إصدار لقطة البداية. أي أن إصدار التثبيت بأكمله سيكون محدودا أو سيتم إرجاع التثبيت بأكمله.

هل يجب تحديد إصدار بدء؟

يمكنك اختياريا تحديد إصدار بدء إذا كنت تريد تجاهل التغييرات التي حدثت قبل إصدار معين. يمكنك تحديد إصدار باستخدام طابع زمني أو رقم معرف الإصدار المسجل في سجل معاملات Delta.

إشعار

مطلوب إصدار بدء لقراءة الدفعات، ويمكن للعديد من أنماط الدفعات الاستفادة من تعيين إصدار نهاية اختياري.

عند تكوين أحمال عمل Structured Streaming التي تتضمن موجز بيانات التغيير، من المهم فهم كيفية تأثير تحديد إصدار البداية على المعالجة.

تستفيد العديد من أحمال العمل المتدفقة، خاصة مسارات معالجة البيانات الجديدة، من السلوك الافتراضي. باستخدام السلوك الافتراضي، تتم معالجة الدفعة الأولى عندما يسجل الدفق أولا جميع السجلات الموجودة في الجدول كعملات INSERT في موجز بيانات التغيير.

إذا كان الجدول الهدف يحتوي بالفعل على جميع السجلات التي تحتوي على تغييرات مناسبة تصل إلى نقطة معينة، فحدد إصدار بدء لتجنب معالجة حالة الجدول المصدر كأحداث INSERT .

بناء الجملة المثال التالي استرداد من فشل دفق تم فيه تلف نقطة التحقق. في هذا المثال، افترض الشروط التالية:

  1. تم تمكين موجز بيانات التغيير على الجدول المصدر عند إنشاء الجدول.
  2. قام جدول انتقال البيانات من الخادم الهدف بمعالجة جميع التغييرات حتى الإصدار 75 بما في ذلك.
  3. تتوفر محفوظات الإصدارات للجدول المصدر للإصدارات 70 والإصدارات الأحدث.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

في هذا المثال، يجب أيضا تحديد موقع نقطة تحقق جديد.

هام

إذا قمت بتحديد إصدار بدء، يفشل الدفق في البدء من نقطة تحقق جديدة إذا لم يعد إصدار البداية موجودا في محفوظات الجدول. تقوم Delta Lake بتنظيف الإصدارات التاريخية تلقائيا، ما يعني أنه يتم حذف جميع إصدارات البدء المحددة في النهاية.

راجع هل يمكنني استخدام موجز بيانات التغيير لإعادة تشغيل محفوظات الجدول بالكامل؟.

قراءة التغييرات في استعلامات الدفعة

يمكنك استخدام بناء جملة الاستعلام الدفعي لقراءة جميع التغييرات بدءا من إصدار معين أو لقراءة التغييرات ضمن نطاق محدد من الإصدارات.

يمكنك تحديد إصدار كعدد صحيح وطوابع زمنية كسلسلة بالتنسيق yyyy-MM-dd[ HH:mm:ss[.SSS]].

إصدارات البدء والنهاية شاملة في الاستعلامات. لقراءة التغييرات من إصدار بدء معين إلى أحدث إصدار من الجدول، حدد إصدار البداية فقط.

إذا قمت بتوفير إصدار أقل أو طابع زمني أقدم من واحد قام بتسجيل أحداث التغيير - أي عند تمكين موجز بيانات التغيير - يتم طرح خطأ يشير إلى عدم تمكين موجز بيانات التغيير.

توضح أمثلة بناء الجملة التالية استخدام خيارات إصدار البدء والانتهاء مع قراءات الدفعة:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

إشعار

بشكل افتراضي، إذا مرر مستخدم إصدارا أو طابعا زمنيا يتجاوز التثبيت الأخير على جدول، يتم طرح الخطأ timestampGreaterThanLatestCommit . في Databricks Runtime 11.3 LTS والإصدارات الأحدث، يمكن لموجز البيانات التغيير معالجة حالة إصدار خارج النطاق إذا كان المستخدم يعين التكوين التالي إلى true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

إذا قمت بتوفير إصدار بدء أكبر من التثبيت الأخير على جدول أو طابع زمني للبدء أحدث من التثبيت الأخير في جدول، ثم عند تمكين التكوين السابق، يتم إرجاع نتيجة قراءة فارغة.

إذا قمت بتوفير إصدار نهاية أكبر من التثبيت الأخير على جدول أو طابع زمني للانتهاء أحدث من التثبيت الأخير على جدول، فعند تمكين التكوين السابق في وضع قراءة الدفعة، يتم إرجاع جميع التغييرات بين إصدار البدء والتثبيت الأخير.

ما هو مخطط موجز بيانات التغيير؟

عند القراءة من موجز بيانات التغيير لجدول، يتم استخدام مخطط أحدث إصدار للجدول.

إشعار

يتم دعم معظم عمليات تغيير المخطط والتطور بشكل كامل. لا يدعم الجدول الذي تم تمكين تعيين العمود فيه جميع حالات الاستخدام ويوضح سلوكا مختلفا. راجع تغيير قيود موجز البيانات للجداول مع تمكين تعيين العمود.

بالإضافة إلى أعمدة البيانات من مخطط جدول Delta، يحتوي موجز بيانات التغيير على أعمدة بيانات التعريف التي تحدد نوع حدث التغيير:

اسم العمود نوع القيم
_change_type السلسلة‬ insert، update_preimage ، update_postimage، delete (1)
_commit_version طويل يحتوي إصدار سجل أو جدول Delta على التغيير.
_commit_timestamp طابع زمني الطابع الزمني المقترن عند إنشاء التثبيت.

(1) preimage هي القيمة قبل التحديث، postimage وهي القيمة بعد التحديث.

إشعار

لا يمكنك تمكين تغيير موجز البيانات على جدول إذا كان المخطط يحتوي على أعمدة بنفس أسماء هذه الأعمدة المضافة. أعد تسمية الأعمدة في الجدول لحل هذا التعارض قبل محاولة تمكين موجز بيانات التغيير.

تمكين موجز بيانات التغيير

يمكنك فقط قراءة موجز بيانات التغيير للجداول الممكنة. يجب تمكين خيار موجز بيانات التغيير بشكل صريح باستخدام إحدى الطرق التالية:

  • جدول جديد: تعيين خاصية delta.enableChangeDataFeed = true الجدول في CREATE TABLE الأمر .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • الجدول الموجود: تعيين خاصية delta.enableChangeDataFeed = true الجدول في ALTER TABLE الأمر .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • جميع الجداول الجديدة:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

هام

يتم تسجيل التغييرات التي تم إجراؤها بعد تمكين موجز بيانات التغيير فقط. لا يتم التقاط التغييرات السابقة على جدول.

تغيير تخزين البيانات

يؤدي تمكين موجز بيانات التغيير إلى زيادة صغيرة في تكاليف التخزين لجدول. يتم إنشاء سجلات البيانات المتغيرة أثناء تشغيل الاستعلام، وهي بشكل عام أصغر بكثير من الحجم الإجمالي للملفات التي تمت إعادة كتابتها.

تقوم سجلات Azure Databricks بتغيير البيانات لعمليات UPDATEو DELETEو MERGE في _change_data المجلد ضمن دليل الجدول. لا تنشئ بعض العمليات، مثل عمليات الإدراج فقط وحذف القسم الكامل، بيانات في _change_data الدليل لأن Azure Databricks يمكنه حساب موجز بيانات التغيير بكفاءة مباشرة من سجل المعاملات.

يجب أن تمر جميع القراءات مقابل ملفات البيانات في _change_data المجلد من خلال واجهات برمجة تطبيقات Delta Lake المدعومة.

تتبع الملفات الموجودة في _change_data المجلد نهج الاستبقاء للجدول. يتم حذف تغيير بيانات موجز البيانات عند VACUUM تشغيل الأمر.

هل يمكنني استخدام موجز بيانات التغيير لإعادة تشغيل محفوظات الجدول بالكامل؟

لا يقصد بموجز بيانات التغيير أن يكون بمثابة سجل دائم لجميع التغييرات التي تم إجراؤها على جدول. لا يسجل تغيير موجز البيانات سوى التغييرات التي تحدث بعد تمكينه.

يسمح لك موجز البيانات و Delta Lake دائما بإعادة إنشاء لقطة كاملة لجدول مصدر، ما يعني أنه يمكنك بدء قراءة دفق جديدة مقابل جدول مع تمكين موجز بيانات التغيير والتقاط الإصدار الحالي من هذا الجدول وجميع التغييرات التي تحدث بعد ذلك.

يجب عليك معاملة السجلات في موجز بيانات التغيير على أنها عابرة ولا يمكن الوصول إليها إلا لنافذة استبقاء محددة. يزيل سجل معاملات Delta إصدارات الجدول وإصدارات موجز بيانات التغيير المقابلة لها على فترات منتظمة. عند إزالة إصدار من سجل المعاملات، لم يعد بإمكانك قراءة موجز بيانات التغيير لهذا الإصدار.

إذا كانت حالة الاستخدام تتطلب الاحتفاظ بمحفوظات دائمة لجميع التغييرات التي تم إجراؤها على جدول، فيجب عليك استخدام منطق تزايدي لكتابة سجلات من موجز بيانات التغيير إلى جدول جديد. يوضح مثال التعليمات البرمجية التالي استخدام trigger.AvailableNow، والذي يستفيد من المعالجة التزايدية للتدفق المنظم ولكنه يعالج البيانات المتاحة كعبء عمل دفعي. يمكنك جدولة حمل العمل هذا بشكل غير متزامن مع مسارات المعالجة الرئيسية لإنشاء نسخة احتياطية من موجز بيانات التغيير لأغراض التدقيق أو إعادة التشغيل الكامل.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

تغيير قيود موجز البيانات للجداول مع تمكين تعيين العمود

مع تمكين تعيين العمود على جدول Delta، يمكنك إسقاط الأعمدة أو إعادة تسميتها في الجدول دون إعادة كتابة ملفات البيانات للبيانات الموجودة. مع تمكين تعيين العمود، يحتوي موجز بيانات التغيير على قيود بعد إجراء تغييرات مخطط غير مضافة مثل إعادة تسمية عمود أو إسقاطه أو تغيير نوع البيانات أو تغييرات قابلية القيم الخالية.

هام

  • لا يمكنك قراءة موجز بيانات التغيير لمعاملة أو نطاق يحدث فيه تغيير مخطط غير إضافي باستخدام دلالات الدفعة.
  • في Databricks Runtime 12.2 LTS والإدخالات أدناه، لا تدعم الجداول التي تم تمكين تعيين الأعمدة بها والتي شهدت تغييرات مخطط غير مضافة عمليات قراءة الدفق على موجز بيانات التغيير. راجع البث مع تعيين العمود وتغييرات المخطط.
  • في Databricks Runtime 11.3 LTS والإصدارات أدناه، لا يمكنك قراءة موجز بيانات التغيير للجداول مع تمكين تعيين الأعمدة التي واجهت إعادة تسمية العمود أو إسقاطه.

في Databricks Runtime 12.2 LTS والإصدارات الأحدث، يمكنك إجراء قراءات دفعية على موجز بيانات التغيير للجداول مع تمكين تعيين العمود الذي واجه تغييرات مخطط غير مضافة. بدلا من استخدام مخطط أحدث إصدار من الجدول، تستخدم عمليات القراءة مخطط الإصدار النهائي من الجدول المحدد في الاستعلام. لا تزال الاستعلامات تفشل إذا كان نطاق الإصدار المحدد يمتد على تغيير مخطط غير إضافي.