Oktatóanyag: Csatlakozás az Azure Cosmos DB for NoSQL-hez a Spark használatával
A KÖVETKEZŐRE VONATKOZIK: NoSQL
Ebben az oktatóanyagban az Azure Cosmos DB Spark-összekötő használatával olvashat vagy írhat adatokat egy Azure Cosmos DB for NoSQL-fiókból. Ez az oktatóanyag az Azure Databricks és egy Jupyter notebook használatával mutatja be, hogyan integrálható a NoSQL-hez készült API-val a Sparkból. Ez az oktatóanyag a Pythonra és a Scalára összpontosít, bár a Spark által támogatott nyelveket vagy felületeket használhatja.
Ebben az oktatóanyagban az alábbiakkal fog megismerkedni:
- Csatlakozzon egy API for NoSQL-fiókhoz a Spark és egy Jupyter notebook használatával.
- Adatbázis- és tárolóerőforrások létrehozása.
- Adatok betöltése a tárolóba.
- Adatok lekérdezése a tárolóban.
- Gyakori műveletek végrehajtása a tároló elemein.
Előfeltételek
- Egy meglévő Azure Cosmos DB for NoSQL-fiók.
- Ha már rendelkezik Azure-előfizetéssel, hozzon létre egy új fiókot.
- Nincs Azure-előfizetés? Ingyenesen kipróbálhatja az Azure Cosmos DB-t hitelkártya nélkül.
- Egy meglévő Azure Databricks-munkaterület.
Csatlakozás a Spark és a Jupyter használatával
A meglévő Azure Databricks-munkaterület használatával hozzon létre egy számítási fürtöt, amely készen áll az Apache Spark 3.4.x használatára az Azure Cosmos DB for NoSQL-fiókhoz való csatlakozáshoz.
Nyissa meg az Azure Databricks-munkaterületet.
A munkaterület felületén hozzon létre egy új fürtöt. Konfigurálja a fürtöt az alábbi beállításokkal, legalább:
Verzió Érték Futtatókörnyezet verziója 13.3 LTS (Scala 2.12, Spark 3.4.1) A munkaterületi felületen maven-csomagokat kereshet a Maven Centralból a csoportazonosítóval
com.azure.cosmos.spark
. Telepítse a csomagot kifejezetten a Spark 3.4-hez a fürthöz előtaggal ellátott Artifact ID azonosítóvalazure-cosmos-spark_3-4
.Végül hozzon létre egy új jegyzetfüzetet.
Tipp.
Alapértelmezés szerint a jegyzetfüzet a nemrég létrehozott fürthöz van csatolva.
A jegyzetfüzeten belül állítsa be az online tranzakciófeldolgozási (OLTP) konfigurációs beállításokat a NoSQL-fiókvégponthoz, az adatbázis nevéhez és a tároló nevéhez.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Adatbázis és tároló létrehozása
A Catalog API használatával kezelheti a fiókerőforrásokat, például az adatbázisokat és a tárolókat. Ezután az OLTP használatával kezelheti a tárolóerőforrásokon belüli adatokat.
Konfigurálja a Catalog API-t a NoSQL-erőforrások API-jának kezelésére a Spark használatával.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Hozzon létre egy új, a következővel
CREATE DATABASE IF NOT EXISTS
elnevezettcosmicworks
adatbázist: .# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Hozzon létre egy új, a következővel
CREATE TABLE IF NOT EXISTS
elnevezettproducts
tárolót: . Győződjön meg arról, hogy a partíciókulcs elérési útját/category
úgy állítja be és engedélyezi az automatikus skálázási átviteli sebességet, hogy a kérelemegységek maximális átviteli sebessége1000
másodpercenként legyen.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Hozzon létre egy másik, hierarchikus partíciókulcs-konfigurációval elnevezett
employees
tárolót. Használja/organization
a ,/department
és/team
a partíciókulcs elérési útjait. Kövesse az adott sorrendet. Emellett állítsa az átviteli sebességet manuális mennyiségű kérelemegységre400
.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Futtassa a jegyzetfüzetcellákat annak ellenőrzéséhez, hogy az adatbázis és a tárolók létre lettek-e hozva a NoSQL-fiók api-jába.
Adatok betöltése
Hozzon létre egy mintaadatkészletet. Ezután használja az OLTP-t az adatok noSQL-tárolóhoz készült API-ba való betöltéséhez.
Hozzon létre egy mintaadatkészletet.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
A korábban mentett OLTP-konfigurációval
spark.createDataFrame
mintaadatokat adhat hozzá a céltárolóhoz.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Adatok lekérdezése
Töltse be az OLTP-adatokat egy adatkeretbe, hogy gyakori lekérdezéseket hajtson végre az adatokon. Az adatok szűréséhez és lekérdezéséhez különböző szintaxisokat használhat.
Az OLTP-adatok adatkeret-objektumba való betöltésére használható
spark.read
. Használja ugyanazt a konfigurációt, amelyet az oktatóanyag korábbi részében használt. Azt is beállíthatjaspark.cosmos.read.inferSchema.enabled
, hogytrue
a Spark-összekötő a meglévő elemek mintavételezésével következtethesse a sémát.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Renderelje az adatkeretbe betöltött adatok sémáját a következő használatával
printSchema
: .# Render schema df.printSchema()
// Render schema df.printSchema()
Adatsorok megjelenítése, ahol az
quantity
oszlop kisebb, mint20
. A lekérdezés végrehajtásához használja azwhere
ésshow
a függvényeket.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Az első adatsor megjelenítése, ahol az
clearance
oszlop találhatótrue
. A lekérdezés végrehajtásához használja afilter
függvényt.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Öt adatsor megjelenítése szűrés vagy csonkolás nélkül. A függvény használatával
show
testre szabhatja a megjelenített sorok megjelenését és számát.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Az adatok lekérdezése ezzel a nyers NoSQL-lekérdezési sztringgel:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Gyakori műveletek végrehajtása
Ha a Sparkban a NoSQL-adatok API-val dolgozik, részleges frissítéseket hajthat végre, vagy nyers JSON-ként dolgozhat az adatokkal.
Elem részleges frissítésének végrehajtása:
Másolja ki a meglévő
config
konfigurációs változót, és módosítsa a tulajdonságokat az új példányban. Pontosabban konfigurálja az írási stratégiát a következőreItemPatch
: . Ezután tiltsa le a tömeges támogatást. Állítsa be az oszlopokat és a megfeleltetett műveleteket. Végül állítsa be az alapértelmezett művelettípust a következőreSet
: .# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Hozzon létre változókat a javításművelet részeként megcélozni kívánt elem partíciókulcsához és egyedi azonosítójához.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Hozzon létre egy javításobjektum-készletet a célelem megadásához, és adja meg a módosítani kívánt mezőket.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Hozzon létre egy adatkeretet a javításobjektumok készletével. A javításművelet végrehajtására használható
write
.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Futtasson egy lekérdezést a javításművelet eredményeinek áttekintéséhez. Az elemet mostantól más módosítás nélkül el kell nevezni
Yamba New Surfboard
.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Nyers JSON-adatok használata:
Másolja ki a meglévő
config
konfigurációs változót, és módosítsa a tulajdonságokat az új példányban. Pontosabban módosítsa a céltárolót a következőreemployees
: . Ezután konfigurálja azcontacts
oszlopot/mezőt nyers JSON-adatok használatára.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Hozzon létre egy alkalmazottkészletet, amely betölti a tárolót.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Hozzon létre egy adatkeretet, és használja
write
az alkalmazotti adatok betöltésére.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Renderelje az adatokat az adatkeretből a következő használatával
show
: . Figyelje meg, hogy azcontacts
oszlop nyers JSON a kimenetben.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Kapcsolódó tartalom
- Apache Spark
- Azure Cosmos DB Catalog API
- Konfigurációs paraméter referenciája
- Azure Cosmos DB Spark Connector-minták
- Migrálás a Spark 2.4-ről a Spark 3-ra.*
- Verziókompatibilitás:
- Kibocsátási megjegyzések:
- Letöltési hivatkozások: