Mengalirkan rekaman ke layanan eksternal dengan sink DLT
Penting
API sink
DLT berada dalam Pratinjau Umum.
Artikel ini menjelaskan API sink
DLT dan cara menggunakannya dengan alur DLT untuk menulis rekaman yang diubah oleh pipeline ke sink data eksternal seperti Katalog Unity untuk tabel yang dikelola dan eksternal, tabel metastore Apache Hive, dan layanan streaming peristiwa seperti Apache Kafka atau Azure Event Hubs.
Apa itu sink DLT?
Sink DLT memungkinkan Anda menulis data yang telah diubah ke target seperti layanan streaming acara seperti Apache Kafka atau Azure Event Hubs, dan tabel eksternal yang dikelola oleh Katalog Unity atau metastore Apache Hive. Sebelumnya, tabel streaming dan tampilan materialisasi yang dibuat dalam alur DLT hanya dapat dipertahankan pada tabel Delta yang terkelola oleh Azure Databricks. Dengan menggunakan sink, Anda sekarang memiliki lebih banyak opsi untuk mempertahankan output alur DLT Anda.
Kapan saya harus menggunakan sink DLT?
Databricks merekomendasikan penggunaan sink DLT jika Anda perlu:
- Buat kasus penggunaan operasional seperti deteksi penipuan, analitik real time, dan rekomendasi pelanggan. Kasus penggunaan operasional biasanya membaca data dari bus pesan, seperti topik Apache Kafka, lalu memproses data dengan latensi rendah dan menulis rekaman yang diproses kembali ke bus pesan. Pendekatan ini memungkinkan Anda untuk mencapai latensi yang lebih rendah dengan tidak menulis atau membaca dari penyimpanan cloud.
- Tulis data yang telah diubah dari alur DLT Anda ke tabel yang dikelola oleh instans Delta eksternal, termasuk tabel yang dikelola oleh Katalog Unity dan tabel eksternal, serta tabel metastore Apache Hive.
- Lakukan proses extract-transform-load (ETL) terbalik ke sink di luar Databricks, seperti topik Apache Kafka. Pendekatan ini memungkinkan Anda untuk secara efektif mendukung kasus penggunaan di mana data perlu dibaca atau digunakan di luar tabel Unity Catalog atau penyimpanan terkelola Databricks lainnya.
Bagaimana cara menggunakan sink DLT?
Nota
- Hanya kueri streaming yang menggunakan
spark.readStream
dandlt.read_stream
yang didukung. Kueri "batch" tidak didukung. - Hanya
append_flow
yang dapat digunakan untuk menulis ke sink. Alur lain, sepertiapply_changes
, tidak didukung. - Menjalankan pembaruan refresh penuh tidak membersihkan data hasil yang dihitung sebelumnya pada sink. Ini berarti bahwa setiap data yang diolah ulang akan ditambahkan ke sink, dan data yang ada tidak akan diubah.
Karena data peristiwa diserap dari sumber streaming ke dalam alur DLT Anda, Anda memproses dan memperbaiki data ini menggunakan fungsionalitas DLT lalu menggunakan pemrosesan aliran tambahan untuk mengalirkan rekaman data yang diubah ke sink DLT. Anda membuat sink ini menggunakan fungsi create_sink()
. Untuk detail selengkapnya tentang menggunakan fungsi create_sink
, lihat referensi API sink .
Untuk menerapkan sink DLT, gunakan langkah-langkah berikut:
- Siapkan alur DLT untuk memproses data peristiwa streaming dan menyiapkan rekaman data untuk ditulis ke sink DLT.
- Konfigurasikan dan buat sink DLT untuk menggunakan format sink target pilihan.
- Gunakan alur tambahan untuk menulis rekaman yang disiapkan ke sink.
Langkah-langkah ini tercakup dalam topik lainnya.
Menyiapkan alur DLT untuk menyiapkan rekaman untuk ditulis ke sink
Langkah pertama adalah menyiapkan alur DLT untuk mengubah data aliran peristiwa mentah menjadi data yang disiapkan yang akan Anda tulis ke sink Anda.
Untuk lebih memahami proses ini, Anda dapat mengikuti contoh alur DLT ini yang memproses data peristiwa clickstream dari data sampel wikipedia-datasets
di Databricks. Alur ini menguraikan himpunan data mentah untuk mengidentifikasi halaman Wikipedia yang ditautkan ke halaman dokumentasi Apache Spark dan secara progresif menyempurnakan data tersebut hanya ke baris tabel tempat tautan yang merujuk berisi Apache_Spark.
Dalam contoh ini, alur DLT disusun menggunakan arsitektur medali , yang mengatur data ke dalam lapisan yang berbeda untuk meningkatkan kualitas dan efisiensi pemrosesan.
Untuk memulai, muat rekaman JSON mentah dari himpunan data ke lapisan perunggu Anda menggunakan Auto Loader. Kode Python ini menunjukkan cara membuat tabel streaming bernama clickstream_raw
, yang berisi data mentah yang tidak diolah dari sumber:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Setelah kode ini berjalan, data sekarang berada di tingkat "perunggu" (atau "data mentah") dari arsitektur Medallion dan harus dibersihkan. Langkah selanjutnya menyempurnakan data ke tingkat "perak", yang melibatkan pembersihan jenis data dan nama kolom dan menggunakan harapan DLT untuk memastikan integritas data.
Kode berikut menunjukkan cara melakukan ini dengan membersihkan dan memvalidasi data lapisan perunggu ke dalam tabel perak clickstream_clean
:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Mengembangkan lapisan "emas" dari struktur alur Anda, Anda memfilter data clickstream yang telah dibersihkan untuk mengisolasi entri di mana halaman rujukan adalah Apache_Spark
. Dalam contoh kode terakhir ini, pilihlah hanya kolom yang diperlukan untuk menulis ke tabel target sink.
Kode berikut menggambarkan cara membuat tabel yang disebut spark_referrers
mewakili lapisan emas:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Setelah proses persiapan data ini selesai, Anda harus mengatur wadah tujuan tempat catatan yang telah dibersihkan akan ditulis.
Mengonfigurasi penampung DLT
Databricks mendukung tiga jenis sink tujuan tempat Anda menulis rekaman yang diproses dari data streaming Anda:
- Wastafel Delta
- Pembuangan Apache Kafka
- Pengumpulan Data Azure Event Hubs
Di bawah ini adalah contoh konfigurasi untuk sink Delta, Kafka, dan Azure Event Hubs:
Delta tenggelam
Untuk membuat sink Delta berdasarkan jalur file:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Untuk membuat sink Delta berdasarkan nama tabel menggunakan katalog dan jalur skema yang lengkap:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Sink Kafka dan Azure Event Hubs
Kode ini berfungsi untuk kedua-dua sink Apache Kafka dan Azure Event Hubs.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Sekarang setelah sink Anda dikonfigurasi dan alur DLT Anda disiapkan, Anda dapat memulai streaming rekaman yang diproses ke sink.
Menulis ke sink DLT dengan aliran tambahan
Dengan sink Anda dikonfigurasi, langkah selanjutnya adalah menulis rekaman yang diproses ke dalamnya dengan menentukannya sebagai target untuk output rekaman oleh alur penambahan. Anda melakukan ini dengan menentukan sink Anda sebagai nilai target
di dekorator append_flow
.
- Untuk Tabel terkelola dan eksternal Unity Catalog, gunakan format
delta
dan tentukan jalur atau nama tabel dalam opsi. Alur DLT Anda harus dikonfigurasi untuk menggunakan Katalog Unity. - Untuk topik Apache Kafka, gunakan format
kafka
dan tentukan nama topik, informasi koneksi, dan informasi autentikasi dalam opsi. Ini adalah opsi yang sama yang didukung oleh sink Spark Structured Streaming Kafka. Lihat Mengonfigurasi penulis Streaming Terstruktur Kafka. - Untuk Azure Event Hubs, gunakan format
kafka
dan tentukan nama Event Hubs, informasi koneksi, dan informasi autentikasi dalam opsi. Ini adalah opsi yang sama yang didukung di sink Spark Structured Streaming Event Hubs yang menggunakan antarmuka Kafka. Lihat autentikasi Perwakilan Layanan dengan ID Microsoft Entra dan Azure Event Hubs. - Untuk tabel metastore Apache Hive, gunakan format
delta
dan tentukan jalur atau nama tabel dalam opsi. Alur DLT Anda harus dikonfigurasi untuk menggunakan metastore Hive.
Di bawah ini adalah contoh cara menyiapkan alur untuk menulis ke sink Delta, Kafka, dan Azure Event Hubs dengan rekaman yang diproses oleh alur DLT Anda.
Bak cuci Delta
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Sink Kafka dan Azure Event Hubs
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Parameter value
wajib untuk sink Azure Event Hubs. Parameter tambahan seperti key
, partition
, headers
, dan topic
bersifat opsional.
Untuk detail selengkapnya tentang dekorator append_flow
, lihat Menggunakan alur tambahan untuk menulis ke tabel streaming dari beberapa aliran sumber.
Batasan
Hanya API Python yang didukung. SQL tidak didukung.
Hanya kueri streaming yang menggunakan
spark.readStream
dandlt.read_stream
yang didukung. Kueri batch tidak didukung.Hanya
append_flow
yang dapat digunakan untuk menulis ke saluran keluaran. Alur lain, sepertiapply_changes
, tidak didukung, dan Anda tidak dapat menggunakan sink dalam definisi himpunan data DLT. Misalnya, berikut ini tidak didukung:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Untuk sink Delta, nama tabel harus sepenuhnya memenuhi syarat. Secara khusus, untuk tabel eksternal yang dikelola oleh Katalog Unity, nama tabel harus memiliki bentuk
<catalog>.<schema>.<table>
. Untuk metastore hive, harus dalam bentuk<schema>.<table>
.Menjalankan
FullRefresh
tidak akan membersihkan data hasil komputasi sebelumnya di sink. Ini berarti bahwa setiap data yang diolah ulang akan ditambahkan ke sink, dan data yang ada tidak akan diubah.Ekspektasi DLT tidak didukung.
Sumber daya
- Mengembangkan alur DLT
- Memuat dan memproses data secara bertahap dengan alur DLT
- referensi API sink Python