Azure Cosmos DB elérése Apache Cassandra-hoz a Sparkból a YARN-on a HDInsighttal
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Ez a cikk bemutatja, hogyan érheti el az Apache Cassandra-hoz készült Azure Cosmos DB-t a Spark on YARN-ból a HDInsight-Spark segítségével spark-shell
. A HDInsight a Microsoft Hortonworks Hadoop PaaS-ja az Azure-ban. Objektumtárolót használ a HDFS-hez, és számos ízben érhető el, beleértve a Sparkot is. Bár ez a cikk a HDInsight-Sparkra vonatkozik, az összes Hadoop-disztribúcióra vonatkozik.
Előfeltételek
Mielőtt hozzákezdene, tekintse át az Apache Cassandra-hoz készült Azure Cosmos DB-hez való csatlakozás alapjait.
A következő előfeltételekre van szüksége:
Azure Cosmos DB kiépítése az Apache Cassandra számára. Lásd: Adatbázisfiók létrehozása.
HDInsight-Spark-fürt kiépítése. Lásd: Apache Spark-fürt létrehozása az Azure HDInsightban ARM-sablon használatával.
API a Cassandra konfigurációhoz a Spark2-ben. A Cassandra Spark-összekötőjéhez a Cassandra kapcsolati adatait inicializálni kell a Spark-környezet részeként. Jupyter-jegyzetfüzet indításakor a Spark-munkamenet és a környezet már inicializálva lesz. Ne állítsa le és ne inicializálja újra a Spark-környezetet, kivéve, ha minden konfiguráció be van állítva a HDInsight alapértelmezett Jupyter-notebook indítási folyamatának részeként. Az egyik megkerülő megoldás a Cassandra-példány részleteinek hozzáadása közvetlenül az Ambari, Spark2 szolgáltatáskonfigurációhoz. Ez a megközelítés fürtönként egyszeri tevékenység, amely Spark2 szolgáltatás újraindítását igényli.
Nyissa meg az Ambari, Spark2 szolgáltatást, és válassza ki a konfigurációkat.
Lépjen az egyéni spark2 alapértelmezett értékre, és adjon hozzá egy új tulajdonságot a következővel, majd indítsa újra a Spark2 szolgáltatást:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Az ellenőrzéshez használható cqlsh
. További információ: Csatlakozás az Azure Cosmos DB-hez apache Cassandra-hoz a Sparkból.
Az Apache Cassandra azure Cosmos DB-hez való elérése a Spark Shellből
A Spark Shell teszteléshez és feltáráshoz használható.
Indítás
spark-shell
a fürt Spark-verziójával kompatibilis maven-függőségekkel.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
DDL- és DML-műveletek végrehajtása
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
CRUD-műveletek futtatása
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Azure Cosmos DB elérése Apache Cassandra-hoz Jupyter-jegyzetfüzetekből
A HDInsight-Spark zeppelin és Jupyter notebook-szolgáltatásokkal rendelkezik. Mindkettő webalapú jegyzetfüzet-környezet, amely támogatja a Scalát és a Pythont. A jegyzetfüzetek nagyszerűen használhatók interaktív feltáró elemzésekhez és együttműködéshez, de nem üzemeltetési vagy éles folyamatokhoz.
A következő Jupyter-jegyzetfüzetek feltölthetők a HDInsight Spark-fürtbe, és kész mintákat biztosítanak az Apache Cassandra-hoz készült Azure Cosmos DB használatához. Mindenképpen tekintse át az első jegyzetfüzetet 1.0-ReadMe.ipynb
, amely áttekinti a Spark szolgáltatás konfigurációját az Apache Cassandra-hoz készült Azure Cosmos DB-hez való csatlakozáshoz.
Töltse le a jegyzetfüzeteket az azure-cosmos-db-cassandra-api-spark-notebooks-jupyter alatt a gépére.
Feltöltés
A Jupyter indításakor lépjen a Scala webhelyre. Hozzon létre egy könyvtárat, majd töltse fel a jegyzetfüzeteket a könyvtárba. A Feltöltés gomb a jobb felső sarokban található.
Futtatás
Haladjon végig a jegyzetfüzeteken és az egyes jegyzetfüzetcellákon egymás után. Az egyes jegyzetfüzetek tetején található Futtatás gombra kattintva futtassa az összes cellát, vagy az egyes cellákhoz tartozó Shift+Enter billentyűt.
Hozzáférés az Apache Cassandra-hoz készült Azure Cosmos DB-vel a Spark Scala-programból
Az éles környezetben futó automatizált folyamatok esetében a Spark-programok a Spark-submit használatával kerülnek a fürtbe.