Akses Azure Cosmos DB untuk Apache Cassandra dari Spark di YARN dengan HDInsight
BERLAKU UNTUK: Cassandra
Artikel ini membahas cara mengakses Azure Cosmos DB untuk Apache Cassandra dari Spark di YARN dengan HDInsight-Spark dari spark-shell
. HDInsight adalah Hortonworks Hadoop PaaS Microsoft di Azure. Ia menggunakan penyimpanan objek untuk HDFS dan hadir dalam beberapa selera, termasuk Spark. Meskipun mengacu pada HDInsight-Spark, artikel ini berlaku untuk semua distribusi Hadoop.
Prasyarat
Sebelum memulai, tinjau dasar-dasar menyambungkan ke Azure Cosmos DB untuk Apache Cassandra.
Anda memerlukan prasyarat berikut:
Provisikan Azure Cosmos DB untuk Apache Cassandra. Lihat Buat akun database.
Memprovisikan kluster HDInsight-Spark. Lihat Mulai Cepat: Membuat kluster Apache Spark di Azure HDInsight menggunakan templat ARM.
API untuk konfigurasi Cassandra di Spark2. Konektor Spark untuk Cassandra memerlukan detail koneksi Cassandra untuk diinisialisasi sebagai bagian dari konteks Spark. Saat Anda meluncurkan notebook Jupyter, sesi dan konteks spark sudah diinisialisasi. Jangan menghentikan dan menginisialisasi ulang konteks Spark kecuali jika sudah selesai dengan setiap konfigurasi yang ditetapkan sebagai bagian dari pengaktifan notebook Jupyter default HDInsight. Salah satu solusinya adalah menambahkan secara langsung detail instans Cassandra ke Ambari, konfigurasi layanan Spark2. Pendekatan ini adalah aktivitas satu kali per kluster yang memerlukan layanan Spark2 dihidupkan ulang.
Buka layanan Ambari, Spark2, dan pilih konfigurasi.
Buka spark2-default kustom dan tambahkan properti baru dengan yang berikut ini, lalu mulai ulang layanan Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Anda dapat menggunakan cqlsh
untuk validasi. Untuk informasi selengkapnya, lihat Menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari Spark.
Mengakses Azure Cosmos DB untuk Apache Cassandra dari shell Spark
Spark shell digunakan untuk pengujian dan eksplorasi.
Luncurkan
spark-shell
dengan dependensi maven yang diperlukan yang kompatibel dengan versi Spark kluster Anda.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Menjalankan beberapa operasi DDL dan DML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Menjalankan operasi CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Mengakses Azure Cosmos DB untuk Apache Cassandra dari notebook Jupyter
HDInsight-Spark dilengkapi dengan layanan notebook Zeppelin dan Jupyter. Keduanya merupakan lingkungan notebook berbasis web yang mendukung Scala dan Python. Notebook sangat bagus untuk analitik dan kolaborasi eksploratif interaktif, tetapi tidak dimaksudkan untuk proses operasional atau produksi.
Notebook Jupyter berikut dapat diunggah ke kluster HDInsight Spark Anda dan menyediakan sampel siap untuk bekerja dengan Azure Cosmos DB untuk Apache Cassandra. Pastikan untuk meninjau notebook 1.0-ReadMe.ipynb
pertama yang meninjau konfigurasi layanan Spark untuk menyambungkan ke Azure Cosmos DB untuk Apache Cassandra.
Unduh notebook ini di bawah azure-cosmos-db-cassandra-api-spark-notebooks-jupyter ke komputer Anda.
Cara mengunggah
Saat Anda meluncurkan Jupyter, arahkan ke Scala. Buat direktori lalu unggah notebook ke direktori tersebut. Tombol Unggah ada di bagian atas, di sisi kanan.
Cara menjalankan
Buka notebook, dan setiap sel notebook secara berurutan. Pilih tombol Jalankan di bagian atas setiap notebook untuk menjalankan semua sel, atau tekan Shift+Enteruntuk setiap sel.
Akses dengan Azure Cosmos DB untuk Apache Cassandra dari program Spark Scala Anda
Untuk proses otomatis dalam produksi, program Spark diserahkan ke kluster dengan menggunakan spark-submit.