Filigranlar kullanarak DLT'de durum bilgili işlemeyi optimize etme
Durumda tutulan verileri etkili bir şekilde yönetmek için, DLT'de durum bilgisi olan akış işleme gerçekleştirirken toplamalar, birleştirmeler ve yinelenenleri kaldırma da dahil olmak üzere filigranları kullanın. Bu makalede, DLT sorgularınızda filigranların nasıl kullanılacağı açıklanır ve önerilen işlemlerin örnekleri yer alır.
Not
Toplamalar gerçekleştiren sorguların kademeli olarak işlenmesini ve her güncellemede tam olarak yeniden hesaplanmamasını sağlamak için su işaretlerini kullanmanız gerekir.
Filigran nedir?
Akış işlemede filigran, toplamalar gibi durum bilgisi taşıyan işlemleri gerçekleştirirken verileri işlemek için zamana bağlı eşik tanımlayabilen bir Apache Spark özelliğidir. Gelen veriler eşiğe ulaşılana kadar işlenir ve bu noktada eşik tarafından tanımlanan zaman penceresi kapatılır. Filigranlar, daha büyük veri kümelerini veya uzun süre çalışan işlemleri işlerken sorgu işleme sırasında sorun yaşamamak için kullanılabilir. Bu sorunlar, işleme sırasında durumda tutulan veri miktarı nedeniyle yüksek gecikme ve hatta yetersiz bellek (OOM) hatalarını içerebilir. Akış verileri doğası gereği sıralanmamış olduğundan, filigranlar zaman penceresi toplamaları gibi işlemlerin doğru hesaplanmasına da destek olur.
Akış işlemede filigranları kullanma hakkında daha fazla bilgi edinmek için bkz. Apache Spark Yapılandırılmış Akışta Filigran Oluşturma ve Veri İşleme Eşiklerini Kontrol Etmek İçin Filigranları Uygulama.
Filigranı nasıl tanımlarsınız?
Bir zaman damgası alanı ve geç veri gelmesi için zaman eşiğini temsil eden bir değer belirterek filigran tanımlarsınız. Veriler, tanımlanan zaman eşiğinden sonra ulaşırsa geç kabul edilir. Örneğin, eşik 10 dakika olarak tanımlanırsa, 10 dakikalık eşikten sonra gelen kayıtlar bırakılabilir.
Tanımlanan eşikten sonra gelen kayıtlar bırakılabileceğinden, gecikme süresi ile doğruluk gereksinimlerinizi karşılayan bir eşik seçmek önemlidir. Daha küçük bir eşik seçmek kayıtların daha erken gönderilmesine neden olur, ancak geç kayıtların bırakılma olasılığının daha yüksek olduğu anlamına gelir. Daha büyük bir eşik, verilerin daha uzun beklemesi ancak büyük olasılıkla daha eksiksiz olması anlamına gelir. Büyük durum boyutu nedeniyle, daha büyük bir eşik ek bilgi işlem kaynakları da gerektirebilir. Eşik değeri verilerinize ve işleme gereksinimlerinize bağlı olduğundan, en uygun eşiği belirlemek için işlemenizin test edilmesi ve izlenmesi önemlidir.
Python'da filigran tanımlamak için withWatermark()
işlevini kullanırsınız. SQL'de filigran tanımlamak için WATERMARK
yan tümcesini kullanın:
Piton
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Stream-stream birleşimleriyle filigranları kullanma
Akış akışı birleşimleri için birleştirmenin her iki tarafında bir filigran ve bir zaman aralığı yan tümcesi tanımlamanız gerekir. Her birleştirme kaynağının verilerin tam görünümüne sahip olmaması nedeniyle, akış motoruna artık eşleşmeler yapılamayacağını bildirmek için zaman aralığı ifadesi gereklidir. Zaman aralığı yan tümcesi, filigranları tanımlamak için kullanılan alanlarla aynı olmalıdır.
Her bir akışın filigranlar için farklı eşikler gerektirdiği zamanlar olabileceğinden, akışların aynı eşiklere sahip olması gerekmez. Eksik verileri önlemek için akış motoru, en yavaş akışı temel alan tek bir global filigran tutar.
Aşağıdaki örnek, reklam gösterim akışı ve kullanıcıların reklama tıklama akışını birleştirir. Bu örnekte, gösterimden sonra 3 dakika içinde bir tıklama gerçekleşmelidir. 3 dakikalık zaman aralığı geçtikten sonra, artık eşleştirilemeyecek durumdaki satırlar göz ardı edilir.
Piton
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Filigranlarla pencereli agregasyonlar gerçekleştirme
Akış verileri üzerinde durum bilgisi olan yaygın bir işlem, pencereli toplama işlemidir. Pencereli toplamalar, gruplandırılmış toplamalara benzer, ancak tanımlı pencerenin parçası olan satır kümesi için toplama değerleri döndürülür.
Bir pencere belirli bir uzunluk olarak tanımlanabilir ve bu pencerenin parçası olan tüm satırlarda toplama işlemi gerçekleştirilebilir. Spark Streaming üç pencere türünü destekler:
- Atlayan (sabit) pencereler: Sabit boyutlu, çakışmayan ve bitişik zaman aralıkları dizisi. Giriş kaydı yalnızca tek bir pencereye ait.
- Kayan pencereler: Yuvarlanan pencerelere benzer olarak, kayan pencereler sabit boyutludur, ancak pencereler çakışabilir ve bir kayıt birden fazla pencereye düşebilir.
Veriler pencerenin sonu artı filigran uzunluğunu geçtiğinde, pencere için yeni veri kabul edilmez, toplamanın sonucu çıkartılır ve pencerenin durumu temizlenir.
Aşağıdaki örnek, sabit bir pencere kullanarak her 5 dakikada bir gösterimlerin toplamını hesaplar. Bu örnekte select yan tümcesi impressions_window
diğer adını kullanır ve pencerenin kendisi GROUP BY
yan tümcesinin bir parçası olarak tanımlanır. Pencere, bu örnekte filigran olarak kullanılan clickTimestamp
sütunu ile aynı zaman damgası sütununu temel almalıdır.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Python'da saatlik sabit pencerelerde kar hesaplamaya benzer bir örnek:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Akış kayıtlarındaki tekrarlananları kaldırma
Yapılandırılmış Akış, tam olarak bir kez işleme garantilerine sahiptir ancak veri kaynaklarından kayıtların kopyalarını otomatik olarak kaldırmaz. Örneğin, birçok ileti kuyruğu en az bir kez teslim garantisine sahip olduğundan, bu tür bir ileti kuyruğundan okurken yinelenen kayıtlar beklenmelidir.
dropDuplicatesWithinWatermark()
işlevini, belirtilen herhangi bir alan üzerindeki kayıtların yinelenenlerini kaldırmak için kullanabilirsiniz, böylece bazı alanlar farklı olsa bile (örneğin, olay zamanı veya varış saati) bir akıştan yinelenenleri kaldırabilirsiniz.
dropDuplicatesWithinWatermark()
işlevini kullanmak için bir filigran belirtmeniz gerekir. Filigran tarafından belirtilen zaman aralığı içinde gelen tüm yinelenen veriler atılır.
Sıralı veriler önemlidir çünkü sıralı olmayan veriler filigran değerinin yanlış bir şekilde atlana neden olur. Daha sonra, eski veriler geldiğinde geç kaldığı kabul edilir ve görmezden gelinir. filigranda belirtilen zaman damgasına göre ilk anlık görüntüyü işlemek için withEventTimeOrder
seçeneğini kullanın.
withEventTimeOrder
seçeneği, veri kümesini tanımlayan kodda veya spark.databricks.delta.withEventTimeOrder.enabled
kullanılarak işlem hattı ayarlarında bildirilebilir. Örneğin:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Not
withEventTimeOrder
seçeneği yalnızca Python ile desteklenir.
Aşağıdaki örnekte, veriler clickTimestamp
sırasına göre işlenir ve 5 saniye içinde birbirine ulaşan ve yinelenen userId
ve clickAdId
sütunlarına sahip kayıtlar silinir.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Durum bilgili işleme için işlem hattı yapılandırmasını optimize etme
Databricks, üretim sorunlarını ve aşırı gecikme süresini önlemeye yardımcı olmak için, özellikle işlemeniz büyük miktarda ara durumdan tasarruf edilmesini gerektiriyorsa durum bilgisi olan akış işlemeniz için RocksDB tabanlı durum yönetiminin etkinleştirilmesini önerir.
Sunucusuz işlem hatları, durum deposu yapılandırmalarını otomatik olarak yönetir.
İşlem hattını dağıtmadan önce aşağıdaki yapılandırmayı ayarlayarak RocksDB tabanlı durum yönetimini etkinleştirebilirsiniz:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
RocksDB yapılandırma önerileri de dahil olmak üzere RocksDB durum deposu hakkında daha fazla bilgi edinmek için bkz. Azure Databricksüzerinde RocksDB durum depounu yapılandırma .