تفاعل مع مجموعات Apache Kafka في Azure HDInsight باستخدام وكيل REST
يتيح لك Kafka REST Proxy التفاعل مع نظام مجموعة Kafka عبر واجهة برمجة تطبيقات REST عبر HTTPS. يعني هذا الإجراء أن عملاء Kafka الخاصين بك يمكن أن يكونوا خارج شبكتك الافتراضية. يمكن للعملاء إجراء مكالمات HTTPS بسيطة وآمنة إلى مجموعة Kafka بدلاً من الاعتماد على مكتبات Kafka. توضح هذه المقالة كيفية إنشاء نظام مجموعة Kafka ممكن لوكيل REST. يوفر أيضاً نموذج رمز يوضح كيفية إجراء مكالمات إلى وكيل REST.
مرجع REST API
للعمليات التي تدعمها واجهة برمجة تطبيقات Kafka REST API، راجع مرجع واجهة برمجة تطبيقات وكيل HDInsight Kafka REST .
خلفية
للحصول على المواصفات الكاملة للعمليات التي تدعمها واجهة برمجة التطبيقات، راجع Apache Kafka REST Proxy API .
نقطة نهاية وكيل REST
يؤدي إنشاء مجموعة HDInsight Kafka باستخدام وكيل REST إلى إنشاء نقطة نهاية عامة جديدة لمجموعتك، والتي يمكنك العثور عليها في مجموعة HDInsight الخصائص على مدخل Microsoft Azure.
Security
الوصول إلى وكيل Kafka REST المدار مع مجموعات أمان Microsoft Entra. عند إنشاء نظام مجموعة Kafka، قم بتوفير مجموعة أمان Microsoft Entra مع الوصول إلى نقطة نهاية REST. يجب أن يتم تسجيل عملاء Kafka الذين يحتاجون إلى الوصول إلى وكيل REST في هذه المجموعة بواسطة مالك المجموعة. يمكن لمالك المجموعة التسجيل عبر البوابة أو عبر PowerShell.
بالنسبة لطلبات نقطة نهاية وكيل REST، يجب أن تحصل تطبيقات العميل على رمز OAuth المميز. يستخدم الرمز المميز للتحقق من عضوية مجموعة الأمان. البحث عن نموذج تطبيق العميل يوضح كيفية الحصول على رمز OAuth المميز. يمرر تطبيق العميل رمز OAuth المميز في طلب HTTPS إلى وكيل REST.
إشعار
راجع إدارة الوصول إلى التطبيقات والموارد باستخدام مجموعات Microsoft Entra، لمعرفة المزيد حول مجموعات أمان Microsoft Entra. لمزيد من المعلومات حول كيفية عمل رموز OAuth المميزة، راجع تخويل الوصول إلى تطبيقات الويب Microsoft Entra باستخدام تدفق منح التعليمات البرمجية OAuth 2.0.
وكيل Kafka REST مع مجموعات أمان الشبكة
إذا قمت بإحضار VNet الخاص بك والتحكم في حركة مرور الشبكة باستخدام مجموعات أمان الشبكة، فقم بالسماح بحركة المرور الواردة على المنفذ 9400 بالإضافة إلى المنفذ 443. وهذا يضمن إمكانية الوصول إلى خادم وكيل Kafka REST.
المتطلبات الأساسية
تسجيل تطبيق باستخدام معرف Microsoft Entra. تستخدم تطبيقات العميل التي تكتبها للتفاعل مع وكيل Kafka REST معرف التطبيق والسر للمصادقة على Azure.
إنشاء مجموعة أمان Microsoft Entra. أضف التطبيق الذي قمت بتسجيله باستخدام معرف Microsoft Entra إلى مجموعة الأمان كعضو في المجموعة. سيتم استخدام مجموعة الأمان هذه للتحكم في التطبيقات التي تسمح بالتفاعل مع وكيل REST. لمزيد من المعلومات حول إنشاء مجموعات Microsoft Entra، راجع إنشاء مجموعة أساسية وإضافة أعضاء باستخدام معرف Microsoft Entra.
تحقق من أن المجموعة من النوع الأمان .
تحقق من أن التطبيق عضو في المجموعة.
إنشاء كتلة Kafka مع تمكين وكيل REST
تستخدم الخطوات مدخل Microsoft Azure. للحصول على مثال باستخدام Azure CLI، راجع إنشاء مجموعة وكيل Apache Kafka REST باستخدام Azure CLI .
أثناء سير عمل إنشاء مجموعة Kafka، في علامة التبويب الأمان + الشبكات ، حدد الخيار تمكين خادم وكيل Kafka REST.
انقر فوق تحديد مجموعة الأمان . من قائمة مجموعات الأمان، حدد مجموعة الأمان التي تريد أن يكون لها وصول إلى وكيل REST. يمكنك استخدام مربع البحث للعثور على مجموعة الأمان المناسبة. انقر فوق الزر تحديد في الجزء السفلي.
أكمل الخطوات المتبقية لإنشاء المجموعة الخاصة بك كما هو موضح في إنشاء مجموعة Apache Kafka في Azure HDInsight باستخدام مدخل Microsoft Azure .
بمجرد إنشاء الكتلة، انتقل إلى خصائص الكتلة لتسجيل عنوان URL لوكيل Kafka REST.
نموذج تطبيق العميل
يمكنك استخدام التعليمات البرمجية ل Python للتفاعل مع وكيل REST على نظام مجموعة Kafka. لاستخدام نموذج التعليمات البرمجية، اتبع الخطوات التالية:
احفظ نموذج التعليمات البرمجية على جهاز مثبت عليه Python.
ثبت تبعيات Python المطلوبة بتنفيذ
pip3 install msal
.قم بتعديل قسم الرمز قم بتكوين هذه الخصائص وقم بتحديث الخصائص التالية لبيئتك:
الخاصية الوصف معرف المستأجر مستأجر Azure حيث يوجد اشتراكك. معرف العميل معرّف التطبيق الذي قمت بتسجيله في مجموعة الأمان. سر العميل سر التطبيق الذي قمت بتسجيله في مجموعة الأمان. بتسجيله في مجموعة الأمان. Kafkarest_endpoint احصل على هذه القيمة من علامة التبويب الخصائص في نظرة عامة على المجموعة كما هو موضح في قسم النشر . يجب أن يكون بالتنسيق التالي - https://<clustername>-kafkarest.azurehdinsight.net
من سطر الأوامر، قم بتنفيذ ملف python بتنفيذ
sudo python3 <filename.py>
يقوم هذا الرمز بالإجراء التالي:
- إحضار رمز OAuth المميز من معرف Microsoft Entra.
- يوضح كيفية تقديم طلب إلى وكيلKafka REST.
لمزيد من المعلومات حول الحصول على رموز OAuth المميزة في Python، راجع فئة Python AuthenticationContext . قد ترى تأخيرا أثناء topics
عدم إنشاء ذلك أو حذفه من خلال وكيل Kafka REST تنعكس هناك. هذا التأخير بسبب تحديث ذاكرة التخزين المؤقت. تم تحسين حقل القيمة لـ Producer API. بات يقبل الآن عناصر JSON وأي شكل متسلسل.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
ابحث أدناه عن نموذج آخر حول كيفية الحصول على رمز مميز من وكيل Azure لـ REST باستخدام أمر curl. لاحظ أننا نحتاج إلى scope=https://hib.azurehdinsight.net/.default
المحدد أثناء الحصول على رمز.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'