Mengembangkan kode alur dengan SQL
DLT memperkenalkan beberapa kata kunci dan fungsi SQL baru untuk mendefinisikan tampilan materialisasi dan tabel streaming dalam alur. Dukungan SQL untuk mengembangkan alur dibangun berdasarkan dasar-dasar Spark SQL dan menambahkan dukungan untuk fungsionalitas Streaming Terstruktur.
Pengguna yang terbiasa dengan PySpark DataFrames mungkin lebih suka mengembangkan kode alur dengan Python. Python mendukung pengujian dan operasi yang lebih luas yang menantang untuk diterapkan dengan SQL, seperti operasi metaprogram. Lihat Mengembangkan kode alur dengan Python.
Untuk referensi lengkap sintaks DLT SQL, lihat referensi bahasa DLT SQL.
Dasar-dasar SQL untuk pengembangan alur
Kode SQL yang membuat himpunan data DLT menggunakan sintaks CREATE OR REFRESH
untuk menentukan tampilan materialisasi dan tabel streaming terhadap hasil kueri.
Kata kunci STREAM
menunjukkan apakah sumber data yang dirujuk dalam klausa SELECT
harus dibaca dengan semantik streaming.
Aksi membaca dan menulis secara default dilakukan pada katalog dan skema yang ditentukan selama konfigurasi alur. Lihat Atur katalog dan skema target.
Kode sumber DLT sangat berbeda dari skrip SQL: DLT mengevaluasi semua definisi himpunan data di semua file kode sumber yang dikonfigurasi dalam alur dan membangun grafik aliran data sebelum kueri apa pun dijalankan. Urutan kueri yang muncul di buku catatan atau skrip menentukan urutan evaluasi kode, tetapi bukan urutan eksekusi kueri.
Membuat tampilan materialisasi dengan SQL
Contoh kode berikut menunjukkan sintaks dasar untuk membuat tampilan materialisasi dengan SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Membuat tabel streaming dengan SQL
Contoh kode berikut menunjukkan sintaks dasar untuk membuat tabel streaming dengan SQL:
Nota
Tidak semua sumber data mendukung pembacaan streaming, dan beberapa sumber data harus selalu diproses dengan semantik streaming.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Memuat data dari penyimpanan objek
DLT mendukung pemuatan data dari semua format yang didukung oleh Azure Databricks. Lihat opsi format data .
Nota
Contoh-contoh ini menggunakan data yang tersedia di bawah /databricks-datasets
yang secara otomatis dipasang ke ruang kerja Anda. Databricks merekomendasikan penggunaan jalur volume atau URI cloud untuk mereferensikan data yang disimpan dalam penyimpanan objek cloud. Lihat Apa yang dimaksud dengan volume Katalog Unity?.
Databricks merekomendasikan penggunaan Auto Loader dan tabel streaming saat mengonfigurasi beban kerja penyerapan inkremental terhadap data yang disimpan di penyimpanan objek cloud. Lihat Apa itu Auto Loader?.
SQL menggunakan fungsi read_files
untuk memanggil fungsionalitas Auto Loader. Anda juga harus menggunakan kata kunci STREAM
untuk mengonfigurasi pembacaan streaming dengan read_files
.
Contoh berikut membuat tabel streaming dari file JSON menggunakan Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Fungsi read_files
juga mendukung semantik batch untuk membuat tampilan materialisasi. Contoh berikut menggunakan semantik batch untuk membaca direktori JSON dan membuat tampilan materialisasi:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Memvalidasi data sesuai dengan ekspetasi
Anda dapat menggunakan ekspektasi untuk mengatur dan menerapkan batasan kualitas data. Lihat Mengelola kualitas data dengan ekspektasi alur.
Kode berikut mendefinisikan ekspektasi bernama valid_data
yang menghilangkan rekaman yang null selama penyerapan data:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Kueri tampilan materialisasi dan tabel streaming yang ditentukan dalam alur Anda
Contoh berikut mendefinisikan empat himpunan data:
- Tabel streaming bernama
orders
yang memuat data JSON. - Tampilan materialisasi bernama
customers
yang memuat data CSV. - Tampilan materialisasi bernama
customer_orders
yang menggabungkan rekaman dari himpunan dataorders
dancustomers
, mengonversi tanda waktu pesanan menjadi tanggal, dan memilih bidangcustomer_id
,order_number
,state
, danorder_date
. - Tampilan materialisasi bernama
daily_orders_by_state
yang menggabungkan jumlah pesanan harian untuk setiap negara bagian.
Nota
Saat mengkueri tampilan atau tabel di alur, Anda dapat menentukan katalog dan skema secara langsung, atau Anda bisa menggunakan default yang dikonfigurasi di alur Anda. Dalam contoh ini, tabel orders
, customers
, dan customer_orders
ditulis dan dibaca dari katalog default dan skema yang dikonfigurasi untuk alur Anda.
Mode penerbitan lama menggunakan skema LIVE
untuk menjalankan kueri pada tampilan materialisasi lainnya dan tabel streaming yang ditentukan dalam pipeline Anda. Dalam pipeline baru, sintaks skema LIVE
diabaikan tanpa disadari. Lihat skema LANGSUNG (warisan) .
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;