البرنامج التعليمي: تنفيذ نمط التقاط مستودع بيانات لتحديث جدول Databricks Delta
يوضح لك هذا البرنامج التعليمي كيفية التعامل مع الأحداث في حساب تخزين يحتوي على مساحة أسماء هرمية.
سوف تنشئ حلاً صغيراً يتيح للمستخدم ملء جدول Databricks Delta من خلال تحميل ملف قيم مفصولة بفاصلة (csv) يصف أمر المبيعات. سوف تنشئ هذا الحل من خلال توصيل اشتراك "شبكة الأحداث" ودالة Azure ووظيفة في Azure Databricks.
في هذا البرنامج التعليمي، سوف نتعلم:
- إنشاء اشتراك "شبكة الأحداث" الذي يستدعي دالة Azure.
- إنشاء دالة Azure التي تتلقى الإخطار من الحدث، ثم تعمل على تشغيل الوظيفة في Azure Databricks.
- إنشاء وظيفة Databricks التي تُدرج أمر العميل في جدول Databricks Delta الموجود في حساب التخزين.
سوف تنشئ هذا الحل بترتيب عكسي، بدءاً من مساحة عمل Azure Databricks.
المتطلبات الأساسية
إنشاء حساب تخزين يحتوي على مساحة اسم هرمية (Azure Data Lake Storage). يستخدم هذا البرنامج التعليمي حساب تخزين باسم
contosoorders
.التحقق من تعيين دور مساهم بيانات كائن التخزين الثنائي كبير الحجم لحساب المستخدم لديك.
أنشئ كيان خدمة، وأنشئ سر العميل، ثم امنح كيان الخدمة حق الوصول إلى حساب التخزين.
راجع البرنامج التعليمي: الاتصال ب Azure Data Lake Storage (الخطوات من 1 إلى 3). بعد إكمال هذه الخطوات، تأكد من لصق معرف المستأجر ومعرف التطبيق والقيم السرية للعميل في ملف نصي. ستحتاج إليها قريباً.
في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
إنشاء أمر مبيعات
أولاً، إنشاء ملف csv يصف أمر المبيعات، ثم تحميل هذا الملف إلى حساب التخزين. ستستخدم بيانات هذا الملف لاحقاً لملء الصف الأول في جدول Databricks Delta الخاص بنا.
يرجى الانتقال إلى حساب التخزين الجديد في بوابة Azure.
حدد Storage browser-Blob> containers-Add> container وأنشئ حاوية جديدة باسم data.
في حاوية البيانات ، قم بإنشاء دليل يسمى input.
ألصق النص التالي في محرر النص.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
احفظ هذا الملف في جهاز الكمبيوتر المحلي الخاص بك وامنحه الاسم data.csv.
في مستعرض التخزين، قم بتحميل هذا الملف إلى مجلد الإدخال .
إنشاء وظيفة في Azure Databricks
في هذا القسم، ستقوم بتنفيذ هذه المهام:
- أنشئ مساحة عمل Azure Databricks.
- إنشاء دفتر ملاحظات.
- إنشاء جدول Databricks Delta وملؤه.
- إضافة التعليمة البرمجية التي تدرج صفوفاً في جدول Databricks Delta.
- إنشاء وظيفة.
إنشاء مساحة عمل Azure Databricks
في هذا القسم، سوف تنشئ مساحة عمل Azure Databricks باستخدام مدخل Microsoft Azure.
أنشئ مساحة عمل Azure Databricks. قم بتسمية مساحة
contoso-orders
العمل هذه . راجع إنشاء مساحة عمل Azure Databricks.إنشاء نظام مجموعة. قم بتسمية نظام المجموعة
customer-order-cluster
. راجع إنشاء نظام مجموعة.إنشاء دفتر ملاحظات. قم بتسمية دفتر الملاحظات
configure-customer-table
واختر Python كلغة افتراضية لدفتر الملاحظات. راجع إنشاء دفتر ملاحظات.
إنشاء جدول Databricks Delta وملؤه
في دفتر الملاحظات الذي أنشأته، انسخ كتلة التعليمات البرمجية الآتية وألصقها في أول خلية، لكن دون تشغيل هذه التعليمة البرمجية بعد.
استبدل قيم العنصر النائب
appId
وpassword
وtenant
في كتلة التعليمات البرمجية هذه بالقيم التي جمعتها في أثناء إكمال المتطلبات الأساسية لهذا البرنامج التعليمي.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
تنشئ هذه التعليمة البرمجية عنصر واجهة مستخدماً باسم source_file. لاحقاً، ستنشئ دالة Azure التي تستدعي هذه التعليمة البرمجية وتمرر مسار الملف إلى عنصر واجهة المستخدم هذا. كما تصادق هذه التعليمة البرمجية كيان الخدمة الخاص بك باستخدام حساب التخزين، وينشئ بعض المتغيرات التي ستستخدمها في خلايا أخرى.
إشعار
في إعداد "الإنتاج"، ضع في اعتبارك تخزين مفتاح المصادقة في Azure Databricks. ثم أضف مفتاح بحث إلى كتلة التعليمات البرمجية الخاصة بك بدلاً من مفتاح المصادقة.
على سبيل المثال، بدلاً من استخدام سطر التعليمات البرمجية هذا:spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
، سوف تستخدم سطر التعليمات البرمجية الآتي:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
بعد الانتهاء من هذا البرنامج التعليمي، راجع مقالة Azure Data Lake Storage على موقع Azure Databricks على الويب للاطلاع على أمثلة على هذا النهج.اضغط على المفاتيح SHIFT + ENTER لتشغيل التعليمات البرمجية في هذه الكتلة.
انسخ كتلة التعليمات البرمجية الآتية وألصقها في خلية مختلفة، ثم اضغط على مفاتيح SHIFT + ENTER لتشغيل التعليمات البرمجية الموجودة في هذه الكتلة.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
تنشئ هذه التعليمة البرمجية جدول Databricks Delta في حساب التخزين الخاص بك، ثم تحمل بعض البيانات الأولية من ملف csv الذي حملته سابقاً.
بعد تشغيل كتلة التعليمات البرمجية هذه بنجاح، أزلها من دفتر ملاحظاتك.
إضافة التعليمة البرمجية التي تدرج صفوفاً في جدول Databricks Delta
انسخ كتلة التعليمات البرمجية الآتية وألصقها في خلية مختلفة، لكن دون تشغيل هذه التعليمة البرمجية بعد.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
تُدرج هذه التعليمة البرمجية بيانات في عرض جدول مؤقت باستخدام بيانات من ملف csv. المسار إلى ملف csv هذا يأتي من عنصر واجهة مستخدم الإدخال الذي أنشأته في خطوة سابقة.
انسخ كتلة التعليمات البرمجية التالية والصقها في خلية مختلفة. تدمج هذه التعليمات البرمجية محتويات طريقة عرض الجدول المؤقتة مع جدول Databricks Delta.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
إنشاء مهمة
أنشئ وظيفة "شبكة الأحداث" التي تشغل دفتر الملاحظات الذي أنشأته سابقاً. لاحقاً، ستنشئ دالة Azure التي تشغّل هذه الوظيفة عند تنفيذ حدث.
حدد New-Job>.
امنح الوظيفة اسما، واختر دفتر الملاحظات الذي أنشأته والمجموعة. ثم حدد Create لإنشاء المهمة.
إنشاء وظيفة Azure
إنشاء دالة Azure التي تشغّل الوظيفة.
في مساحة عمل Azure Databricks، انقر فوق اسم مستخدم Azure Databricks في الشريط العلوي، ثم من القائمة المنسدلة، حدد إعدادات المستخدم.
في علامة التبويب Access tokens ، حدد Generate new token.
انسخ الرمز المميز الذي يظهر، ثم انقر فوق تم.
في الزاوية العلوية من مساحة عمل Databricks، اختر رمز الأشخاص، ثم اختر إعدادات المستخدم.
حدد الزر Generate new token، ثم حدد الزر Generate.
تأكد من نسخ الرمز المميز إلى مكان آمن. تحتاج دالة Azure إلى هذا الرمز المميز للمصادقة مع Databricks لتتمكن من تشغيل الوظيفة.
من قائمة مدخل Azure أو الصفحة الرئيسية، حدد Create a resource.
في صفحة New، قم باختيار Compute>Function App.
في علامة التبويب Basics في صفحة Create Function App ، اختر مجموعة موارد، ثم قم بتغيير الإعدادات التالية أو التحقق منها:
الإعداد القيمة اسم تطبيق الوظائف ترتيب contosoorder مكدس وقت التشغيل .NET نشر رمز نظام تشغيل Windows نوع الخطة الاستهلاك (دون خادم) حدد مراجعة + إنشاء، ثم حدد إنشاء.
عند اكتمال النشر، حدد Go to resource لفتح صفحة نظرة عامة على Function App.
في مجموعة الإعدادات ، حدد التكوين.
في صفحة إعدادات التطبيق، اختر زر إعداد تطبيق جديد لإضافة كل إعداد.
أضف الإعدادات الآتية:
اسم الإعداد القيمة DBX_INSTANCE منطقة مساحة عمل databricks الخاصة بك. على سبيل المثال: westus2.azuredatabricks.net
DBX_PAT الرمز المميز للوصول الشخصي الذي أنشأته سابقاً. DBX_JOB_ID محدد الوظيفة قيد التشغيل. حدد حفظ لتثبيت هذه الإعدادات.
في مجموعة Functions ، حدد Functions، ثم حدد Create.
اختر مشغل شبكة أحدث Azure.
ثبت الملحق Microsoft.Azure.WebJobs.Extensions.EventGrid حال مطالبتك بذلك. إذا كان عليك تثبيته، فسوف يتعين عليك اختيار مشغل شبكة أحداث Azure مجدداً لإنشاء الدالة.
يظهر جزء دالة جديدة.
في جزء New Function ، قم بتسمية الدالة UpsertOrder، ثم حدد الزر Create .
استبدل محتويات ملف التعليمات البرمجية بهذه التعليمة البرمجية، ثم حدد الزر حفظ :
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
تحلل هذه التعليمة البرمجية المعلومات المتعلقة بحدث التخزين الذي تم تنفيذه، ثم تنشئ رسالة طلب تشمل عنوان url الخاص بالملف الذي شغل الحدث. كجزء من الرسالة، تمرر الدالة قيمة إلى عنصر واجهة المستخدم source_file الذي أنشأته سابقاً. يرسل رمز الدالة الرسالة إلى وظيفة Databricks ويستخدم الرمز المميز الذي حصلت عليه سابقا كمصادقة.
إنشاء اشتراك "شبكة الأحداث"
في هذا القسم، ستنشئ اشتراك "شبكة الأحداث" الذي يستدعي دالة Azure عندما يتم تحميل الملفات إلى حساب التخزين.
حدد Integration، ثم في صفحة Integration ، حدد Event Grid Trigger.
في جزء Edit Trigger ، قم بتسمية الحدث
eventGridEvent
، ثم حدد Create Event subscription.إشعار
يطابق الاسم
eventGridEvent
المعلمة المسماة التي تم تمريرها إلى Azure Function.في علامة التبويب Basics في صفحة Create Event Subscription ، قم بتغيير الإعدادات التالية أو التحقق منها:
الإعداد القيمة الاسم contoso-order-event-subscription نوع الموضوع حساب التخزين مورد المصدر contosoorders اسم موضوع النظام <create any name>
تصفية إلى أنواع الأحداث تم إنشاء Blob وحذف Blob حدد زر إنشاء.
اختبر اشتراك "شبكة الأحداث"
أنشئ ملفاً باسم
customer-order.csv
، وألصق المعلومات الآتية في هذا الملف، ثم احفظه في الكمبيوتر المحلي لديك.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
في "مستكشف التخزين"، حمل هذا الملف إلى مجلد الإدخال في حساب التخزين الخاص بك.
يؤدي تحميل ملف إلى تنفيذ حدث Microsoft.Storage.BlobCreated. تُخطر "شبكة الأحداث" جميع المشتركين بهذا الحدث. في حالتنا، دالة Azure تُمثل المشترك الوحيد. تحلل دالة Azure معلمات الحدث لتحديد الحدث الذي وقع. ثم تُمرر عنوان URL الخاص بالملف إلى وظيفة Databricks. تقرأ وظيفة Databricks الملف وتضيف صفاً إلى جدول Databricks Delta الموجود في حساب التخزين الخاص بك.
للتحقق مما إذا كانت المهمة قد نجحت، اعرض عمليات التشغيل لمهمتك. سترى حالة إكمال. لمزيد من المعلومات حول كيفية عرض عمليات التشغيل لوظيفة، راجع عرض عمليات التشغيل لوظيفة
في خلية مصنّف جديدة، شغل هذا الاستعلام في الخلية لعرض جدول delta المحدث.
%sql select * from customer_data
يعرض الجدول المرتجع أحدث سجل.
لتحديث هذا السجل، أنشئ ملفاً باسم
customer-order-update.csv
، وألصق المعلومات الآتية في هذا الملف، ثم احفظه في الكمبيوتر المحلي لديك.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
يُعد ملف csv هذا مطابقاً تقريباً للملف السابق باستثناء كمية الأوامر التي تم تغييرها من
228
إلى22
.في "مستكشف التخزين"، حمل هذا الملف إلى مجلد الإدخال في حساب التخزين الخاص بك.
شغل
select
الاستعلام مرة أخرى لعرض جدول delta المحدث.%sql select * from customer_data
يعرض الجدول المرتجع السجل المحدث.
تنظيف الموارد
عند انعدام الحاجة إلى مجموعة الموارد، احذفها واحذف جميع الموارد ذات الصلة. للقيام بذلك، حدد مجموعة الموارد لحساب التخزين ثم حدد حذف.