مشاركة عبر


الوصول إلى Azure Cosmos DB ل Apache Cassandra من Spark على YARN باستخدام HDInsight

ينطبق على: كاساندرا

تتناول هذه المقالة كيفية الوصول إلى Azure Cosmos DB ل Apache Cassandra من Spark على YARN باستخدام HDInsight-Spark من spark-shell. HDInsight هو Hortonworks Hadoop PaaS من Microsoft على Azure. ويستخدم تخزين الكائنات لـ HDFS ويأتي في العديد من الصفات، بما في ذلك Spark. بينما تشير هذه المقالة إلى HDInsight-Spark، فإنها تنطبق على جميع توزيعات Hadoop.

المتطلبات الأساسية

قبل البدء، راجع أساسيات الاتصال ب Azure Cosmos DB ل Apache Cassandra.

تحتاج إلى المتطلبات الأساسية التالية:

  • توفير Azure Cosmos DB ل Apache Cassandra. راجع إنشاء حساب قاعدة بيانات.

  • توفير نظام مجموعة HDInsight-Spark. راجع إنشاء نظام مجموعة Apache Spark في Azure HDInsight باستخدام قالب ARM.

  • واجهة برمجة التطبيقات لتكوين Cassandra في Spark2. يتطلب موصل Spark لـ Cassandra تكوين تفاصيل اتصال Cassandra المراد تهيئتها كجزء من سياق Spark. عند تشغيل دفتر ملاحظات Jupyter، تتم تهيئة جلسة Spark والسياق بالفعل. لا تتوقف وتعيد تهيئة سياق Spark ما لم يكتمل مع كل تكوين تم تعيينه كجزء من بدء تشغيل دفتر ملاحظات Jupyter الافتراضي لـ HDInsight. أحد الحلول هو إضافة تفاصيل مثيل Cassandra إلى تكوين خدمة Ambari، وSpark2 مباشرة. هذا النهج هو نشاط لمرة واحدة لكل نظام مجموعة يتطلب إعادة تشغيل خدمة Spark2.

    1. انتقل إلى خدمة Ambari وSpark2 وحدد التكوينات.

    2. انتقل إلى إعدادات spark2 الافتراضية المخصصة وأضف خاصية جديدة بما يلي، وأعد تشغيل خدمة Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

يمكنك استخدام cqlsh للتحقق من الصحة. لمزيد من المعلومات، راجع الاتصال ب Azure Cosmos DB ل Apache Cassandra من Spark.

الوصول إلى Azure Cosmos DB ل Apache Cassandra من Spark shell

تُستخدم Spark shell للاختبار والاستكشاف.

  • شغّل spark-shell مع التبعيات المخضرمة المطلوبة المتوافقة مع إصدار Spark لنظام المجموعة لديك.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • قم بتنفيذ بعض عمليات DDL وDML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • قم بتشغيل عمليات CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

الوصول إلى Azure Cosmos DB ل Apache Cassandra من دفاتر ملاحظات Jupyter

يأتي HDInsight-Spark مزودًا بخدمات أجهزة الكمبيوتر المحمولة Zeppelin وJupyter. كلاهما من بيئات دفاتر الملاحظات المستندة إلى الويب والتي تدعم Scala وPython. تعتبر دفاتر الملاحظات رائعة للتحليلات الاستكشافية التفاعلية والتعاون، ولكنها ليست مخصصة للعمليات التشغيلية أو الإنتاجية.

يمكن تحميل دفاتر ملاحظات Jupyter التالية إلى مجموعة HDInsight Spark وتوفير عينات جاهزة للعمل مع Azure Cosmos DB ل Apache Cassandra. تأكد من مراجعة دفتر الملاحظات 1.0-ReadMe.ipynb الأول لمراجعة تكوين خدمة Spark للاتصال ب Azure Cosmos DB ل Apache Cassandra.

قم بتنزيل دفاتر الملاحظات ضمن azure-cosmos-db-cassandra-api-spark-notebooks-jupyter على جهازك.

كيفية التحميل

عندما تقوم بتشغيل Jupyter، انتقل إلى Scala. قم بإنشاء دليل ثم قم بتحميل دفاتر الملاحظات إلى الدليل. يوجد زر التحميل في أعلى الجانب الأيسر.

كيفية التشغيل

انتقل من خلال دفاتر الملاحظات، وكل خلية دفتر ملاحظات بالتسلسل. حدد زر التشغيل أعلى كل دفتر ملاحظات لتشغيل جميع الخلايا، أو Shift+Enter لكل خلية.

الوصول باستخدام Azure Cosmos DB ل Apache Cassandra من برنامج Spark Scala

بالنسبة للعمليات الآلية في الإنتاج؛ يتم إرسال برامج Spark إلى نظام المجموعة باستخدام spark-submit.

الخطوات التالية