Aracılığıyla paylaş


Azure Databricks'te yapılandırılmış Akış desenleri

Bu, Azure Databricks'te Yapılandırılmış Akış ile çalışmaya yönelik yaygın desenlere yönelik not defterlerini ve kod örneklerini içerir.

Yapılandırılmış Akış'ı kullanmaya başlama

Yapılandırılmış Akış'ta yeniyseniz bkz . İlk Yapılandırılmış Akış iş yükünüzü çalıştırma.

Python'da Yapılandırılmış Akış için Cassandra'ya havuz olarak yazma

Apache Cassandra dağıtılmış, düşük gecikme süreli, ölçeklenebilir, yüksek oranda kullanılabilir bir OLTP veritabanıdır.

Yapılandırılmış Akış, Spark Cassandra Bağlayıcısı aracılığıyla Cassandra ile birlikte çalışır. Bu bağlayıcı hem RDD hem de DataFrame API'lerini destekler ve akış verileri yazmak için yerel desteğe sahiptir. Önemli Spark-cassandra-connector-assembly'in ilgili sürümünü kullanmanız gerekir.

Aşağıdaki örnek, Cassandra veritabanı kümesindeki bir veya daha fazla ana bilgisayara bağlanır. Ayrıca denetim noktası konumu, belirli anahtar alanı ve tablo adları gibi bağlantı yapılandırmalarını da belirtir:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Python'da kullanarak foreachBatch() Azure Synapse Analytics'e yazma

streamingDF.writeStream.foreachBatch() , bir akış sorgusunun çıkışını Azure Synapse Analytics'e yazmak için mevcut toplu veri yazıcılarını yeniden kullanmanıza olanak tanır. Ayrıntılar için foreachBatch belgelerine bakın.

Bu örneği çalıştırmak için Azure Synapse Analytics bağlayıcısına ihtiyacınız vardır. Azure Synapse Analytics bağlayıcısı hakkında ayrıntılı bilgi için bkz . Azure Synapse Analytics'te verileri sorgulama.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Akış birleştirmeleri

Bu iki not defteri, Python ve Scala'da akış akışı birleştirmelerinin nasıl kullanılacağını gösterir.

Stream-Stream, Python not defterine katılıyor

not defteri alma

Stream-Stream Scala not defterine katılıyor

not defteri alma