Azure Databricks'te yapılandırılmış Akış desenleri
Bu, Azure Databricks'te Yapılandırılmış Akış ile çalışmaya yönelik yaygın desenlere yönelik not defterlerini ve kod örneklerini içerir.
Yapılandırılmış Akış'ı kullanmaya başlama
Yapılandırılmış Akış'ta yeniyseniz bkz . İlk Yapılandırılmış Akış iş yükünüzü çalıştırma.
Python'da Yapılandırılmış Akış için Cassandra'ya havuz olarak yazma
Apache Cassandra dağıtılmış, düşük gecikme süreli, ölçeklenebilir, yüksek oranda kullanılabilir bir OLTP veritabanıdır.
Yapılandırılmış Akış, Spark Cassandra Bağlayıcısı aracılığıyla Cassandra ile birlikte çalışır. Bu bağlayıcı hem RDD hem de DataFrame API'lerini destekler ve akış verileri yazmak için yerel desteğe sahiptir. Önemli Spark-cassandra-connector-assembly'in ilgili sürümünü kullanmanız gerekir.
Aşağıdaki örnek, Cassandra veritabanı kümesindeki bir veya daha fazla ana bilgisayara bağlanır. Ayrıca denetim noktası konumu, belirli anahtar alanı ve tablo adları gibi bağlantı yapılandırmalarını da belirtir:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Python'da kullanarak foreachBatch()
Azure Synapse Analytics'e yazma
streamingDF.writeStream.foreachBatch()
, bir akış sorgusunun çıkışını Azure Synapse Analytics'e yazmak için mevcut toplu veri yazıcılarını yeniden kullanmanıza olanak tanır.
Ayrıntılar için foreachBatch belgelerine bakın.
Bu örneği çalıştırmak için Azure Synapse Analytics bağlayıcısına ihtiyacınız vardır. Azure Synapse Analytics bağlayıcısı hakkında ayrıntılı bilgi için bkz . Azure Synapse Analytics'te verileri sorgulama.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Akış birleştirmeleri
Bu iki not defteri, Python ve Scala'da akış akışı birleştirmelerinin nasıl kullanılacağını gösterir.
Stream-Stream, Python not defterine katılıyor
not defteri alma
Stream-Stream Scala not defterine katılıyor
not defteri alma