Aracılığıyla paylaş


Yapılandırılmış Akış denetim noktaları

Denetim noktaları ve önceden yazma günlükleri, Yapılandırılmış Akış iş yükleri için işleme garantileri sağlamak üzere birlikte çalışır. Denetim noktası, durum bilgileri ve işlenen kayıtlar dahil olmak üzere sorguyu tanımlayan bilgileri izler. Bir denetim noktası dizinindeki dosyaları sildiğinizde veya yeni bir denetim noktası konumuna geçtiğinizde, sorgunun bir sonraki çalıştırması yeni başlar.

Her sorgu farklı bir denetim noktası konumuna sahip olmalıdır. Birden çok sorgu hiçbir zaman aynı konumu paylaşmamalıdır.

Yapılandırılmış Akış sorguları için denetim noktası oluşturmayı etkinleştirme

Aşağıdaki örnekte olduğu checkpointLocation gibi bir akış sorgusu çalıştırmadan önce seçeneğini belirtmeniz gerekir:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Not

Bazı çıkışlar, not defterlerindeki display() çıkışı ve memory çıkışı gibi, bu seçeneği belirtmezseniz otomatik olarak geçici bir denetim noktası konumu oluşturur. Bu geçici denetim noktası konumları hataya dayanıklılık veya veri tutarlılığı garantileri sağlamaz ve düzgün temizlenmeyebilir. Databricks, bu havuzlar için her zaman bir denetim noktası konumu belirtmeyi önerir.

Yapılandırılmış Akış sorgusundaki değişikliklerden sonra kurtarma

Aynı denetim noktası konumundan yeniden başlatmalar arasında akış sorgusundaki değişikliklere izin verilen değişikliklerle ilgili sınırlamalar vardır. İzin verilmeyen veya değişikliğin etkisi iyi tanımlanmamış birkaç değişiklik aşağıda açıklanmaktadır. Tümü için:

  • İzin verilen terim, belirtilen değişikliği yapabileceğiniz anlamına gelir, ancak etkisinin semantiğinin iyi tanımlanmış olup olmadığı sorguya ve değişikliğe bağlıdır.
  • İzin verilmeyen terimi, yeniden başlatılan sorgu tahmin edilemeyen hatalarla başarısız olduğundan belirtilen değişikliği yapmamalısınız anlamına gelir.
  • sdf ile sparkSession.readStreamoluşturulan bir akış DataFrame/Veri Kümesi temsil eder.

Yapılandırılmış Akış sorgularındaki değişiklik türleri

  • Giriş kaynaklarının sayısı veya türündeki (farklı kaynak) değişiklikler: Buna izin verilmez.
  • Giriş kaynaklarının parametrelerindeki değişiklikler: Buna izin verilip verilmeyeceği ve değişikliğin semantiğinin iyi tanımlanıp tanımlanmadığı kaynağa ve sorguya bağlıdır. İşte birkaç örnek.
    • Hız sınırlarının eklenmesine, silinmesine ve değiştirilmesine izin verilir:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      kullanıcısı

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Sonuçlar tahmin edilemediğinden, abone olunan makalelerde ve dosyalarda yapılan değişikliklere genellikle izin verilmez: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Tetikleyici aralığındaki değişiklikler: Artımlı toplu işlemler ve zaman aralıkları arasındaki tetikleyicileri değiştirebilirsiniz. Bkz . Çalıştırmalar arasındaki tetikleyici aralıklarını değiştirme.
  • Çıkış havuzu türündeki değişiklikler: Havuzların belirli birkaç bileşimi arasındaki değişikliklere izin verilir. Bunun vaka bazında doğrulanması gerekir. İşte birkaç örnek.
    • Kafka havuzuna dosya havuzuna izin verilir. Kafka yalnızca yeni verileri görür.
    • Dosya havuzuna Kafka havuzuna izin verilmiyor.
    • Kafka havuzu foreach olarak değiştirildi veya buna izin verilir.
  • Çıktı havuzu parametrelerindeki değişiklikler: Buna izin verilip verilmeyeceği ve değişikliğin semantiğinin iyi tanımlanıp tanımlanmadığı havuza ve sorguya bağlıdır. İşte birkaç örnek.
    • Dosya havuzu çıkış dizininde yapılan değişikliklere izin verilmez: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Çıkış konusunda yapılan değişikliklere izin verilir: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Kullanıcı tanımlı foreach havuzundaki değişikliklere (kod ForeachWriter ) izin verilir, ancak değişikliğin semantiği koda bağlıdır.
  • Projeksiyon/filtre/harita benzeri işlemlerde değişikliklere izin verilir: Bazı durumlara izin verilir. Örneğin:
    • Filtrelerin eklenmesine/silinmesine izin verilir: sdf.selectExpr("a") için sdf.where(...).selectExpr("a").filter(...).
    • Aynı çıkış şemasına sahip projeksiyonlarda değişikliklere izin verilir: sdf.selectExpr("stringColumn AS json").writeStream ile sdf.select(to_json(...).as("json")).writeStream.
    • Farklı çıkış şemasına sahip projeksiyonlardaki değişikliklere koşullu olarak izin verilir: sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream yalnızca çıkış havuzu şemanın "a"'den "b"'e değiştirilmesine izin veriyorsa izin verilir.
  • Durum bilgisi olan işlemlerdeki değişiklikler: Akış sorgularındaki bazı işlemlerin sonucu sürekli güncelleştirmek için durum verilerini tutması gerekir. Yapılandırılmış Akış, durum verilerini hataya dayanıklı depolamaya (örneğin, DBFS, Azure Blob depolama) otomatik olarak denetler ve yeniden başlatıldıktan sonra geri yükler. Ancak bu, durum verilerinin şemasının yeniden başlatmalar arasında aynı kaldığı varsayılır. Bu, akış sorgusunun durum bilgisi olan işlemlerinde yapılan değişikliklere (eklemeler, silmeler veya şema değişiklikleri) yeniden başlatmalar arasında izin verilmediği anlamına gelir. Durum kurtarmayı güvence altına almak için şeması yeniden başlatmalar arasında değiştirilmemesi gereken durum bilgisi olan işlemlerin listesi aşağıdadır:
    • Akış toplama: Örneğin, sdf.groupBy("a").agg(...). Gruplandırma anahtarlarının veya toplamaların sayısında veya türünde herhangi bir değişikliğe izin verilmez.
    • Yinelenenleri kaldırma akışı: Örneğin, sdf.dropDuplicates("a"). Gruplandırma anahtarlarının veya toplamaların sayısında veya türünde herhangi bir değişikliğe izin verilmez.
    • Stream-stream join: Örneğin, sdf1.join(sdf2, ...) (her iki giriş de sparkSession.readStreamile oluşturulur). Şema veya eşit birleştirme sütunlarında değişikliklere izin verilmez. Birleştirme türündeki (dış veya iç) değişikliklere izin verilmiyor. Birleştirme koşulundaki diğer değişiklikler kötü tanımlanmıştır.
    • Durum bilgisi olan rastgele işlem: Örneğin, sdf.groupByKey(...).mapGroupsWithState(...) veya sdf.groupByKey(...).flatMapGroupsWithState(...). Kullanıcı tanımlı durumun şemasında ve zaman aşımı türünde herhangi bir değişikliğe izin verilmez. Kullanıcı tanımlı durum eşleme işlevindeki herhangi bir değişikliğe izin verilir, ancak değişikliğin anlamsal etkisi kullanıcı tanımlı mantığa bağlıdır. Durum şeması değişikliklerini gerçekten desteklemek istiyorsanız, şema geçişini destekleyen bir kodlama/kod çözme şeması kullanarak karmaşık durum veri yapılarınızı açıkça bayt olarak kodlayabilir/kodunu çözebilirsiniz. Örneğin, durumunuzu Avro ile kodlanmış baytlar olarak kaydederseniz, sorguların yeniden başlatılması arasında ikili durumu geri yükledikçe Avro durum şemasını değiştirebilirsiniz.