Titik pemeriksaan Streaming Terstruktur
Titik pemeriksaan dan log write-ahead bekerja sama untuk memberikan jaminan pemrosesan untuk beban kerja Streaming Terstruktur. Titik pemeriksaan melacak informasi yang mengidentifikasi kueri, termasuk informasi status dan rekaman yang diproses. Saat Anda menghapus file dalam direktori titik pemeriksaan atau mengubah ke lokasi titik pemeriksaan baru, eksekusi kueri berikutnya dimulai dengan segar.
Setiap kueri harus memiliki lokasi titik pemeriksaan yang berbeda. Beberapa kueri tidak boleh berbagi lokasi yang sama.
Mengaktifkan titik pemeriksaan untuk kueri Streaming Terstruktur
Anda harus menentukan checkpointLocation
opsi sebelum menjalankan kueri streaming, seperti dalam contoh berikut:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Catatan
Beberapa sink, seperti output untuk display()
di notebook dan memory
sink, secara otomatis menghasilkan lokasi titik pemeriksaan sementara jika Anda menghilangkan opsi ini. Lokasi titik pemeriksaan sementara ini tidak memastikan toleransi kesalahan atau jaminan konsistensi data dan mungkin tidak dibersihkan dengan benar. Databricks merekomendasikan untuk selalu menentukan lokasi titik pemeriksaan untuk sink ini.
Pulihkan setelah perubahan dalam kueri Streaming Terstruktur
Ada batasan pada perubahan apa yang diizinkan dalam kueri streaming di antara beberapa coba ulang dari lokasi titik pemeriksaan yang sama. Berikut adalah beberapa perubahan yang tidak diizinkan atau efek perubahan tidak ditentukan dengan baik. Untuk semuanya:
- Istilah diizinkan berarti Anda dapat melakukan perubahan yang ditentukan, tetapi apakah semantik efeknya didefinisikan dengan baik, itu tergantung pada kueri dan perubahannya.
- Istilah tidak diizinkan berarti Anda tidak boleh melakukan perubahan yang ditentukan karena kueri yang dimulai ulang kemungkinan akan gagal dengan kesalahan yang tidak dapat diprediksi.
-
sdf
mewakili DataFrame/Dataset streaming yang dihasilkan dengansparkSession.readStream
.
Jenis perubahan dalam kueri Streaming Terstruktur
- Perubahan pada jumlah atau jenis (sumber yang berbeda) dari sumber input: Ini tidak diizinkan.
-
Perubahan parameter sumber input: Terlebih ini diizinkan atau semantik perubahannya didefinisikan dengan baik atau tidak, itu tergantung pada sumber dan kueri. Berikut beberapa contohnya.
Penambahan, penghapusan, dan modifikasi batas tarif diperbolehkan:
spark.readStream.format("kafka").option("subscribe", "article")
ke
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Perubahan pada artikel dan file langganan umumnya tidak diizinkan karena hasilnya tidak dapat diprediksi:
spark.readStream.format("kafka").option("subscribe", "article")
menjadispark.readStream.format("kafka").option("subscribe", "newarticle")
- Perubahan interval pemicu: Anda dapat mengubah pemicu antara batch inkremental dan interval waktu. Lihat Mengubah interval pemicu di antara eksekusi.
-
Perubahan jenis sink output: Diperbolehkan melakukan perubahan di antara beberapa kombinasi sink tertentu. Perlu diverifikasi berdasarkan kasus per kasus. Berikut beberapa contohnya.
- Sink file ke sink Kafka diperbolehkan. Kafka hanya akan melihat data baru.
- Sink Kafka ke sink file tidak diperbolehkan.
- Sink Kafka diubah menjadi foreach, atau sebaliknya, diperbolehkan.
-
Perubahan parameter sink output: Terlebih ini diizinkan atau semantik perubahannya didefinisikan dengan baik atau tidak, itu tergantung pada sink dan kueri. Berikut beberapa contohnya.
- Perubahan pada direktori output sink file tidak diperbolehkan:
sdf.writeStream.format("parquet").option("path", "/somePath")
menjadisdf.writeStream.format("parquet").option("path", "/anotherPath")
- Perubahan pada topik output diizinkan:
sdf.writeStream.format("kafka").option("topic", "topic1")
kesdf.writeStream.format("kafka").option("topic", "topic2")
- Perubahan pada sink foreach yang ditentukan pengguna (yaitu, kode
ForeachWriter
) diperbolehkan, tetapi semantik perubahannya tergantung pada kode.
- Perubahan pada direktori output sink file tidak diperbolehkan:
-
Perubahan dalam proyeksi / filter / operasi seperti peta: Beberapa kasus diperbolehkan. Misalnya:
- Penambahan / penghapusan filter diperbolehkan:
sdf.selectExpr("a")
menjadisdf.where(...).selectExpr("a").filter(...)
. - Perubahan proyeksi dengan skema output yang sama diperbolehkan:
sdf.selectExpr("stringColumn AS json").writeStream
menjadisdf.select(to_json(...).as("json")).writeStream
. - Perubahan proyeksi dengan skema output yang berbeda diperbolehkan secara kondisional:
sdf.selectExpr("a").writeStream
menjadisdf.selectExpr("b").writeStream
hanya diperbolehkan jika sink output memungkinkan perubahan skema dari"a"
ke"b"
.
- Penambahan / penghapusan filter diperbolehkan:
-
Perubahan dalam operasi stateful: Beberapa operasi dalam kueri streaming perlu mempertahankan data status untuk terus memperbarui hasilnya. Structured Streaming secara otomatis memeriksa data status ke penyimpanan yang toleran terhadap kesalahan (misalnya, DBFS, penyimpanan Azure Blob) dan memulihkannya setelah restart. Namun, ini akan mengasumsikan bahwa skema data status tetap sama di seluruh restart. Ini berarti bahwa setiap perubahan (yaitu, penambahan, penghapusan, atau modifikasi skema) pada operasi stateful kueri streaming tidak diizinkan di antara proses restart. Berikut adalah daftar operasi stateful yang skemanya tidak boleh diubah antara restart untuk memastikan pemulihan status:
-
Agregasi streaming: Misalnya,
sdf.groupBy("a").agg(...)
. Tidak diperbolehkan adanya perubahan pada jumlah atau jenis kunci pengelompokan atau agregat. -
Deduplikasi streaming: Misalnya,
sdf.dropDuplicates("a")
. Tidak diperbolehkan adanya perubahan pada jumlah atau jenis kunci pengelompokan atau agregat. -
Gabungan stream-stream: Misalnya,
sdf1.join(sdf2, ...)
(yaitu kedua input dihasilkan dengansparkSession.readStream
). Perubahan dalam skema atau kolom equi-join tidak diperbolehkan. Perubahan pada jenis gabungan (luar atau dalam) tidak diperbolehkan. Perubahan lain yang terjadi dalam kondisi bergabung tidaklah jelas. -
Operasi stateful arbitrer: Misalnya,
sdf.groupByKey(...).mapGroupsWithState(...)
atausdf.groupByKey(...).flatMapGroupsWithState(...)
. Setiap perubahan pada skema status yang ditentukan pengguna dan jenis batas waktu, tidak diperbolehkan. Setiap perubahan dalam fungsi pemetaan status yang ditentukan pengguna diperbolehkan, tetapi efek semantik dari perubahannya tergantung pada logika yang ditentukan pengguna. Jika Anda benar-benar ingin mendukung perubahan skema status, maka Anda dapat secara eksplisit mengode/mendekode struktur data status kompleks Anda menjadi byte dengan skema pengodean/pendekodean yang mendukung migrasi skema. Misalnya, jika Anda menyimpan status Anda sebagai byte yang dikodekan Avro, maka Anda dapat mengubah skema Avro-state-schema antara kueri dimulai ulang karena ini memulihkan status biner.
-
Agregasi streaming: Misalnya,