Veri işleme eşiklerini denetlemek için filigranları uygulama
Bu makalede, filigran oluşturmanın temel kavramları tanıtılarak, durum bilgisi olan yaygın akış işlemlerinde filigran kullanımına yönelik öneriler sağlanır. Durum bilgisi olan akış işlemlerine filigranlar uygulayarak, bellek sorunlarına neden olabilecek ve uzun süre çalışan akış işlemleri sırasında işleme gecikmelerini artırabilecek durumda tutulan veri miktarının sonsuz olarak genişletilmesinden kaçınmalısınız.
Filigran nedir?
Yapılandırılmış Akış, belirli bir durum varlığı için güncelleştirmeleri işlemeye devam etme eşiğini denetlemek için filigranları kullanır. Durum varlıklarının yaygın örnekleri şunlardır:
- Bir zaman penceresindeki birikimler.
- İki akış arasındaki birleştirme işlemi için benzersiz anahtarlar.
Bir filigran belirttiğinizde, veri akışındaki DataFrame üzerinde bir zaman damgası alanı ve bir filigran eşiği ayarlarsınız. Yeni veriler geldikçe, durum yöneticisi belirtilen alandaki en son zaman damgasını izler ve geçlik eşiği içindeki tüm kayıtları işler.
Aşağıdaki örnek, pencereli sayım için 10 dakikalık bir filigran eşiği uygular.
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Bu örnekte:
-
event_time
sütunu, 10 dakikalık bir zaman damgası ve 5 dakikalık bir çarpmalı pencere tanımlamak için kullanılır. - Her çakışmayan 5 dakikalık pencereler için gözlemlenen her biri için
id
bir sayı toplanır. - Pencere süresi, en son gözlemlenen
event_time
'dan 10 dakika eski olana kadar her bir sayı için durum bilgileri korunur.
Önemli
Filigran eşikleri, belirtilen eşik içinde gelen kayıtların tanımlı sorgunun semantiğine göre işlenmesini garanti eder. Belirtilen eşiğin dışına gelen geç gelen kayıtlar sorgu ölçümleri kullanılarak yine de işlenebilir, ancak bu garanti edilmemektedir.
Filigranlar işleme süresini ve aktarım hızını nasıl etkiler?
Filigranlar, verilerin havuza ne zaman yazıldığından denetlemek için çıkış modlarıyla etkileşim kurar. Filigranlar işlenecek toplam durum bilgisi miktarını azalttığı için, etkili durum bilgisi aktarım hızı için filigranların etkili kullanımı gereklidir.
Not
Durum bilgisi olan tüm işlemler için tüm çıkış modları desteklenmez.
Pencereli toplamalar için filigranlar ve çıkış modu
Aşağıdaki tabloda, eşik tanımlı bir zaman damgasında toplamaya sahip sorgular için işleme ayrıntıları yer alır:
Çıkış modu | Davranış |
---|---|
Arkasına Ekle | Filigran eşiği geçtikten sonra satırlar hedef tabloya yazılır. Tüm yazma işlemleri gecikme eşiğine göre geciktirilir. Eşik geçtikten sonra eski toplama durumu bırakılır. |
Güncelleştirmek | Sonuçlar hesaplandığında satırlar hedef tabloya yazılır ve yeni veriler geldikçe güncelleştirilebilir ve üzerine yazılabilir. Eşik geçtikten sonra eski toplama durumu bırakılır. |
Tamamla | Toplama durumu bırakılmaz. Hedef tablo her tetikleyiciyle yeniden yazılır. |
Akış akışı birleşimleri için filigranlar ve çıkış
Birden çok akış arasındaki birleştirmeler yalnızca ekleme modunu destekler ve bulunan her toplu işleme eşleşen kayıtlar yazılır. İç birleşimler için Databricks, her akış veri kaynağında bir filigran eşiği ayarlamanızı önerir. Bu, eski kayıtlar için durum bilgilerinin atılmasına olanak tanır. Filigranlar olmadan Yapılandırılmış Akış, birleştirmenin her iki tarafındaki her anahtarı her tetikleyiciyle birleştirmeyi dener.
Yapılandırılmış Akış, dış birleşimleri desteklemek için özel semantiklere sahiptir. Eşik, eşleşmeyen bir anahtara null değer yazılması gerektiğini gösterdiği için dış birleşimler için zorunludur. Dış birleşimlerin veri işleme sırasında hiçbir zaman eşleşmeyen kayıtları kaydetmek için yararlı olabileceğini unutmayın, çünkü birleşimler yalnızca ekleme işlemleri olarak tablolara yazılır, bu eksik veriler geçlik eşiği geçene kadar kaydedilmez.
Yapılandırılmış Akış'ta birden çok filigran politikası ile gecikmiş veri eşiğini denetleme
Birden çok Yapılandırılmış Akış girişiyle çalışırken, geç gelen verilerin tolerans eşiklerini denetlemek için birden çok filigran ayarlayabilirsiniz. Filigranları yapılandırmak, durum bilgilerini denetlemenize ve gecikme süresini etkilemenize olanak tanır.
Akış sorgusu, birleştirilmiş veya birleştirilmiş birden çok giriş akışına sahip olabilir. Giriş akışlarının her biri, durum bilgisi olan işlemler için tolere edilmesi gereken farklı bir geç veri eşiğine sahip olabilir. Giriş akışlarının her birinde kullanarak withWatermarks("eventTime", delay)
bu eşikleri belirtin. Aşağıda stream-stream birleşimlerine sahip örnek bir sorgu verilmiştir.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Yapılandırılmış Akış İşleme, sorguyu çalıştırırken her giriş akışındaki en yüksek olay zamanını ayrı ayrı izler, ilgili gecikmeye göre filigranları hesaplar ve durumlu işlemler için kullanılacak tek bir genel filigran seçer. Varsayılan olarak, minimum değer genel filigran olarak seçilir çünkü bu, akışlardan biri diğerlerinin gerisinde kalırsa (örneğin, akışlardan biri yukarı akış hataları nedeniyle veri almayı durdurduğu için) hiçbir verinin yanlışlıkla çok geç bırakılmamasını sağlar. Başka bir deyişle, genel filigran en yavaş akışın hızında güvenle hareket eder ve sorgu çıkışı buna göre geciktirilir.
Daha hızlı sonuç almak istiyorsanız, SQL yapılandırma spark.sql.streaming.multipleWatermarkPolicy
max
olarak ayarlayarak birden çok filigran ilkesini genel filigran olarak en yüksek değeri seçecek şekilde ayarlayabilirsiniz (varsayılan değer min
). Bu, genel filigranın en hızlı akışın hızında hareket etmesini sağlar. Ancak, bu yapılandırma en yavaş akışlardan verileri bırakır. Bu nedenle, Databricks bu yapılandırmayı yargısal olarak kullanmanızı önerir.
Filigran içindeki yinelenenleri kaldır
Databricks Runtime 13.3 LTS ve üzerinde benzersiz bir tanımlayıcı kullanarak filigran eşiği içindeki kayıtların yinelenenlerini kaldırabilirsiniz.
Yapılandırılmış Akış, tam olarak bir kez işleme garantisi sağlar, ancak veri kaynaklarından kayıtları otomatik olarak yinelenenleri kaldırmaz.
dropDuplicatesWithinWatermark
kullanarak belirtilen herhangi bir alandaki kayıtların yinelenenlerini kaldırabilir ve böylece olay zamanı veya varış saati gibi bazı alanlar farklı olsa bile bir akıştaki yinelenenleri temizleyebilirsiniz.
Belirtilen filigran süresi içerisinde gelen yinelenen kayıtların düşürülmesi garanti edilir. Bu garanti yalnızca bir yönde katıdır ve belirtilen eşiğin dışına ulaşan yinelenen kayıtlar da bırakılabilir. Tüm yinelenenleri kaldırmak için, filigranın gecikme eşiğini, yinelenen olaylar arasındaki maksimum zaman damgası farklarından daha uzun olacak şekilde belirlemeniz gerekir.
Aşağıdaki örnekte olduğu gibi dropDuplicatesWithinWatermark
yöntemini kullanmak için bir filigran belirtmeniz gerekir:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])