بيانات تدفق الاستعلام
يمكنك استخدام Azure Databricks للاستعلام عن مصادر البيانات المتدفقة باستخدام Structured Streaming. يوفر Azure Databricks دعما واسع النطاق لأحمال عمل الدفق في Python وSc scala، ويدعم معظم وظائف الدفق المنظم باستخدام SQL.
توضح الأمثلة التالية استخدام متلقي ذاكرة للفحص اليدوي للبيانات المتدفقة أثناء التطوير التفاعلي في دفاتر الملاحظات. بسبب حدود إخراج الصف في واجهة مستخدم دفتر الملاحظات، قد لا تلاحظ قراءة جميع البيانات بواسطة استعلامات الدفق. في أحمال عمل الإنتاج، يجب تشغيل استعلامات الدفق فقط عن طريق كتابتها إلى جدول هدف أو نظام خارجي.
إشعار
يقتصر دعم SQL للاستعلامات التفاعلية على تدفق البيانات على دفاتر الملاحظات التي تعمل على الحوسبة لجميع الأغراض. يمكنك أيضا استخدام SQL عند الإعلان عن جداول الدفق في Databricks SQL أو Delta Live Tables. راجع تحميل البيانات باستخدام جداول الدفق في Databricks SQL وما هي جداول Delta Live؟.
الاستعلام عن البيانات من أنظمة الدفق
يوفر Azure Databricks قراء بيانات متدفقة لأنظمة الدفق التالية:
- Kafka
- Kinesis
- PubSub
- نباض
يجب توفير تفاصيل التكوين عند تهيئة الاستعلامات مقابل هذه الأنظمة، والتي تختلف اعتمادا على البيئة التي تم تكوينها والنظام الذي تختار القراءة منه. راجع تكوين مصادر بيانات الدفق.
تتضمن أحمال العمل الشائعة التي تتضمن أنظمة الدفق استيعاب البيانات إلى مستودع التخزين ومعالجة الدفق إلى تخزين البيانات إلى الأنظمة الخارجية. لمزيد من المعلومات حول تدفق أحمال العمل، راجع البث على Azure Databricks.
توضح الأمثلة التالية قراءة دفق تفاعلية من Kafka:
Python
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
الاستعلام عن جدول كقراءة دفق
ينشئ Azure Databricks جميع الجداول باستخدام Delta Lake بشكل افتراضي. عند إجراء استعلام دفق مقابل جدول Delta، يلتقط الاستعلام تلقائيا سجلات جديدة عند تثبيت إصدار من الجدول. بشكل افتراضي، تتوقع الاستعلامات المتدفقة أن تحتوي جداول المصدر على سجلات ملحقة فقط. إذا كنت بحاجة إلى العمل مع دفق البيانات التي تحتوي على تحديثات وحذف، توصي Databricks باستخدام جداول Delta Live و APPLY CHANGES INTO
. راجع واجهات برمجة تطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables.
توضح الأمثلة التالية إجراء قراءة تدفق تفاعلية من جدول:
Python
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
الاستعلام عن البيانات في تخزين كائن السحابة باستخدام "المحمل التلقائي"
يمكنك دفق البيانات من تخزين الكائنات السحابية باستخدام أداة التحميل التلقائي، موصل بيانات سحابة Azure Databricks. يمكنك استخدام الموصل مع الملفات المخزنة في وحدات تخزين كتالوج Unity أو مواقع تخزين الكائنات السحابية الأخرى. توصي Databricks باستخدام وحدات التخزين لإدارة الوصول إلى البيانات في تخزين الكائنات السحابية. راجع الاتصال بمصادر البيانات.
يعمل Azure Databricks على تحسين هذا الموصل لاستيعاب دفق البيانات في تخزين الكائنات السحابية المخزنة في تنسيقات شائعة منظمة وشبه منظمة وغير منظمة البنية. توصي Databricks بتخزين البيانات التي تم استيعابها بتنسيق أولي تقريبا لزيادة معدل النقل وتقليل فقدان البيانات المحتمل بسبب السجلات التالفة أو تغييرات المخطط.
لمزيد من التوصيات حول استيعاب البيانات من تخزين الكائنات السحابية، راجع استيعاب البيانات في مستودع Databricks.
توضح الأمثلة التالية قراءة تدفق تفاعلية من دليل ملفات JSON في وحدة تخزين:
Python
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')