Menerapkan marka air untuk mengontrol ambang batas pemrosesan data
Artikel ini memperkenalkan konsep dasar marka air dan memberikan rekomendasi untuk menggunakan marka air dalam operasi streaming stateful umum. Anda harus menerapkan marka air ke operasi streaming stateful untuk menghindari perluasan jumlah data yang disimpan dalam status tak terbatas, yang dapat memperkenalkan masalah memori dan meningkatkan latensi pemrosesan selama operasi streaming yang berjalan lama.
Apa itu marka air?
Streaming Terstruktur menggunakan marka air untuk mengontrol ambang batas berapa lama untuk terus memproses pembaruan untuk entitas status tertentu. Contoh umum entitas negara meliputi:
- Agregasi dalam rentang waktu.
- Kunci unik dalam penggabungan antara dua aliran data.
Saat Anda mendeklarasikan marka air, Anda menentukan bidang stempel waktu dan ambang marka air pada DataFrame streaming. Saat data baru tiba, manajer status melacak tanda waktu terbaru di bidang yang ditentukan dan memproses semua rekaman dalam ambang keterlambatan.
Contoh berikut menerapkan ambang batas marka air 10 menit ke jumlah berjendela:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Dalam contoh ini:
- Kolom
event_time
digunakan untuk menentukan marka air 10 menit dan jendela bergulir 5 menit. - Jumlah dikumpulkan untuk setiap
id
yang diamati untuk setiap jendela 5 menit yang tidak tumpang tindih. - Informasi status dipertahankan untuk setiap hitungan hingga akhir jendela lebih lama 10 menit dari
event_time
terbaru yang diamati.
Penting
Ambang batas marka air menjamin bahwa rekaman yang tiba dalam ambang yang ditentukan diproses sesuai dengan semantik kueri yang ditentukan. Rekaman yang terlambat tiba di luar ambang yang ditentukan mungkin masih diproses menggunakan metrik kueri, tetapi ini tidak dijamin.
Bagaimana marka air memengaruhi waktu dan throughput pemrosesan?
Marka air berinteraksi dengan mode output untuk mengontrol kapan data ditulis ke sink. Karena marka air mengurangi jumlah total informasi status yang akan diproses, penggunaan marka air yang efektif sangat penting untuk throughput streaming stateful yang efisien.
Catatan
Tidak semua mode output didukung untuk semua operasi stateful.
Marka air dan mode output untuk agregasi berjendela
Tabel berikut ini merinci pemrosesan kueri dengan agregasi pada cap waktu dengan watermark yang ditentukan.
Mode output | Perilaku |
---|---|
Lampirkan | Baris ditulis ke tabel target setelah ambang batas marka air berlalu. Semua penulisan tertunda berdasarkan ambang keterlambatan. Status agregasi lama dihilangkan setelah ambang batas berlalu. |
Pemutakhiran | Baris ditulis ke tabel target saat hasil dihitung, dan dapat ditimpa dan diperbarui saat data baru tiba. Status agregasi lama dihilangkan setelah ambang batas berlalu. |
Selesai | Status agregasi tidak dihilangkan. Tabel target ditulis ulang setiap kali trigger diaktifkan. |
Marka air dan output untuk gabungan stream-stream
Gabungan antara beberapa aliran hanya mendukung mode penampingan, dan rekaman yang cocok ditulis di setiap batch yang ditemukan. Untuk join dalam, Databricks merekomendasikan pengaturan batas watermark pada setiap sumber data streaming. Ini memungkinkan informasi status dibuang untuk rekaman lama. Tanpa watermark, Structured Streaming mencoba menggabungkan setiap kunci dari kedua sisi join pada setiap pemicu.
Streaming Terstruktur memiliki semantik khusus untuk mendukung gabungan luar. Watermarking wajib untuk gabungan luar, karena menunjukkan kapan kunci harus ditulis dengan nilai null setelah tidak cocok. Perhatikan bahwa meskipun gabungan luar dapat berguna untuk merekam rekaman yang tidak pernah cocok selama pemrosesan data, karena gabungan hanya menulis ke tabel sebagai operasi penambahan, data yang hilang ini tidak direkam sampai setelah ambang keterlambatan berlalu.
Mengontrol ambang batas data terlambat dengan kebijakan multi watermark dalam Streaming Tersusun
Saat bekerja dengan beberapa input Streaming Terstruktur, Anda dapat mengatur beberapa marka air untuk mengontrol ambang toleransi untuk data yang terlambat tiba. Mengonfigurasi marka air memungkinkan Anda mengontrol informasi status dan memengaruhi latensi.
Suatu kueri streaming dapat memiliki beberapa aliran input yang disatukan atau digabungkan bersama. Masing-masing aliran input dapat memiliki ambang data akhir yang berbeda yang perlu ditoleransi untuk operasi stateful. Tentukan ambang batas ini menggunakan withWatermarks("eventTime", delay)
pada setiap aliran input. Berikut ini adalah contoh kueri dengan gabungan stream-stream.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Saat menjalankan kueri, Streaming Terstruktur secara tersendiri melacak waktu peristiwa maksimum yang terlihat di setiap aliran input, menghitung tanda air berdasarkan penundaan yang ada, dan memilih satu tanda air global yang akan digunakan untuk operasi stateful. Secara default, batas minimum dipilih sebagai marka air global karena memastikan bahwa tidak ada data yang secara tidak sengaja dihapus sebagai terlambat jika salah satu aliran tertinggal dari yang lain (misalnya, salah satu aliran berhenti menerima data karena kegagalan upstream). Dengan kata lain, marka air global bergerak dengan aman pada kecepatan aliran paling lambat, dan sebagai akibatnya, output kueri ditunda.
Jika Anda ingin mendapatkan hasil yang lebih cepat, Anda dapat mengatur beberapa kebijakan marka air untuk memilih nilai maksimum sebagai marka air global dengan mengatur konfigurasi SQL spark.sql.streaming.multipleWatermarkPolicy
ke max
(defaultnya adalah min
). Ini memungkinkan marka air global bergerak pada kecepatan aliran tercepat. Namun, konfigurasi ini menghilangkan data dari aliran terlambat. Karena itu, Databricks merekomendasikan agar Anda menggunakan konfigurasi ini secara peradilan.
Menghilangkan duplikat dalam tanda air
Di Databricks Runtime 13.3 LTS ke atas, Anda dapat menghapus duplikasi catatan dalam batas waktu watermark menggunakan pengidentifikasi unik.
Streaming Terstruktur menyediakan jaminan pemrosesan tepat sekali, tetapi tidak secara otomatis mendeduplikasi rekaman dari sumber data. Anda dapat menggunakan dropDuplicatesWithinWatermark
untuk menghapus duplikasi rekaman berdasarkan bidang tertentu, memungkinkan Anda menghapus duplikat dari aliran meskipun beberapa bidang berbeda (misalnya waktu peristiwa atau waktu kedatangan).
Rekaman duplikat yang tiba dalam ambang batas waktu yang ditentukan dijamin dihapus. Jaminan ini hanya ketat dalam satu arah, dan rekaman duplikat yang tiba di luar ambang yang ditentukan mungkin juga dihilangkan. Anda harus mengatur batas waktu penundaan penanda air lebih lama dari selisih waktu maksimum di antara peristiwa duplikat untuk menghapus semua duplikat.
Anda harus menentukan marka air untuk menggunakan metode dropDuplicatesWithinWatermark
, seperti dalam contoh berikut:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])