Megosztás a következőn keresztül:


Azure Cosmos DB elérése Apache Cassandra-hoz a Sparkból a YARN-on a HDInsighttal

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Ez a cikk bemutatja, hogyan érheti el az Apache Cassandra-hoz készült Azure Cosmos DB-t a Spark on YARN-ból a HDInsight-Spark segítségével spark-shell. A HDInsight a Microsoft Hortonworks Hadoop PaaS-ja az Azure-ban. Objektumtárolót használ a HDFS-hez, és számos ízben érhető el, beleértve a Sparkot is. Bár ez a cikk a HDInsight-Sparkra vonatkozik, az összes Hadoop-disztribúcióra vonatkozik.

Előfeltételek

Mielőtt hozzákezdene, tekintse át az Apache Cassandra-hoz készült Azure Cosmos DB-hez való csatlakozás alapjait.

A következő előfeltételekre van szüksége:

  • Azure Cosmos DB kiépítése az Apache Cassandra számára. Lásd: Adatbázisfiók létrehozása.

  • HDInsight-Spark-fürt kiépítése. Lásd: Apache Spark-fürt létrehozása az Azure HDInsightban ARM-sablon használatával.

  • API a Cassandra konfigurációhoz a Spark2-ben. A Cassandra Spark-összekötőjéhez a Cassandra kapcsolati adatait inicializálni kell a Spark-környezet részeként. Jupyter-jegyzetfüzet indításakor a Spark-munkamenet és a környezet már inicializálva lesz. Ne állítsa le és ne inicializálja újra a Spark-környezetet, kivéve, ha minden konfiguráció be van állítva a HDInsight alapértelmezett Jupyter-notebook indítási folyamatának részeként. Az egyik megkerülő megoldás a Cassandra-példány részleteinek hozzáadása közvetlenül az Ambari, Spark2 szolgáltatáskonfigurációhoz. Ez a megközelítés fürtönként egyszeri tevékenység, amely Spark2 szolgáltatás újraindítását igényli.

    1. Nyissa meg az Ambari, Spark2 szolgáltatást, és válassza ki a konfigurációkat.

    2. Lépjen az egyéni spark2 alapértelmezett értékre, és adjon hozzá egy új tulajdonságot a következővel, majd indítsa újra a Spark2 szolgáltatást:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Az ellenőrzéshez használható cqlsh . További információ: Csatlakozás az Azure Cosmos DB-hez apache Cassandra-hoz a Sparkból.

Az Apache Cassandra azure Cosmos DB-hez való elérése a Spark Shellből

A Spark Shell teszteléshez és feltáráshoz használható.

  • Indítás spark-shell a fürt Spark-verziójával kompatibilis maven-függőségekkel.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • DDL- és DML-műveletek végrehajtása

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • CRUD-műveletek futtatása

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Azure Cosmos DB elérése Apache Cassandra-hoz Jupyter-jegyzetfüzetekből

A HDInsight-Spark zeppelin és Jupyter notebook-szolgáltatásokkal rendelkezik. Mindkettő webalapú jegyzetfüzet-környezet, amely támogatja a Scalát és a Pythont. A jegyzetfüzetek nagyszerűen használhatók interaktív feltáró elemzésekhez és együttműködéshez, de nem üzemeltetési vagy éles folyamatokhoz.

A következő Jupyter-jegyzetfüzetek feltölthetők a HDInsight Spark-fürtbe, és kész mintákat biztosítanak az Apache Cassandra-hoz készült Azure Cosmos DB használatához. Mindenképpen tekintse át az első jegyzetfüzetet 1.0-ReadMe.ipynb , amely áttekinti a Spark szolgáltatás konfigurációját az Apache Cassandra-hoz készült Azure Cosmos DB-hez való csatlakozáshoz.

Töltse le a jegyzetfüzeteket az azure-cosmos-db-cassandra-api-spark-notebooks-jupyter alatt a gépére.

Feltöltés

A Jupyter indításakor lépjen a Scala webhelyre. Hozzon létre egy könyvtárat, majd töltse fel a jegyzetfüzeteket a könyvtárba. A Feltöltés gomb a jobb felső sarokban található.

Futtatás

Haladjon végig a jegyzetfüzeteken és az egyes jegyzetfüzetcellákon egymás után. Az egyes jegyzetfüzetek tetején található Futtatás gombra kattintva futtassa az összes cellát, vagy az egyes cellákhoz tartozó Shift+Enter billentyűt.

Hozzáférés az Apache Cassandra-hoz készült Azure Cosmos DB-vel a Spark Scala-programból

Az éles környezetben futó automatizált folyamatok esetében a Spark-programok a Spark-submit használatával kerülnek a fürtbe.

Következő lépések