Structured Streaming patterns on Azure Databricks
This article contains notebooks and code samples for common patterns for working with Structured Streaming on Azure Databricks.
If you are brand new to Structured Streaming, see Run your first Structured Streaming workload.
Write to Cassandra as a sink for Structured Streaming in Python
Apache Cassandra is a distributed, low-latency, scalable, highly available OLTP database.
Structured Streaming works with Cassandra through the Spark Cassandra Connector. This connector supports both RDD and DataFrame APIs, and it has native support for writing streaming data.
Important: You must use the corresponding version of the spark-cassandra-connector-assembly.
The following example connects to one or more hosts in a Cassandra database cluster. It also specifies connection configurations such as the checkpoint location and the keyspace and table names:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
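The example above assumes that df is an existing streaming DataFrame whose schema matches the target table. As a minimal, illustrative sketch, you could produce such a DataFrame from the built-in rate test source; the column names here are assumptions and should be aligned with your Cassandra schema:

# Illustrative only: the rate source stands in for a real input such as
# Kafka or cloud files; id and event_time are assumed column names.
df = (
  spark.readStream.format("rate").load()
    .selectExpr("CAST(value AS STRING) AS id", "timestamp AS event_time")
)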
Write to Azure Synapse Analytics using foreachBatch() in Python
streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure Synapse Analytics. See the foreachBatch documentation for details.
To run this example, you need the Azure Synapse Analytics connector. For details, see Query data in Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *

# Batch writer invoked by foreachBatch once per micro-batch.
def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
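start() returns a StreamingQuery handle that you can use to monitor and stop the query. A brief sketch using standard Structured Streaming methods:

print(query.status)         # current state of the streaming query
print(query.lastProgress)   # metrics for the most recent micro-batch

query.awaitTermination(60)  # block for up to 60 seconds
query.stop()                # stop the query gracefully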
Stream-stream joins
These two notebooks show how to use stream-stream joins in Python and Scala.
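The notebooks themselves are not reproduced here. As a minimal sketch of the pattern they demonstrate, the following joins two streams on a key with watermarks and an event-time constraint; the sources, column names, and time bounds are illustrative assumptions:

from pyspark.sql.functions import expr

# Two example streams; the rate source stands in for real inputs.
impressions = (
  spark.readStream.format("rate").load()
    .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
)
clicks = (
  spark.readStream.format("rate").load()
    .selectExpr("value AS clickAdId", "timestamp AS clickTime")
)

# Watermarks bound how late each stream's events may arrive, letting the
# engine discard old state; the join condition constrains event time so
# matches are only sought within a one-hour window after the impression.
joined = (
  impressions.withWatermark("impressionTime", "2 hours")
    .join(
      clicks.withWatermark("clickTime", "3 hours"),
      expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
      """)
    )
)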