مشاركة عبر


استخدم Apache Flink مع Azure Event Hubs لـ Apache Kafka

يوضح لك هذا البرنامج التعليمي كيفية توصيل Apache Flink بمركز أحداث دون تغيير عملاء البروتوكول أو تشغيل مجموعاتك الخاصة. لمزيد من المعلومات حول دعم مراكز الأحداث لبروتوكول مستهلك Apache Kafka، راجع مراكز الأحداث ل Apache Kafka.

في هذا البرنامج التعليمي، تتعلم كيفية:

  • إنشاء مساحة اسم مراكز الأحداث
  • استنساخ مثال للمشروع
  • تشغيل منتج Flink
  • تشغيل مستهلك Flink

إشعار

تلك العينة متاحة على GitHub

المتطلبات الأساسية

لإكمال هذا البرنامج التعليمي، تأكد من توفر المتطلبات الأساسية التالية لديك:

إنشاء مساحة اسم مراكز الأحداث

مطلوب مساحة اسم Event Hubs للإرسال أو الاستلام من أي خدمة من خدمات Event Hubs. راجع إنشاء مركز أحداث للحصول على إرشادات لإنشاء مساحة اسم ومركز أحداث. تأكد من نسخ سلسلة اتصال Event Hubs لاستخدامها لاحقاً.

استنساخ مثال للمشروع

الآن بعد أن أصبح لديك "مراكز الأحداث" سلسلة الاتصال، استنسخ مستودع Azure Event Hubs ل Kafka وانتقل إلى flink المجلد الفرعي:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

باستخدام مثال منتج Flink المتوفر، أرسل رسائل إلى خدمة Event Hubs.

قم بتوفير نقطة نهاية Event Hubs Kafka

producer.config

bootstrap.servers قم بتحديث قيم producer/src/main/resources/producer.config و sasl.jaas.config لتوجيه المنتج إلى نقطة نهاية Event Hubs Kafka بالمصادقة الصحيحة.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

قم بتشغيل المنتج من سطر الأوامر

لتشغيل المنتج من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

سيبدأ المنتج الآن في إرسال الأحداث إلى مركز الأحداث في الموضوع test وطباعة الأحداث إلى stdout.

باستخدام مثال المستهلك المقدم، استقبل الرسائل من مركز الحدث.

قم بتوفير نقطة نهاية Event Hubs Kafka

consumer.config

bootstrap.servers قم بتحديث قيم consumer/src/main/resources/consumer.config و sasl.jaas.config لتوجيه المستهلك إلى نقطة نهاية Kafka لمراكز الأحداث بالمصادقة الصحيحة.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

قم بتشغيل المستهلك من سطر الأوامر

لتشغيل المستهلك من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

إذا كان مركز الأحداث يحتوي على أحداث (على سبيل المثال، إذا كان المنتج قيد التشغيل أيضا)، فسيبدأ المستهلك الآن في تلقي الأحداث من الموضوع test.

راجع دليل موصل Kafka الخاص ب Flink للحصول على مزيد من المعلومات التفصيلية حول توصيل Flink ب Kafka.

الخطوات التالية

لمعرفة المزيد حول Event Hubs for Kafka، راجع المقالات التالية: