Jalankan beban kerja Streaming Terstruktur pertama Anda
Artikel ini menyediakan contoh kode dan penjelasan konsep dasar yang diperlukan untuk menjalankan kueri Streaming Terstruktur pertama Anda di Azure Databricks. Anda dapat menggunakan Streaming Terstruktur untuk beban kerja pemrosesan hampir real-time dan tambahan.
Streaming Terstruktur adalah salah satu dari beberapa teknologi yang mendukung tabel streaming di DLT. Databricks merekomendasikan penggunaan DLT untuk semua beban kerja ETL, penyerapan, dan Streaming Terstruktur baru. Lihat Apa itu DLT?.
Catatan
Meskipun DLT menyediakan sintaksis yang sedikit dimodifikasi untuk mendeklarasikan tabel streaming, sintaks umum untuk mengonfigurasi bacaan dan transformasi streaming berlaku untuk semua kasus penggunaan streaming di Azure Databricks. DLT juga menyederhanakan streaming dengan mengelola informasi status, metadata, dan banyak konfigurasi.
Menggunakan Auto Loader untuk membaca data streaming dari penyimpanan objek
Contoh berikut menunjukkan pemuatan data JSON dengan Auto Loader, yang digunakan cloudFiles
untuk menunjukkan format dan opsi. Opsi ini schemaLocation
memungkinkan inferensi dan evolusi skema. Tempelkan kode berikut di sel buku catatan Databricks dan jalankan sel untuk membuat DataFrame streaming bernama raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Seperti operasi baca lainnya di Azure Databricks, mengonfigurasi bacaan streaming tidak benar-benar memuat data. Anda harus melakukan tindakan pada data sebelum aliran data dimulai.
Catatan
Memanggil display()
pada DataFrame streaming akan memulai pekerjaan streaming. Untuk sebagian besar kasus penggunaan Structured Streaming, tindakan yang memicu streaming harus menulis data ke sink. Lihat Pertimbangan produksi untuk Streaming Terstruktur.
Melakukan transformasi streaming
Streaming Terstruktur mendukung sebagian besar transformasi yang tersedia di Azure Databricks dan Spark SQL. Anda bahkan dapat memuat model MLflow sebagai UDF dan membuat prediksi streaming sebagai transformasi.
Contoh kode berikut menyelesaikan transformasi sederhana untuk memperkaya data JSON yang diserap dengan informasi tambahan menggunakan fungsi Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Yang dihasilkan transformed_df
berisi instruksi kueri untuk memuat dan mengubah setiap rekaman saat tiba di sumber data.
Catatan
Pemrosesan Streaming Terstruktur memperlakukan sumber data sebagai dataset tidak terbatas. Dengan demikian, beberapa transformasi tidak didukung dalam beban kerja Streaming Terstruktur karena akan memerlukan pengurutan jumlah item yang tak terbatas.
Sebagian besar agregasi dan banyak gabungan memerlukan pengelolaan informasi status dengan marka air, jendela, dan mode output. Lihat Terapkan watermark untuk mengontrol ambang batas pemrosesan data.
Lakukan penulisan batch bertahap ke Delta Lake
Contoh berikut menulis ke Delta Lake menggunakan jalur file dan titik pemeriksaan tertentu.
Penting
Selalu pastikan Anda menentukan lokasi titik pemeriksaan unik untuk setiap penulis streaming yang Anda konfigurasi. Titik pemeriksaan menyediakan identitas unik untuk streaming Anda, melacak semua rekaman yang diproses dan informasi status yang terkait dengan kueri streaming Anda.
Pengaturan availableNow
untuk pemicu menginstruksikan Streaming Terstruktur untuk memproses semua rekaman yang sebelumnya tidak diproses dari himpunan data sumber lalu dimatikan, sehingga Anda dapat dengan aman menjalankan kode berikut tanpa khawatir meninggalkan aliran yang berjalan:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Dalam contoh ini, tidak ada catatan baru yang tiba di sumber data kami, jadi eksekusi ulangi kode ini tidak menyerap rekaman baru.
Peringatan
Eksekusi Streaming Terstruktur dapat mencegah penghentian otomatis mematikan sumber daya komputasi. Untuk menghindari biaya tak terduga, pastikan untuk menghentikan kueri streaming.
Membaca data dari Delta Lake, mengubahnya, dan menulis kembali ke Delta Lake
Delta Lake memiliki dukungan luas untuk bekerja dengan Streaming Terstruktur sebagai sumber dan penampung. Lihat Pembacaan dan penulisan tabel Delta secara streaming.
Contoh berikut menunjukkan sintaks contoh untuk memuat semua rekaman baru secara bertahap dari tabel Delta, menggabungkannya dengan rekam jepret tabel Delta lain, dan menulisnya ke tabel Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Anda harus memiliki konfigurasi izin yang tepat untuk membaca tabel sumber, menulis ke tabel tujuan, dan lokasi checkpoint yang sudah ditentukan. Isi semua parameter yang ditandai dengan tanda kurung sudut (<>
) menggunakan nilai yang relevan untuk sumber data dan sink Anda.
Catatan
DLT menyediakan sintaksis deklaratif penuh untuk membuat alur Delta Lake dan mengelola properti seperti pemicu dan titik pemeriksaan secara otomatis. Lihat Apa itu DLT?.
Membaca data dari Kafka, mengubah, dan menulis ke Kafka
Apache Kafka dan bus olahpesan lainnya menyediakan beberapa latensi terendah yang tersedia untuk himpunan data besar. Anda dapat menggunakan Azure Databricks untuk menerapkan transformasi ke data yang diserap dari Kafka lalu menulis data kembali ke Kafka.
Catatan
Menulis data ke penyimpanan objek cloud menambahkan overhead latensi tambahan. Jika Anda ingin menyimpan data dari bus olahpesan di Delta Lake tetapi memerlukan latensi serendah mungkin untuk beban kerja streaming, Databricks merekomendasikan untuk mengonfigurasi pekerjaan streaming terpisah untuk menyerap data ke lakehouse dan menerapkan transformasi mendekati real-time untuk sink bus olahpesan hilir.
Contoh kode berikut menunjukkan pola sederhana untuk memperkaya data dari Kafka dengan menggabungkannya dengan data dalam tabel Delta lalu menulis kembali ke Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Anda harus memiliki izin yang tepat yang dikonfigurasi untuk akses ke layanan Kafka Anda. Isi semua parameter yang ditandai dengan tanda kurung sudut (<>
) menggunakan nilai yang relevan untuk sumber data dan sink Anda. Lihat Pemrosesan aliran dengan Apache Kafka dan Azure Databricks.