الوصول إلى Azure Cosmos DB ل Apache Cassandra من Spark على YARN باستخدام HDInsight
ينطبق على: كاساندرا
تتناول هذه المقالة كيفية الوصول إلى Azure Cosmos DB ل Apache Cassandra من Spark على YARN باستخدام HDInsight-Spark من spark-shell
. HDInsight هو Hortonworks Hadoop PaaS من Microsoft على Azure. ويستخدم تخزين الكائنات لـ HDFS ويأتي في العديد من الصفات، بما في ذلك Spark. بينما تشير هذه المقالة إلى HDInsight-Spark، فإنها تنطبق على جميع توزيعات Hadoop.
المتطلبات الأساسية
قبل البدء، راجع أساسيات الاتصال ب Azure Cosmos DB ل Apache Cassandra.
تحتاج إلى المتطلبات الأساسية التالية:
توفير Azure Cosmos DB ل Apache Cassandra. راجع إنشاء حساب قاعدة بيانات.
توفير نظام مجموعة HDInsight-Spark. راجع إنشاء نظام مجموعة Apache Spark في Azure HDInsight باستخدام قالب ARM.
واجهة برمجة التطبيقات لتكوين Cassandra في Spark2. يتطلب موصل Spark لـ Cassandra تكوين تفاصيل اتصال Cassandra المراد تهيئتها كجزء من سياق Spark. عند تشغيل دفتر ملاحظات Jupyter، تتم تهيئة جلسة Spark والسياق بالفعل. لا تتوقف وتعيد تهيئة سياق Spark ما لم يكتمل مع كل تكوين تم تعيينه كجزء من بدء تشغيل دفتر ملاحظات Jupyter الافتراضي لـ HDInsight. أحد الحلول هو إضافة تفاصيل مثيل Cassandra إلى تكوين خدمة Ambari، وSpark2 مباشرة. هذا النهج هو نشاط لمرة واحدة لكل نظام مجموعة يتطلب إعادة تشغيل خدمة Spark2.
انتقل إلى خدمة Ambari وSpark2 وحدد التكوينات.
انتقل إلى إعدادات spark2 الافتراضية المخصصة وأضف خاصية جديدة بما يلي، وأعد تشغيل خدمة Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
يمكنك استخدام cqlsh
للتحقق من الصحة. لمزيد من المعلومات، راجع الاتصال ب Azure Cosmos DB ل Apache Cassandra من Spark.
الوصول إلى Azure Cosmos DB ل Apache Cassandra من Spark shell
تُستخدم Spark shell للاختبار والاستكشاف.
شغّل
spark-shell
مع التبعيات المخضرمة المطلوبة المتوافقة مع إصدار Spark لنظام المجموعة لديك.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
قم بتنفيذ بعض عمليات DDL وDML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
قم بتشغيل عمليات CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
الوصول إلى Azure Cosmos DB ل Apache Cassandra من دفاتر ملاحظات Jupyter
يأتي HDInsight-Spark مزودًا بخدمات أجهزة الكمبيوتر المحمولة Zeppelin وJupyter. كلاهما من بيئات دفاتر الملاحظات المستندة إلى الويب والتي تدعم Scala وPython. تعتبر دفاتر الملاحظات رائعة للتحليلات الاستكشافية التفاعلية والتعاون، ولكنها ليست مخصصة للعمليات التشغيلية أو الإنتاجية.
يمكن تحميل دفاتر ملاحظات Jupyter التالية إلى مجموعة HDInsight Spark وتوفير عينات جاهزة للعمل مع Azure Cosmos DB ل Apache Cassandra. تأكد من مراجعة دفتر الملاحظات 1.0-ReadMe.ipynb
الأول لمراجعة تكوين خدمة Spark للاتصال ب Azure Cosmos DB ل Apache Cassandra.
قم بتنزيل دفاتر الملاحظات ضمن azure-cosmos-db-cassandra-api-spark-notebooks-jupyter على جهازك.
كيفية التحميل
عندما تقوم بتشغيل Jupyter، انتقل إلى Scala. قم بإنشاء دليل ثم قم بتحميل دفاتر الملاحظات إلى الدليل. يوجد زر التحميل في أعلى الجانب الأيسر.
كيفية التشغيل
انتقل من خلال دفاتر الملاحظات، وكل خلية دفتر ملاحظات بالتسلسل. حدد زر التشغيل أعلى كل دفتر ملاحظات لتشغيل جميع الخلايا، أو Shift+Enter لكل خلية.
الوصول باستخدام Azure Cosmos DB ل Apache Cassandra من برنامج Spark Scala
بالنسبة للعمليات الآلية في الإنتاج؛ يتم إرسال برامج Spark إلى نظام المجموعة باستخدام spark-submit.