Pola Streaming Terstruktur di Azure Databricks
Ini berisi buku catatan dan sampel kode untuk pola umum untuk bekerja dengan Streaming Terstruktur di Azure Databricks.
Mulai menggunakan Streaming Terstruktur
Jika Anda baru menggunakan Streaming Terstruktur, lihat Menjalankan beban kerja Streaming Terstruktur pertama Anda.
Menulis ke Cassandra sebagai sink untuk Streaming Terstruktur di Python
Apache Cassandra adalah database OLTP terdistribusi, latensi rendah, dapat diskalakan, dan sangat tersedia.
Streaming Terstruktur berfungsi dengan Cassandra melalui Konektor Spark Cassandra. Konektor ini mendukung API RDD dan DataFrame, dan memiliki dukungan asli untuk menulis data streaming. Penting Anda harus menggunakan versi yang sesuai dari spark-cassandra-connector-assembly.
Contoh berikut terhubung ke satu atau beberapa host di kluster database Cassandra. Ini juga menentukan konfigurasi koneksi seperti lokasi pos pemeriksaan dan keyspace serta nama tabel tertentu.
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Menulis ke Azure Synapse Analytics menggunakan foreachBatch()
di Python
streamingDF.writeStream.foreachBatch()
memungkinkan Anda menggunakan kembali penulis data batch yang ada untuk mengalirkan output kueri streaming ke Azure Synapse Analytics. Lihat dokumentasi foreachBatch untuk detailnya.
Untuk menjalankan contoh ini, Anda memerlukan konektor Azure Synapse Analytics. Untuk detail tentang konektor Azure Synapse Analytics, lihat Data kueri di Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Aliran-Aliran gabungan
Kedua buku catatan tersebut menunjukkan cara menggunakan aliran-aliran gabungan di Python dan Scala.
Aliran-Aliran gabungan buku catatan Python notebook
Ambil buku catatan