مشاركة عبر


معالجة الدفق باستخدام Apache Kafka وAzure Databricks

توضح هذه المقالة كيف يمكنك استخدام Apache Kafka كمصدر أو متلقي عند تشغيل أحمال عمل Structured Streaming على Azure Databricks.

لمزيد من Kafka، راجع وثائق Kafka.

قراءة البيانات من Kafka

فيما يلي مثال لقراءة دفق من Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

يدعم Azure Databricks أيضا دلالات قراءة الدفعات لمصادر بيانات Kafka، كما هو موضح في المثال التالي:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

لتحميل الدفعات المتزايدة، توصي Databricks باستخدام Kafka مع Trigger.AvailableNow. راجع تكوين معالجة الدفعات التزايدية.

في Databricks Runtime 13.3 LTS وما فوق، يوفر Azure Databricks دالة SQL لقراءة بيانات Kafka. يتم دعم الدفق باستخدام SQL فقط في Delta Live Tables أو مع جداول الدفق في Databricks SQL. راجع read_kafka دالة قيم الجدول.

تكوين قارئ Kafka Structured Streaming

يوفر kafka Azure Databricks الكلمة الأساسية بتنسيق بيانات لتكوين الاتصالات ب Kafka 0.10+.

فيما يلي التكوينات الأكثر شيوعا ل Kafka:

هناك طرق متعددة لتحديد الموضوعات التي يجب الاشتراك فيها. يجب توفير واحدة فقط من هذه المعلمات:

خيار قيمة ‏‏الوصف
اشترك قائمة مواضيع مفصولة بفواصل. قائمة المواضيع للاشتراك فيها.
subscribePattern سلسلة Java regex. النمط المستخدم للاشتراك في الموضوع (المواضيع).
تعيين سلسلة {"topicA":[0,1],"topic":[2,4]}JSON . مواضيع محددة يجب استهلاكها.

التكوينات البارزة الأخرى:

خيار القيمة‬ القيمة الافتراضية ‏‏الوصف
kafka.bootstrap.servers قائمة مفصولة بفواصل للمضيف:المنفذ. فارغ [مطلوب] تكوين Kafka bootstrap.servers . إذا وجدت أنه لا توجد بيانات من Kafka، فتحقق من قائمة عناوين الوسيط أولا. إذا كانت قائمة عناوين الوسيط غير صحيحة، فقد لا تكون هناك أي أخطاء. وذلك لأن عميل Kafka يفترض أن الوسطاء سيصبحون متاحين في النهاية وفي حالة إعادة محاولة أخطاء الشبكة إلى الأبد.
failOnDataLoss true أو false. true [اختياري] ما إذا كان يجب فشل الاستعلام عندما يكون من الممكن فقدان البيانات. يمكن أن تفشل الاستعلامات بشكل دائم في قراءة البيانات من Kafka بسبب العديد من السيناريوهات مثل الموضوعات المحذوفة، واقتطاع الموضوع قبل المعالجة، وما إلى ذلك. نحاول تقدير ما إذا كانت البيانات قد فقدت أم لا بشكل متحفظ. في بعض الأحيان يمكن أن يسبب هذا إنذارات خاطئة. قم بتعيين هذا الخيار إلى false إذا لم يعمل كما هو متوقع، أو تريد أن يستمر الاستعلام في المعالجة على الرغم من فقدان البيانات.
minPartitions عدد صحيح >= 0، 0 = معطل. 0 (معطل) [اختياري] الحد الأدنى لعدد الأقسام المراد قراءتها من Kafka. يمكنك تكوين Spark لاستخدام الحد الأدنى العشوائي من الأقسام للقراءة من Kafka باستخدام minPartitions الخيار . عادة ما يحتوي Spark على تعيين 1-1 لموضوع KafkaPartitions إلى أقسام Spark المستهلكة من Kafka. إذا قمت بتعيين minPartitions الخيار إلى قيمة أكبر من Kafka topicPartitions، فسيقسم Spark أقسام Kafka الكبيرة إلى أجزاء أصغر. يمكن تعيين هذا الخيار في أوقات ذروة الأحمال وانحراف البيانات، ومع تراجع الدفق الخاص بك لزيادة معدل المعالجة. يأتي ذلك على حساب تهيئة مستهلكي Kafka في كل مشغل، مما قد يؤثر على الأداء إذا كنت تستخدم SSL عند الاتصال ب Kafka.
kafka.group.id معرف مجموعة مستهلكين Kafka. لم يتم تعيين [اختياري] معرف المجموعة المراد استخدامه أثناء القراءة من Kafka. استخدم هذا بحذر. بشكل افتراضي، يقوم كل استعلام بإنشاء معرف مجموعة فريد لقراءة البيانات. وهذا يضمن أن كل استعلام له مجموعة المستهلكين الخاصة به التي لا تواجه تداخلا من أي مستهلك آخر، وبالتالي يمكن قراءة جميع أقسام الموضوعات المشتركة فيه. في بعض السيناريوهات (على سبيل المثال، التخويل المستند إلى مجموعة Kafka)، قد تحتاج إلى استخدام معرفات مجموعة معتمدة معينة لقراءة البيانات. يمكنك اختياريا تعيين معرف المجموعة. ومع ذلك، قم بذلك بحذر شديد لأنه يمكن أن يسبب سلوكا غير متوقع.

- من المحتمل أن تتداخل الاستعلامات قيد التشغيل المتزامن (كل من الدفعة والتدفق) مع معرف المجموعة نفسه مع بعضها البعض مما يتسبب في قراءة كل استعلام لجزء من البيانات فقط.
- قد يحدث هذا أيضا عند بدء تشغيل/إعادة تشغيل الاستعلامات في تتابع سريع. لتقليل مثل هذه المشكلات، قم بتعيين تكوين session.timeout.ms مستهلك Kafka ليكون صغيرا جدا.
بدء تشغيل مجموعات الأقدم ، الأحدث الأحدث [اختياري] نقطة البدء عند بدء تشغيل استعلام، إما "الأقدم" التي تكون من أقدم الإزاحات، أو سلسلة json تحدد إزاحة بداية لكل TopicPartition. في json، يمكن استخدام -2 كإزاحة للإشارة إلى الأقدم، -1 إلى الأحدث. ملاحظة: بالنسبة للاستعلامات الدفعية، لا يسمح بأحدث (إما ضمنيا أو باستخدام -1 في json). بالنسبة للاستعلامات المتدفقة، ينطبق هذا فقط عند بدء تشغيل استعلام جديد، وسيلتقط هذا البدء دائما من حيث توقف الاستعلام. ستبدأ الأقسام المكتشفة حديثا أثناء الاستعلام في أقرب وقت.

راجع دليل تكامل Structured Streaming Kafka للحصول على تكوينات اختيارية أخرى.

مخطط لسجلات Kafka

مخطط سجلات Kafka هو:

Column النوع
المفتاح binary
قيمة binary
الموضوع سلسلة
القسم العدد الصحيح
إزاحة طويل
الطابع الزمني طويل
نوع الطابع الزمني العدد الصحيح

key يتم دائما إلغاء تسلسل و value كصفائف بايت مع ByteArrayDeserializer. استخدم عمليات DataFrame (مثل cast("string")) لإلغاء تسلسل المفاتيح والقيم بشكل صريح.

كتابة البيانات إلى Kafka

فيما يلي مثال على الكتابة المتدفقة إلى Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

يدعم Azure Databricks أيضا دلالات كتابة الدفعات إلى متلقي بيانات Kafka، كما هو موضح في المثال التالي:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

تكوين كاتب Kafka Structured Streaming

هام

يتضمن Databricks Runtime 13.3 LTS والإصدارات الأحدث إصدارا أحدث من kafka-clients المكتبة التي تمكن عمليات الكتابة المتكررة بشكل افتراضي. إذا كان مصدر Kafka يستخدم الإصدار 2.8.0 أو أقل مع تكوين قوائم التحكم في الوصول، ولكن دون IDEMPOTENT_WRITE تمكين، تفشل الكتابة مع رسالة org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateالخطأ .

حل هذا الخطأ عن طريق الترقية إلى الإصدار 2.8.0 من Kafka أو أعلى، أو عن طريق إعداد .option(“kafka.enable.idempotence”, “false”) أثناء تكوين كاتب Structured Streaming.

يتفاعل المخطط المقدم إلى DataStreamWriter مع متلقي Kafka. يمكنك استخدام الحقول التالية:

اسم العمود مطلوب أو اختياري نوع
key اختياري STRING أو BINARY
value مطلوب STRING أو BINARY
headers اختياري ARRAY
topic اختياري (يتم تجاهله إذا topic تم تعيينه كخيار كاتب) STRING
partition اختياري INT

فيما يلي الخيارات الشائعة التي تم تعيينها أثناء الكتابة إلى Kafka:

خيار القيمة‬ القيمة الافتراضية ‏‏الوصف
kafka.boostrap.servers قائمة مفصولة بفواصل <host:port> لا شيء [مطلوب] تكوين Kafka bootstrap.servers .
topic STRING لم يتم تعيين [اختياري] تعيين الموضوع لكافة الصفوف المراد كتابتها. يتجاوز هذا الخيار أي عمود موضوع موجود في البيانات.
includeHeaders BOOLEAN false [اختياري] ما إذا كان يجب تضمين رؤوس Kafka في الصف.

راجع دليل تكامل Structured Streaming Kafka للحصول على تكوينات اختيارية أخرى.

استرداد مقاييس Kafka

يمكنك الحصول على المتوسط والحد الأدنى والحد الأقصى لعدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة مع avgOffsetsBehindLatestmaxOffsetsBehindLatestالمقاييس و وminOffsetsBehindLatest. راجع مقاييس القراءة بشكل تفاعلي.

إشعار

متوفر في Databricks Runtime 9.1 وما فوق.

احصل على العدد الإجمالي المقدر لوحدات البايت التي لم تستهلكها عملية الاستعلام من الموضوعات المشتركة عن طريق فحص قيمة estimatedTotalBytesBehindLatest. يستند هذا التقدير إلى الدفعات التي تمت معالجتها في آخر 300 ثانية. يمكن تغيير الإطار الزمني الذي يستند إليه التقدير عن طريق تعيين الخيار bytesEstimateWindowLength إلى قيمة مختلفة. على سبيل المثال، لتعيينه إلى 10 دقائق:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

إذا كنت تقوم بتشغيل الدفق في دفتر ملاحظات، يمكنك مشاهدة هذه المقاييس ضمن علامة التبويب البيانات الأولية في لوحة معلومات تقدم الاستعلام المتدفق:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

استخدام SSL لتوصيل Azure Databricks ب Kafka

لتمكين اتصالات SSL إلى Kafka، اتبع الإرشادات الموجودة في وثائق Confluent التشفير والمصادقة باستخدام SSL. يمكنك توفير التكوينات الموضحة هناك، مسبوقة ب kafka.، كخيارات. على سبيل المثال، يمكنك تحديد موقع مخزن الثقة في الخاصية kafka.ssl.truststore.location.

توصي Databricks بما يلي:

  • تخزين الشهادات الخاصة بك في تخزين كائن السحابة. يمكنك تقييد الوصول إلى الشهادات فقط إلى المجموعات التي يمكنها الوصول إلى Kafka. راجع إدارة البيانات باستخدام كتالوج Unity.
  • تخزين كلمات مرور الشهادة كأسرار في نطاق سري.

يستخدم المثال التالي مواقع تخزين الكائنات وأسرار Databricks لتمكين اتصال SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

توصيل Kafka على HDInsight ب Azure Databricks

  1. إنشاء مجموعة HDInsight Kafka.

    راجع الاتصال ب Kafka على HDInsight من خلال شبكة Azure الظاهرية للحصول على إرشادات.

  2. تكوين وسطاء Kafka للإعلان عن العنوان الصحيح.

    اتبع الإرشادات الواردة في تكوين Kafka للإعلان عن IP. إذا كنت تدير Kafka بنفسك على أجهزة Azure الظاهرية، فتأكد من advertised.listeners تعيين تكوين الوسطاء إلى IP الداخلي للمضيفين.

  3. إنشاء مجموعة Azure Databricks.

  4. نظير نظام مجموعة Kafka إلى نظام مجموعة Azure Databricks.

    اتبع الإرشادات الموجودة في الشبكات الظاهرية النظيرة.

مصادقة كيان الخدمة مع معرف Microsoft Entra ومراكز أحداث Azure

يدعم Azure Databricks مصادقة وظائف Spark مع خدمات مراكز الأحداث. تتم هذه المصادقة عبر OAuth باستخدام معرف Microsoft Entra.

رسم تخطيطي لمصادقة AAD

يدعم Azure Databricks مصادقة معرف Microsoft Entra مع معرف العميل والسر في بيئات الحوسبة التالية:

  • Databricks Runtime 12.2 LTS وما فوق على الحساب المكون مع وضع وصول مستخدم واحد.
  • Databricks Runtime 14.3 LTS وما فوق على الحساب المكون مع وضع الوصول المشترك.
  • البنية الأساسية لبرنامج ربط العمليات التجارية ل Delta Live Tables التي تم تكوينها بدون كتالوج Unity.

لا يدعم Azure Databricks مصادقة معرف Microsoft Entra مع شهادة في أي بيئة حساب، أو في خطوط أنابيب Delta Live Tables المكونة باستخدام كتالوج Unity.

لا تعمل هذه المصادقة على المجموعات المشتركة أو على جداول Delta Live كتالوج Unity.

تكوين موصل Kafka المتدفق المنظم

لإجراء المصادقة باستخدام معرف Microsoft Entra، ستحتاج إلى القيم التالية:

  • معرف مستأجر. يمكنك العثور على هذا في علامة التبويب خدمات معرف Microsoft Entra.

  • معرف العميل (المعروف أيضا باسم معرف التطبيق).

  • سر العميل. بمجرد حصولك على هذا، يجب إضافته كسر إلى مساحة عمل Databricks. لإضافة هذا السر، راجع إدارة البيانات السرية.

  • موضوع EventHubs. يمكنك العثور على قائمة بالمواضيع في قسم مراكز الأحداث ضمن قسم الكيانات في صفحة مساحة اسم مراكز الأحداث المحددة. للعمل مع مواضيع متعددة، يمكنك تعيين دور IAM على مستوى مراكز الأحداث.

  • خادم EventHubs. يمكنك العثور على هذا في صفحة النظرة العامة لمساحة اسم مراكز الأحداث المحددة:

    مساحة اسم

بالإضافة إلى ذلك، لاستخدام معرف إنترا، نحتاج إلى إخبار Kafka باستخدام آلية OAuth SASL (SASL هو بروتوكول عام، وOAuth هو نوع من "آلية" SASL):

  • kafka.security.protocol يجب أن يكون SASL_SSL
  • kafka.sasl.mechanism يجب أن يكون OAUTHBEARER
  • kafka.sasl.login.callback.handler.class يجب أن يكون اسما مؤهلا بالكامل لفئة Java بقيمة kafkashaded لمعالج رد اتصال تسجيل الدخول لفئة Kafka المظللة. راجع المثال التالي للفئة الدقيقة.

مثال

بعد ذلك، دعونا ننظر إلى مثال قيد التشغيل:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

معالجة الأخطاء المحتملة

  • خيارات الدفق غير مدعومة.

    إذا حاولت استخدام آلية المصادقة هذه في مسار Delta Live Tables الذي تم تكوينه باستخدام كتالوج Unity، فقد تتلقى الخطأ التالي:

    خطأ تدفق غير معتمد

    لحل هذا الخطأ، استخدم تكوين حساب معتمد. راجع مصادقة كيان الخدمة مع معرف Microsoft Entra ومراكز أحداث Azure.

  • فشل إنشاء جديد KafkaAdminClient.

    هذا خطأ داخلي يطرحه Kafka إذا كان أي من خيارات المصادقة التالية غير صحيح:

    • معرف العميل (المعروف أيضا باسم معرف التطبيق)
    • معرف المستأجر
    • خادم EventHubs

    لحل الخطأ، تحقق من صحة القيم لهذه الخيارات.

    بالإضافة إلى ذلك، قد ترى هذا الخطأ إذا قمت بتعديل خيارات التكوين المتوفرة بشكل افتراضي في المثال (الذي طلب منك عدم تعديله)، مثل kafka.security.protocol.

  • لا توجد سجلات يتم إرجاعها

    إذا كنت تحاول عرض DataFrame أو معالجته ولكن لم تحصل على نتائج، فسترى ما يلي في واجهة المستخدم.

    لا توجد رسالة نتائج

    تعني هذه الرسالة أن المصادقة كانت ناجحة، ولكن EventHubs لم ترجع أي بيانات. بعض الأسباب المحتملة (وإن لم تكن شاملة بأي حال من الأحوال) هي:

    • لقد حددت موضوع EventHubs غير صحيح.
    • خيار تكوين Kafka الافتراضي ل startingOffsets هو latest، ولا تتلقى حاليا أي بيانات من خلال الموضوع حتى الآن. يمكنك تعيين startingOffsetstoearliest لبدء قراءة البيانات بدءا من أقرب إزاحات Kafka.