استخدم Apache Flink مع Azure Event Hubs لـ Apache Kafka
يوضح لك هذا البرنامج التعليمي كيفية توصيل Apache Flink بمركز أحداث دون تغيير عملاء البروتوكول أو تشغيل مجموعاتك الخاصة. لمزيد من المعلومات حول دعم مراكز الأحداث لبروتوكول مستهلك Apache Kafka، راجع مراكز الأحداث ل Apache Kafka.
في هذا البرنامج التعليمي، تتعلم كيفية:
- إنشاء مساحة اسم مراكز الأحداث
- استنساخ مثال للمشروع
- تشغيل منتج Flink
- تشغيل مستهلك Flink
إشعار
تلك العينة متاحة على GitHub
المتطلبات الأساسية
لإكمال هذا البرنامج التعليمي، تأكد من توفر المتطلبات الأساسية التالية لديك:
- اقرأ مقالة مراكز الأحداث لمقالة Apache Kafka.
- اشتراك Azure. إذا لم يكن لديك حساب، فأنشئ حسابًا مجانيًّا قبل أن تبدأ.
-
أداة تطوير Java (JDK) 1.7+
- على Ubuntu، شغّل
apt-get install default-jdk
لتثبيت JDK. - تأكد من ضبط متغير بيئة التشغيل JAVA_HOME ليشير إلى المجلد حيث تم تركيب JDK.
- على Ubuntu، شغّل
-
تنزيل أرشيف Maven ثنائي وتثبيته
- على Ubuntu، يمكنك تشغيل
apt-get install maven
لتثبيت Maven.
- على Ubuntu، يمكنك تشغيل
-
بوابه
- على Ubuntu، يمكنك تشغيل
sudo apt-get install git
لتثبيت Git.
- على Ubuntu، يمكنك تشغيل
إنشاء مساحة اسم مراكز الأحداث
مطلوب مساحة اسم Event Hubs للإرسال أو الاستلام من أي خدمة من خدمات Event Hubs. راجع إنشاء مركز أحداث للحصول على إرشادات لإنشاء مساحة اسم ومركز أحداث. تأكد من نسخ سلسلة اتصال Event Hubs لاستخدامها لاحقاً.
استنساخ مثال للمشروع
الآن بعد أن أصبح لديك "مراكز الأحداث" سلسلة الاتصال، استنسخ مستودع Azure Event Hubs ل Kafka وانتقل إلى flink
المجلد الفرعي:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
تشغيل منتج Flink
باستخدام مثال منتج Flink المتوفر، أرسل رسائل إلى خدمة Event Hubs.
قم بتوفير نقطة نهاية Event Hubs Kafka
producer.config
bootstrap.servers
قم بتحديث قيم producer/src/main/resources/producer.config
و sasl.jaas.config
لتوجيه المنتج إلى نقطة نهاية Event Hubs Kafka بالمصادقة الصحيحة.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
هام
استبدل {YOUR.EVENTHUBS.CONNECTION.STRING}
بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
قم بتشغيل المنتج من سطر الأوامر
لتشغيل المنتج من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
سيبدأ المنتج الآن في إرسال الأحداث إلى مركز الأحداث في الموضوع test
وطباعة الأحداث إلى stdout.
تشغيل مستهلك Flink
باستخدام مثال المستهلك المقدم، استقبل الرسائل من مركز الحدث.
قم بتوفير نقطة نهاية Event Hubs Kafka
consumer.config
bootstrap.servers
قم بتحديث قيم consumer/src/main/resources/consumer.config
و sasl.jaas.config
لتوجيه المستهلك إلى نقطة نهاية Kafka لمراكز الأحداث بالمصادقة الصحيحة.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
هام
استبدل {YOUR.EVENTHUBS.CONNECTION.STRING}
بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
قم بتشغيل المستهلك من سطر الأوامر
لتشغيل المستهلك من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
إذا كان مركز الأحداث يحتوي على أحداث (على سبيل المثال، إذا كان المنتج قيد التشغيل أيضا)، فسيبدأ المستهلك الآن في تلقي الأحداث من الموضوع test
.
راجع دليل موصل Kafka الخاص ب Flink للحصول على مزيد من المعلومات التفصيلية حول توصيل Flink ب Kafka.
الخطوات التالية
لمعرفة المزيد حول Event Hubs for Kafka، راجع المقالات التالية: