Aracılığıyla paylaş


Filigranlar kullanarak DLT'de durum bilgili işlemeyi optimize etme

Durumda tutulan verileri etkili bir şekilde yönetmek için, DLT'de durum bilgisi olan akış işleme gerçekleştirirken toplamalar, birleştirmeler ve yinelenenleri kaldırma da dahil olmak üzere filigranları kullanın. Bu makalede, DLT sorgularınızda filigranların nasıl kullanılacağı açıklanır ve önerilen işlemlerin örnekleri yer alır.

Not

Toplamalar gerçekleştiren sorguların kademeli olarak işlenmesini ve her güncellemede tam olarak yeniden hesaplanmamasını sağlamak için su işaretlerini kullanmanız gerekir.

Filigran nedir?

Akış işlemede filigran, toplamalar gibi durum bilgisi taşıyan işlemleri gerçekleştirirken verileri işlemek için zamana bağlı eşik tanımlayabilen bir Apache Spark özelliğidir. Gelen veriler eşiğe ulaşılana kadar işlenir ve bu noktada eşik tarafından tanımlanan zaman penceresi kapatılır. Filigranlar, daha büyük veri kümelerini veya uzun süre çalışan işlemleri işlerken sorgu işleme sırasında sorun yaşamamak için kullanılabilir. Bu sorunlar, işleme sırasında durumda tutulan veri miktarı nedeniyle yüksek gecikme ve hatta yetersiz bellek (OOM) hatalarını içerebilir. Akış verileri doğası gereği sıralanmamış olduğundan, filigranlar zaman penceresi toplamaları gibi işlemlerin doğru hesaplanmasına da destek olur.

Akış işlemede filigranları kullanma hakkında daha fazla bilgi edinmek için bkz. Apache Spark Yapılandırılmış Akışta Filigran Oluşturma ve Veri İşleme Eşiklerini Kontrol Etmek İçin Filigranları Uygulama.

Filigranı nasıl tanımlarsınız?

Bir zaman damgası alanı ve geç veri gelmesi için zaman eşiğini temsil eden bir değer belirterek filigran tanımlarsınız. Veriler, tanımlanan zaman eşiğinden sonra ulaşırsa geç kabul edilir. Örneğin, eşik 10 dakika olarak tanımlanırsa, 10 dakikalık eşikten sonra gelen kayıtlar bırakılabilir.

Tanımlanan eşikten sonra gelen kayıtlar bırakılabileceğinden, gecikme süresi ile doğruluk gereksinimlerinizi karşılayan bir eşik seçmek önemlidir. Daha küçük bir eşik seçmek kayıtların daha erken gönderilmesine neden olur, ancak geç kayıtların bırakılma olasılığının daha yüksek olduğu anlamına gelir. Daha büyük bir eşik, verilerin daha uzun beklemesi ancak büyük olasılıkla daha eksiksiz olması anlamına gelir. Büyük durum boyutu nedeniyle, daha büyük bir eşik ek bilgi işlem kaynakları da gerektirebilir. Eşik değeri verilerinize ve işleme gereksinimlerinize bağlı olduğundan, en uygun eşiği belirlemek için işlemenizin test edilmesi ve izlenmesi önemlidir.

Python'da filigran tanımlamak için withWatermark() işlevini kullanırsınız. SQL'de filigran tanımlamak için WATERMARK yan tümcesini kullanın:

Piton

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Stream-stream birleşimleriyle filigranları kullanma

Akış akışı birleşimleri için birleştirmenin her iki tarafında bir filigran ve bir zaman aralığı yan tümcesi tanımlamanız gerekir. Her birleştirme kaynağının verilerin tam görünümüne sahip olmaması nedeniyle, akış motoruna artık eşleşmeler yapılamayacağını bildirmek için zaman aralığı ifadesi gereklidir. Zaman aralığı yan tümcesi, filigranları tanımlamak için kullanılan alanlarla aynı olmalıdır.

Her bir akışın filigranlar için farklı eşikler gerektirdiği zamanlar olabileceğinden, akışların aynı eşiklere sahip olması gerekmez. Eksik verileri önlemek için akış motoru, en yavaş akışı temel alan tek bir global filigran tutar.

Aşağıdaki örnek, reklam gösterim akışı ve kullanıcıların reklama tıklama akışını birleştirir. Bu örnekte, gösterimden sonra 3 dakika içinde bir tıklama gerçekleşmelidir. 3 dakikalık zaman aralığı geçtikten sonra, artık eşleştirilemeyecek durumdaki satırlar göz ardı edilir.

Piton

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Filigranlarla pencereli agregasyonlar gerçekleştirme

Akış verileri üzerinde durum bilgisi olan yaygın bir işlem, pencereli toplama işlemidir. Pencereli toplamalar, gruplandırılmış toplamalara benzer, ancak tanımlı pencerenin parçası olan satır kümesi için toplama değerleri döndürülür.

Bir pencere belirli bir uzunluk olarak tanımlanabilir ve bu pencerenin parçası olan tüm satırlarda toplama işlemi gerçekleştirilebilir. Spark Streaming üç pencere türünü destekler:

  • Atlayan (sabit) pencereler: Sabit boyutlu, çakışmayan ve bitişik zaman aralıkları dizisi. Giriş kaydı yalnızca tek bir pencereye ait.
  • Kayan pencereler: Yuvarlanan pencerelere benzer olarak, kayan pencereler sabit boyutludur, ancak pencereler çakışabilir ve bir kayıt birden fazla pencereye düşebilir.

Veriler pencerenin sonu artı filigran uzunluğunu geçtiğinde, pencere için yeni veri kabul edilmez, toplamanın sonucu çıkartılır ve pencerenin durumu temizlenir.

Aşağıdaki örnek, sabit bir pencere kullanarak her 5 dakikada bir gösterimlerin toplamını hesaplar. Bu örnekte select yan tümcesi impressions_windowdiğer adını kullanır ve pencerenin kendisi GROUP BY yan tümcesinin bir parçası olarak tanımlanır. Pencere, bu örnekte filigran olarak kullanılan clickTimestamp sütunu ile aynı zaman damgası sütununu temel almalıdır.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Python'da saatlik sabit pencerelerde kar hesaplamaya benzer bir örnek:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Akış kayıtlarındaki tekrarlananları kaldırma

Yapılandırılmış Akış, tam olarak bir kez işleme garantilerine sahiptir ancak veri kaynaklarından kayıtların kopyalarını otomatik olarak kaldırmaz. Örneğin, birçok ileti kuyruğu en az bir kez teslim garantisine sahip olduğundan, bu tür bir ileti kuyruğundan okurken yinelenen kayıtlar beklenmelidir. dropDuplicatesWithinWatermark() işlevini, belirtilen herhangi bir alan üzerindeki kayıtların yinelenenlerini kaldırmak için kullanabilirsiniz, böylece bazı alanlar farklı olsa bile (örneğin, olay zamanı veya varış saati) bir akıştan yinelenenleri kaldırabilirsiniz. dropDuplicatesWithinWatermark() işlevini kullanmak için bir filigran belirtmeniz gerekir. Filigran tarafından belirtilen zaman aralığı içinde gelen tüm yinelenen veriler atılır.

Sıralı veriler önemlidir çünkü sıralı olmayan veriler filigran değerinin yanlış bir şekilde atlana neden olur. Daha sonra, eski veriler geldiğinde geç kaldığı kabul edilir ve görmezden gelinir. filigranda belirtilen zaman damgasına göre ilk anlık görüntüyü işlemek için withEventTimeOrder seçeneğini kullanın. withEventTimeOrder seçeneği, veri kümesini tanımlayan kodda veya spark.databricks.delta.withEventTimeOrder.enabledkullanılarak işlem hattı ayarlarında bildirilebilir. Örneğin:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Not

withEventTimeOrder seçeneği yalnızca Python ile desteklenir.

Aşağıdaki örnekte, veriler clickTimestampsırasına göre işlenir ve 5 saniye içinde birbirine ulaşan ve yinelenen userId ve clickAdId sütunlarına sahip kayıtlar silinir.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Durum bilgili işleme için işlem hattı yapılandırmasını optimize etme

Databricks, üretim sorunlarını ve aşırı gecikme süresini önlemeye yardımcı olmak için, özellikle işlemeniz büyük miktarda ara durumdan tasarruf edilmesini gerektiriyorsa durum bilgisi olan akış işlemeniz için RocksDB tabanlı durum yönetiminin etkinleştirilmesini önerir.

Sunucusuz işlem hatları, durum deposu yapılandırmalarını otomatik olarak yönetir.

İşlem hattını dağıtmadan önce aşağıdaki yapılandırmayı ayarlayarak RocksDB tabanlı durum yönetimini etkinleştirebilirsiniz:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB yapılandırma önerileri de dahil olmak üzere RocksDB durum deposu hakkında daha fazla bilgi edinmek için bkz. Azure Databricksüzerinde RocksDB durum depounu yapılandırma .