İlk Yapılandırılmış Akış iş yükünüzü çalıştırma
Bu makalede, Azure Databricks'te ilk Yapılandırılmış Akış sorgularınızı çalıştırmak için gereken temel kavramların kod örnekleri ve açıklaması sağlanır. Neredeyse gerçek zamanlı ve artımlı işleme iş yükleri için Yapılandırılmış Akış'ı kullanabilirsiniz.
Yapılandırılmış Akış, DLT'de akış tablolarını destekleyen çeşitli teknolojilerden biridir. Databricks tüm yeni ETL, alım ve Yapılandırılmış Akış iş yükleri için DLT kullanılmasını önerir. Bkz. DLT nedir?.
Not
DLT, akış tablolarını bildirmek için biraz değiştirilmiş bir söz dizimi sağlarken, akış okumalarını ve dönüşümlerini yapılandırmaya yönelik genel söz dizimi Azure Databricks'te tüm akış kullanım örnekleri için geçerlidir. DLT ayrıca durum bilgilerini, meta verileri ve çok sayıda yapılandırmayı yöneterek akışı basitleştirir.
Nesne depolamadan akış verilerini okumak için Otomatik Yükleyici'yi kullanma
Aşağıdaki örnekte JSON verilerinin Otomatik Yükleyici ile yüklenmesi gösterilmektedir. Bu, biçim ve seçenekleri belirtmek için kullanılır cloudFiles
.
schemaLocation
seçeneği şema çıkarımı ve evrimini etkinleştirir. Aşağıdaki kodu bir Databricks not defteri hücresine yapıştırın ve raw_df
adlı bir DataFrame akışı oluşturmak için hücreyi çalıştırın.
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Azure Databricks'te yapılan diğer okuma işlemleri gibi, bir akış okuma yapılandırması da veri yüklemez. Akış başlamadan önce verilerde bir eylem tetiklemeniz gerekir.
Not
Akışkan bir DataFrame üzerinde display()
çağrısı yapmak, bir akış işi başlatır. Çoğu Yapılandırılmış Akış kullanım örneğinde, akışı tetikleyen eylem havuza veri yazmak olmalıdır. Bakınız Yapılandırılmış Akış için Üretimle İlgili Dikkat Edilmesi Gerekenler.
Akış dönüştürmesi yapmak
Yapılandırılmış Akış, Azure Databricks ve Spark SQL'de kullanılabilen çoğu dönüştürmeyi destekler. Hatta MLflow modellerini UDF olarak yükleyebilir ve akış tahminlerini bir dönüşüm olarak gerçekleştirebilirsiniz.
Aşağıdaki kod örneği, Spark SQL işlevlerini kullanarak alınan JSON verilerini ek bilgilerle zenginleştirmek için basit bir dönüştürme işlemini tamamlar:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Sonuçta transformed_df
, veri kaynağına ulaşan her kaydı yüklemek ve dönüştürmek için sorgu yönergeleri yer alır.
Not
Yapılandırılmış Akış, veri kaynaklarını ilişkisiz veya sonsuz veri kümeleri olarak kabul eder. Bu nedenle, bazı dönüştürmeler sınırsız sayıda öğenin sıralanması gerekeceğinden Yapılandırılmış Akış iş yüklerinde desteklenmez.
Çoğu toplama ve birçok birleştirme, filigranlar, pencereler ve çıkış moduyla durum bilgilerinin yönetilmesini gerektirir. Bkz Veri işleme eşiklerini denetlemek için filigranları uygulama.
Delta Lake'e kademeli toplu yazma yapın
Aşağıdaki örnek, belirtilen dosya yolunu ve denetim noktasını kullanarak Delta Lake'e yazar.
Önemli
Yapılandırdığınız her akış yazıcısı için her zaman benzersiz bir denetim noktası konumu belirttiğinizden emin olun. Denetim noktası, akış sorgunuzla ilişkili işlenen tüm kayıtları ve durum bilgilerini izleyerek akışınız için benzersiz kimlik sağlar.
availableNow
Tetikleyicinin ayarı Yapılandırılmış Akış'a kaynak veri kümesinden daha önce işlenmemiş tüm kayıtları işlemesini ve ardından kapatmasını bildirir; böylece bir akışı çalışır durumda bırakma konusunda endişelenmeden aşağıdaki kodu güvenle yürütebilirsiniz:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Bu örnekte, veri kaynağımıza yeni kayıt ulaşmadığından, bu kodun yinelenmiş yürütülmesi yeni kayıtları almaz.
Uyarı
Yapılandırılmış Akış yürütmesi, otomatik sonlandırmanın işlem kaynaklarını kapatmasını engelleyebilir. Beklenmeyen maliyetleri önlemek için akış sorgularını sonlandırdığından emin olun.
Delta Lake'ten veri okuma, dönüştürme ve Delta Lake'e yazma
Delta Lake hem kaynak hem de havuz olarak Yapılandırılmış Akış ile çalışmak için kapsamlı desteğe sahiptir. Bkz. Delta tablo akışı okuma ve yazma işlemleri.
Aşağıdaki örnekte, delta tablosundaki tüm yeni kayıtları artımlı olarak yüklemek, bunları başka bir Delta tablosunun anlık görüntüsüyle birleştirmek ve bir Delta tablosuna yazmak için örnek söz dizimi gösterilmektedir:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Kaynak tabloları okumak ve hedef tablolara ve belirtilen denetim noktası konumuna yazmak için yapılandırılmış uygun izinlere sahip olmanız gerekir. Veri kaynaklarınız ve havuzlarınız için ilgili değerleri kullanarak açılı ayraçlarla (<>
) belirtilen tüm parametreleri doldurun.
Not
DLT, Delta Lake işlem hatları oluşturmak için tam bildirim temelli bir söz dizimi sağlar ve tetikleyiciler ve denetim noktaları gibi özellikleri otomatik olarak yönetir. Bkz. DLT nedir?.
Kafka'dan veri okuma, dönüştürme ve Kafka'ya yazma
Apache Kafka ve diğer mesajlaşma veri yolları, büyük veri kümeleri için en düşük gecikme sürelerinden bazılarını sağlar. Azure Databricks'i kullanarak Kafka'dan alınan verilere dönüştürmeler uygulayabilir ve sonra verileri Kafka'ya geri yazabilirsiniz.
Not
Bulut nesne depolama alanına veri yazmak ek gecikme yükü ekler. Delta Lake'te bir mesajlaşma veri yolu verilerini depolamak istiyorsanız ancak akış iş yükleri için mümkün olan en düşük gecikme süresini istiyorsanız Databricks, verileri lakehouse'a almak ve aşağı akış mesajlaşma veri yolu havuzları için neredeyse gerçek zamanlı dönüştürmeler uygulamak için ayrı akış işleri yapılandırmanızı önerir.
Aşağıdaki kod örneğinde, Verileri Delta tablosundaki verilerle birleştirip Kafka'ya geri yazarak Kafka'dan verileri zenginleştirmeye yönelik basit bir desen gösterilmektedir:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Kafka hizmetinize erişim için yapılandırılmış uygun izinlere sahip olmanız gerekir. Veri kaynaklarınız ve havuzlarınız için ilgili değerleri kullanarak açılı ayraçlarla (<>
) belirtilen tüm parametreleri doldurun. Bkz. Apache Kafka ve Azure Databricks ile akış işleme.