Memuat data dengan DLT
Anda dapat memuat data dari sumber data apa pun yang didukung oleh Apache Spark di Azure Databricks menggunakan DLT. Anda dapat menentukan himpunan data (tabel dan tampilan) di DLT terhadap kueri apa pun yang mengembalikan Spark DataFrame, termasuk Streaming DataFrame dan Panda untuk Spark DataFrames. Untuk tugas penyerapan data, Databricks merekomendasikan penggunaan tabel streaming untuk sebagian besar kasus penggunaan. Tabel streaming baik untuk menyerap data dari penyimpanan objek cloud menggunakan Auto Loader atau dari bus pesan seperti Kafka. Contoh di bawah ini menunjukkan beberapa pola umum.
Penting
Tidak semua sumber data memiliki dukungan SQL. Anda dapat mencampur notebook SQL dan Python dalam alur DLT untuk menggunakan SQL untuk semua operasi di luar penyerapan.
Untuk detail tentang bekerja dengan pustaka yang tidak dikemas di DLT secara default, lihat Mengelola dependensi Python untuk alur DLT.
Memuat file dari penyimpanan objek cloud
Databricks merekomendasikan penggunaan Auto Loader dengan DLT untuk sebagian besar tugas penyerapan data dari penyimpanan objek cloud. Auto Loader dan DLT dirancang untuk memuat data yang terus bertambah dengan cara bertahap dan secara idempoten saat tiba di penyimpanan cloud. Contoh berikut menggunakan Auto Loader untuk membuat himpunan data dari file CSV dan JSON:
Nota
Untuk memuat file dengan Auto Loader dalam alur yang mengaktifkan Katalog Unity, Anda harus menggunakan lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Unity Catalog dengan DLT, lihat Menggunakan Unity Catalog dengan alur DLT Anda.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Lihat Apa itu Auto Loader? dan sintaks SQL Auto Loader.
Peringatan
Jika Anda menggunakan Auto Loader dengan pemberitahuan file dan menjalankan refresh penuh untuk alur atau tabel streaming, Anda harus membersihkan sumber daya Anda secara manual. Anda dapat menggunakan CloudFilesResourceManager di notebook untuk melakukan pembersihan.
Memuat data dari sistem bus pesan
Anda dapat mengonfigurasi alur DLT untuk menyerap data dari bus pesan dengan tabel streaming. Databricks merekomendasikan untuk menggabungkan tabel streaming dengan eksekusi berkelanjutan dan penskalaan otomatis yang ditingkatkan untuk memberikan proses penyerapan yang paling efisien untuk pemuatan dengan latensi rendah dari bus pesan. Lihat Mengoptimalkan pemanfaatan kluster alur DLT dengan penskalaan otomatis yang ditingkatkan.
Misalnya, kode berikut mengonfigurasi tabel streaming untuk menyerap data dari Kafka:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
Anda dapat menulis operasi hilir di SQL murni untuk melakukan transformasi streaming pada data ini, seperti dalam contoh berikut:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Untuk contoh bekerja dengan Azure Event Hubs, lihat Menggunakan Azure Event Hubs sebagai sumber data DLT.
Lihat bagian tentang mengonfigurasi sumber data streaming.
Memuat data dari sistem eksternal
DLT mendukung pemuatan data dari sumber data apa pun yang didukung oleh Azure Databricks. Lihat Menyambungkan ke sumber data. Anda juga dapat memuat data eksternal menggunakan Federasi Lakehouse untuk sumber data yang didukung. Karena Federasi Lakehouse memerlukan Databricks Runtime 13.3 LTS atau lebih tinggi, untuk menggunakan Federasi Lakehouse, alur Anda harus dikonfigurasi untuk menggunakan saluran pratinjau .
Beberapa sumber data tidak memiliki dukungan yang setara di SQL. Jika Anda tidak dapat menggunakan Federasi Lakehouse dengan salah satu sumber data ini, Anda bisa menggunakan buku catatan Python untuk menyerap data dari sumbernya. Anda dapat menambahkan kode sumber Python dan SQL ke alur DLT yang sama. Contoh berikut mendeklarasikan tampilan materialisasi untuk mengakses status data saat ini dalam tabel PostgreSQL jarak jauh:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Memuat himpunan data kecil atau statis dari penyimpanan objek cloud
Anda dapat memuat himpunan data kecil atau statis menggunakan sintaks beban Apache Spark. DLT mendukung semua format file yang didukung oleh Apache Spark di Azure Databricks. Untuk daftar lengkapnya, lihat Opsi format data .
Contoh berikut menunjukkan pemuatan JSON untuk membuat tabel DLT:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Nota
Konstruksi SQL SELECT * FROM format.`path`;
umum untuk semua lingkungan SQL di Azure Databricks. Ini adalah pola yang direkomendasikan untuk akses file langsung menggunakan SQL dengan DLT.
Akses kredensial penyimpanan dengan aman menggunakan rahasia dalam alur kerja
Anda dapat menggunakan Azure Databricks rahasia untuk menyimpan kredensial seperti kunci akses atau kata sandi. Untuk mengonfigurasi kata sandi dalam alur Anda, gunakan properti Spark dalam konfigurasi kluster pengaturan alur. Lihat Atur komputasi untuk jalur DLT.
Contoh berikut menggunakan rahasia untuk menyimpan kunci akses yang diperlukan untuk membaca data input dari akun penyimpanan Azure Data Lake Storage Gen2 (ADLS Gen2) menggunakan Auto Loader. Anda dapat menggunakan metode yang sama ini untuk mengonfigurasi rahasia apa pun yang diperlukan oleh alur Anda, misalnya, kunci AWS untuk mengakses S3, atau kata sandi ke metastore Apache Hive.
Untuk informasi lebih lanjut tentang penggunaan Azure Data Lake Storage Gen2, lihat Menyambungkan ke Azure Data Lake Storage Gen2 dan Blob Storage.
Nota
Anda harus menambahkan awalan spark.hadoop.
ke kunci konfigurasi spark_conf
yang mengatur nilai rahasia.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Mengganti
-
<storage-account-name>
dengan nama akun penyimpanan ADLS Gen2. -
<scope-name>
sebagai nama cakupan rahasia untuk Azure Databricks. -
<secret-name>
dengan nama kunci yang berisi kunci akses akun penyimpanan Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Mengganti
-
<container-name>
dengan nama kontainer akun penyimpanan Azure yang menyimpan data input. -
<storage-account-name>
dengan nama akun penyimpanan ADLS Gen2. -
<path-to-input-dataset>
dengan jalur ke himpunan data input.
Memuat data dari Azure Event Hubs
Azure Event Hubs adalah layanan streaming data yang menyediakan antarmuka yang kompatibel dengan Apache Kafka. Anda dapat menggunakan konektor Kafka Streaming Terstruktur, yang disertakan dalam runtime DLT, untuk memuat pesan dari Azure Event Hubs. Untuk mempelajari selengkapnya tentang memuat dan memproses pesan dari Azure Event Hubs, lihat Menggunakan Azure Event Hubs sebagai sumber data DLT.