البرنامج التعليمي: الاتصال ب Azure Cosmos DB ل NoSQL باستخدام Spark
ينطبق على: NoSQL
في هذا البرنامج التعليمي، يمكنك استخدام موصل Azure Cosmos DB Spark لقراءة البيانات أو كتابتها من حساب Azure Cosmos DB لحساب NoSQL. يستخدم هذا البرنامج التعليمي Azure Databricks ودفتر ملاحظات Jupyter لتوضيح كيفية التكامل مع واجهة برمجة التطبيقات ل NoSQL من Spark. يركز هذا البرنامج التعليمي على Python وSc scala، على الرغم من أنه يمكنك استخدام أي لغة أو واجهة يدعمها Spark.
في هذا البرنامج التعليمي، تتعلم كيفية:
- الاتصال بواجهة برمجة تطبيقات لحساب NoSQL باستخدام Spark ودفتر ملاحظات Jupyter.
- إنشاء موارد حاويات وقواعد بيانات.
- استيعاب البيانات إلى الحاوية.
- الاستعلام عن البيانات في الحاوية.
- تنفيذ العمليات الشائعة على العناصر الموجودة في الحاوية.
المتطلبات الأساسية
- حساب Azure Cosmos DB ل NoSQL موجود.
- إذا كان لديك اشتراك Azure موجود، فبادر بإنشاء حساب جديد.
- لا يوجد اشتراك في Azure؟ يمكنك تجربة Azure Cosmos DB مجانا دون الحاجة إلى بطاقة ائتمان.
- مساحة عمل Azure Databricks موجودة.
الاتصال باستخدام Spark وJupyter
استخدم مساحة عمل Azure Databricks الحالية لإنشاء مجموعة حوسبة جاهزة لاستخدام Apache Spark 3.4.x للاتصال بحساب Azure Cosmos DB الخاص بك ل NoSQL.
افتح مساحة عمل Azure Databricks.
في واجهة مساحة العمل، قم بإنشاء نظام مجموعة جديد. قم بتكوين نظام المجموعة باستخدام هذه الإعدادات، كحد أدنى:
إصدار القيمة إصدار وقت التشغيل 13.3 LTS (Scala 2.12، Spark 3.4.1) استخدم واجهة مساحة العمل للبحث عن حزم Maven من Maven Central مع معرف مجموعة .
com.azure.cosmos.spark
قم بتثبيت الحزمة خصيصا ل Spark 3.4 مع معرف Artifact مسبوق بالمجموعةazure-cosmos-spark_3-4
.وأخيرا، قم بإنشاء دفتر ملاحظات جديد.
تلميح
بشكل افتراضي، يتم إرفاق دفتر الملاحظات بالمجموعة التي تم إنشاؤها مؤخرا.
ضمن دفتر الملاحظات، قم بتعيين إعدادات تكوين معالجة المعاملات عبر الإنترنت (OLTP) لنقطة نهاية حساب NoSQL واسم قاعدة البيانات واسم الحاوية.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
إنشاء قاعدة بيانات وحاوية
استخدم واجهة برمجة تطبيقات الكتالوج لإدارة موارد الحساب مثل قواعد البيانات والحاويات. بعد ذلك، يمكنك استخدام OLTP لإدارة البيانات داخل موارد الحاوية.
تكوين واجهة برمجة تطبيقات الكتالوج لإدارة واجهة برمجة التطبيقات لموارد NoSQL باستخدام Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
إنشاء قاعدة بيانات جديدة باسم
cosmicworks
باستخدامCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
إنشاء حاوية جديدة باسم
products
باستخدامCREATE TABLE IF NOT EXISTS
. تأكد من تعيين مسار مفتاح القسم إلى/category
وتمكين معدل نقل التحجيم التلقائي مع الحد الأقصى لمعدل نقل1000
وحدات الطلب (RUs) في الثانية.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
إنشاء حاوية أخرى باسم
employees
باستخدام تكوين مفتاح قسم هرمي. استخدم/organization
و/department
و/team
كمسارات مفتاح القسم. اتبع هذا الترتيب المحدد. أيضا، قم بتعيين معدل النقل إلى كمية يدوية من400
وحدات الطلب.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
قم بتشغيل خلايا دفتر الملاحظات للتحقق من إنشاء قاعدة البيانات والحاويات داخل واجهة برمجة التطبيقات لحساب NoSQL.
استيعاب البيانات
إنشاء عينة مجموعة بيانات. ثم استخدم OLTP لاستيعاب تلك البيانات إلى واجهة برمجة التطبيقات لحاوية NoSQL.
إنشاء عينة مجموعة بيانات.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
استخدم
spark.createDataFrame
وتكوين OLTP المحفوظ مسبقا لإضافة بيانات نموذجية إلى الحاوية الهدف.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
بيانات الاستعلام
تحميل بيانات OLTP في إطار بيانات لإجراء استعلامات شائعة على البيانات. يمكنك استخدام بناء الجمل المختلفة لتصفية البيانات أو الاستعلام عن البيانات.
استخدم
spark.read
لتحميل بيانات OLTP في كائن إطار بيانات. استخدم نفس التكوين الذي استخدمته سابقا في هذا البرنامج التعليمي. أيضا، قم بتعيينspark.cosmos.read.inferSchema.enabled
إلىtrue
للسماح لموصل Spark باستنتاج المخطط عن طريق أخذ عينات من العناصر الموجودة.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
عرض مخطط البيانات المحملة في إطار البيانات باستخدام
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
عرض صفوف البيانات حيث
quantity
يكون العمود أقل من20
.where
استخدم الدالتين وshow
لتنفيذ هذا الاستعلام.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
عرض صف البيانات الأول حيث
clearance
يكونtrue
العمود . استخدم الدالةfilter
لتنفيذ هذا الاستعلام.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
عرض خمسة صفوف من البيانات بدون عامل تصفية أو اقتطاع. استخدم الدالة
show
لتخصيص مظهر وعدد الصفوف التي يتم عرضها.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
استعلم عن بياناتك باستخدام سلسلة استعلام NoSQL الأولية هذه:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
تنفيذ العمليات الشائعة
عند العمل مع واجهة برمجة التطبيقات لبيانات NoSQL في Spark، يمكنك إجراء تحديثات جزئية أو العمل مع البيانات ك JSON أولي.
لإجراء تحديث جزئي لعنصر:
انسخ متغير التكوين الموجود
config
وعدل الخصائص في النسخة الجديدة. على وجه التحديد، قم بتكوين استراتيجية الكتابة إلىItemPatch
. ثم قم بتعطيل الدعم المجمع. تعيين الأعمدة والعمليات المعينة. وأخيرا، قم بتعيين نوع العملية الافتراضية إلىSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
إنشاء متغيرات لمفتاح قسم العنصر والمعرف الفريد الذي تنوي استهدافه كجزء من عملية التصحيح هذه.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
إنشاء مجموعة من كائنات التصحيح لتحديد العنصر الهدف وتحديد الحقول التي يجب تعديلها.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
إنشاء إطار بيانات باستخدام مجموعة من كائنات التصحيح. استخدم
write
لتنفيذ عملية التصحيح.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
قم بتشغيل استعلام لمراجعة نتائج عملية التصحيح. يجب الآن تسمية
Yamba New Surfboard
العنصر دون أي تغييرات أخرى.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
للعمل مع بيانات JSON الأولية:
انسخ متغير التكوين الموجود
config
وعدل الخصائص في النسخة الجديدة. على وجه التحديد، قم بتغيير الحاوية الهدف إلىemployees
. ثم قم بتكوينcontacts
العمود/الحقل لاستخدام بيانات JSON الأولية.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
إنشاء مجموعة من الموظفين لاستيعابها في الحاوية.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
إنشاء إطار بيانات واستخدامه
write
لاستيعاب بيانات الموظف.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
عرض البيانات من إطار البيانات باستخدام
show
. لاحظ أنcontacts
العمود هو JSON الخام في الإخراج.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
المحتوى ذو الصلة
- Apache Spark
- واجهة برمجة تطبيقات كتالوج Azure Cosmos DB
- مرجع معلمة التكوين
- عينات موصل Azure Cosmos DB Spark
- الترحيل من Spark 2.4 إلى Spark 3.*
- توافق الإصدار:
- ملاحظات الإصدار:
- تنزيل الارتباطات: