Aracılığıyla paylaş


Rastgele veri havuzlarına yazmak için foreachBatch kullanın

Bu makalede, bir akış sorgusunun çıkışını var olan akış havuzu olmayan veri kaynaklarına yazmak için Yapılandırılmış Akış ile birlikte kullanımı foreachBatch ele alınmaktadır.

Kod düzeni streamingDF.writeStream.foreachBatch(...) , akış sorgusunun her mikro toplu işleminin çıkış verilerine toplu iş işlevleri uygulamanıza olanak tanır. foreachBatch ile kullanılan işlevler iki parametre alır:

  • Bir mikro toplu işlemin çıkış verilerini içeren bir DataFrame.
  • Mikro toplu işlemin benzersiz kimliği.

Yapılandırılmış Akış'ta Delta Lake birleştirme işlemleri için kullanmanız foreachBatch gerekir. Bkz . foreachBatch kullanarak akış sorgularından Upsert.

Ek DataFrame işlemleri uygulama

Spark bu gibi durumlarda artımlı plan oluşturulmasını desteklemediğinden birçok DataFrame ve Veri Kümesi işlemi akış DataFrame'lerde desteklenmez. kullanarak foreachBatch() bu işlemlerin bazılarını her bir mikro toplu iş çıkışına uygulayabilirsiniz. Örneğin, akış toplamalarının çıkışını güncelleştirme modunda bir Delta tablosuna yazmak için foreachBatch() ve SQL MERGE INTO işlemini kullanabilirsiniz. Daha fazla ayrıntı için MERGE INTO'a bakın.

Önemli

  • foreachBatch() yalnızca en az bir kez yazma garantisi sağlar. Ancak, işleve sağlanan batchId'ı, çıktıyı tekrardan arındırmak ve yalnız bir kez işlenmesini garanti etmek için kullanabilirsiniz. Her iki durumda da, uçtan uca semantiği kendiniz düşünmeniz gerekir.
  • foreachBatch() temel olarak bir akış sorgusunun mikro toplu yürütmesine bağlı olduğundan sürekli işleme moduyla çalışmaz. Verileri sürekli modda yazarsanız, bunun yerine kullanın foreach() .

ile foreachBatch() boş bir veri çerçevesi çağrılabilir ve doğru işleme izin vermek için kullanıcı kodunun dayanıklı olması gerekir. Aşağıda bir örnek verilmiştir:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0'da için foreachBatch davranış değişiklikleri

Paylaşılan erişim moduyla yapılandırılan işlemde Databricks Runtime 14.0 ve üzerinde aşağıdaki davranış değişiklikleri geçerlidir:

  • print() komutları sürücü günlüklerine çıkış yazar.
  • İşlevin dbutils.widgets içindeki alt modüle erişemezsiniz.
  • İşlevde başvurulan tüm dosyalar, modüller veya nesneler serileştirilebilir ve Spark'ta kullanılabilir olmalıdır.

Mevcut toplu iş veri kaynaklarını yeniden kullanma

kullanarak foreachBatch(), Yapılandırılmış Akış desteğine sahip olmayan veri havuzları için mevcut toplu veri yazıcılarını kullanabilirsiniz. İşte birkaç örnek:

diğer birçok toplu iş veri kaynağı adresinden foreachBatch()kullanılabilir. Bkz . Veri kaynaklarına bağlanma.

Birden çok konuma yazma

Bir akış sorgusunun çıkışını birden çok konuma yazmanız gerekiyorsa Databricks, en iyi paralelleştirme ve aktarım hızı için birden çok Yapılandırılmış Akış yazıcısı kullanmanızı önerir.

Birden foreachBatch çok havuza yazmak için kullanılması, akış yazma işlemlerinin yürütülmesini seri hale getirerek her mikro toplu işlem için gecikme süresini artırabilir.

Birden çok Delta tablosuna yazmak için kullanırsanız, foreachBatch'deki İdempotent tablo yazımları bölümüne bakınız.