مشاركة عبر


استخدام مخطط JSON مع تطبيقات Apache Kafka

يرشدك هذا البرنامج التعليمي خلال سيناريو تستخدم فيه مخططات JSON لتسلسل الحدث وإلغاء تسلسله باستخدام Azure Schema Registry في مراكز الأحداث.

في حالة الاستخدام هذه، يستخدم تطبيق منتج Kafka مخطط JSON المخزن في Azure Schema Registry، لتسلسل الحدث ونشره إلى مركز أحداث/موضوع Kafka في مراكز أحداث Azure. يقوم مستهلك Kafka بإلغاء تسلسل الأحداث التي يستهلكها من مراكز الأحداث. لذلك يستخدم معرف المخطط للحدث ومخطط JSON، المخزن في Azure Schema Registry. رسم تخطيطي يوضح تسلسل المخطط/إلغاء التسلسل لتطبيقات Kafka باستخدام مخطط JSON.

المتطلبات الأساسية

إذا كنت مستخدماً جديداً لـ Azure Event Hubs، فراجع نظرة عامة على Event Hubs قبل إجراء هذا التشغيل السريع.

تحتاج إلى المتطلبات الأساسية التالية لإكمال هذا التشغيل السريع:

إنشاء مركز أحداث

اتبع الإرشادات من التشغيل السريع: إنشاء مساحة اسم مراكز الأحداث ومركز أحداث لإنشاء مساحة اسم مراكز الأحداث ومركز أحداث. بعد ذلك، اتبع الإرشادات من الحصول على سلسلة الاتصال للحصول على سلسلة الاتصال إلى مساحة اسم Event Hubs.

دون الإعدادات التالية التي تستخدمها في التشغيل السريع الحالي:

  • سلسلة الاتصال لمساحة اسم "مراكز الأحداث"
  • اسم مركز الحدث

إنشاء مخطط

اتبع الإرشادات من إنشاء المخططات باستخدام سجل المخطط لإنشاء مجموعة مخطط ومخطط.

  1. إنشاء مجموعة مخطط تُسمى contoso-sg باستخدام مدخل "سجل المخطط". استخدم مخطط JSON كنوع التسلسل.

  2. في مجموعة المخطط هذه، قم بإنشاء مخطط JSON جديد باسم المخطط: Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice باستخدام محتوى المخطط التالي.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

تسجيل تطبيق للوصول إلى سجل المخطط

يمكنك استخدام معرف Microsoft Entra لتخويل تطبيق منتج Kafka والمستهلك للوصول إلى موارد Azure Schema Registry. لتمكينه، تحتاج إلى تسجيل تطبيق العميل الخاص بك مع مستأجر Microsoft Entra من مدخل Microsoft Azure.

لتسجيل تطبيق Microsoft Entra المسمى example-app ، راجع تسجيل التطبيق الخاص بك مع مستأجر Microsoft Entra.

  • tenant.id - تعيين معرف المستأجر للتطبيق
  • client.id - تعيين معرف العميل للتطبيق
  • client.secret - تعيين سر العميل للمصادقة

وإذا كنت تستخدم الهوية المدارة، فستحتاج إلى:

  • use.managed.identity.credential - يشير إلى أنه يجب استخدام بيانات اعتماد MSI، يجب استخدامها للجهاز الظاهري الذي يدعم MSI
  • managed.identity.clientId - إذا تم تحديده، فإنه يبني بيانات اعتماد MSI مع معرف العميل المحدد managed.identity.resourceId - إذا تم تحديده، فإنه ينشئ بيانات اعتماد MSI مع معرف مورد معين

إضافة مستخدم إلى دور قارئ سجل المخطط

أضف حساب المستخدم الخاص بك إلى دور قارئ سجل المخطط على مستوى مساحة الاسم. يمكنك أيضا استخدام دور مساهم سجل المخطط، ولكن هذا ليس ضروريا لهذا التشغيل السريع.

  1. في صفحة Event Hubs Namespace ، حدد Access control (IAM) في القائمة اليسرى.
  2. في صفحة Access control (IAM)، حدد + Add ->Add role assignment في القائمة.
  3. في صفحة نوع الواجب، حدد التالي.
  4. في صفحة Roles ، حدد Schema Registry Reader، ثم حدد Next في أسفل الصفحة.
  5. استخدم الارتباط + Select members لإضافة example-app التطبيق الذي قمت بإنشائه في الخطوة السابقة إلى الدور، ثم حدد Next.
  6. في صفحة Review + assign ، حدد Review + assign.

تحديث تكوين تطبيق العميل لتطبيقات Kafka

تحتاج إلى تحديث تكوين العميل لتطبيقات منتج Kafka والمستهلك مع تفاصيل تطبيق Microsoft Entra ومعلومات سجل المخطط.

لتحديث تكوين منتج Kafka، انتقل إلى azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. قم بتحديث تكوين تطبيق Kafka في src/main/resources/app.properties باتباع دليل البدء السريع ل Kafka لمراكز الأحداث.

  2. قم بتحديث تفاصيل التكوين للمنتج في src/main/resources/app.properties باستخدام التكوين ذي الصلة بسجل المخطط وتطبيق Microsoft Entra الذي قمت بإنشائه في الخطوة السابقة كما يلي:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. اتبع نفس الإرشادات وقم بتحديث تكوين azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer أيضا.

  4. لكل من تطبيقات منتج Kafka والمستهلك، يتم استخدام مخطط JSON التالي:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

استخدام منتج Kafka مع التحقق من صحة مخطط JSON

لتشغيل تطبيق منتج Kafka، انتقل إلى azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. يمكنك تشغيل تطبيق المنتج بحيث يمكنه إنتاج سجلات محددة لمخطط JSON أو سجلات عامة. بالنسبة لوضع سجلات معينة، تحتاج أولا إلى إنشاء الفئات مقابل مخطط المنتج باستخدام أمر maven التالي:

    mvn generate-sources
    
  2. ثم يمكنك تشغيل تطبيق المنتج باستخدام الأوامر التالية.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. عند التنفيذ الناجح لتطبيق المنتج، فإنه يطالبك باختيار سيناريو المنتج. لهذا التشغيل السريع، يمكنك اختيار الخيار 1 - إنتاج SpecificRecords.

    Enter case number:
    1 - produce SpecificRecords
    
  4. عند تسلسل البيانات ونشرها بنجاح، يجب أن تشاهد سجلات وحدة التحكم التالية في تطبيق المنتج الخاص بك:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

استخدام مستهلك Kafka مع التحقق من صحة مخطط JSON

لتشغيل تطبيق مستهلك Kafka، انتقل إلى azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  1. يمكنك تشغيل تطبيق المستهلك بحيث يمكنه استهلاك سجلات محددة لمخطط JSON أو سجلات عامة. بالنسبة لوضع سجلات معينة، تحتاج أولا إلى إنشاء الفئات مقابل مخطط المنتج باستخدام أمر maven التالي:

    mvn generate-sources
    
  2. ثم يمكنك تشغيل تطبيق المستهلك باستخدام الأمر التالي.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. عند التنفيذ الناجح لتطبيق المستهلك، فإنه يطالبك باختيار سيناريو المنتج. لهذا التشغيل السريع، يمكنك اختيار الخيار 1 - استهلاك SpecificRecords.

    Enter case number:
    1 - consume SpecificRecords
    
  4. عند استهلاك البيانات وإلغاء تسلسلها بنجاح، يجب أن تشاهد سجلات وحدة التحكم التالية في تطبيق المنتج الخاص بك:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

تنظيف الموارد

احذف مساحة اسم مراكز الأحداث أو احذف مجموعة الموارد التي تحتوي على مساحة الاسم.