Bagikan melalui


Mengembangkan kode alur dengan SQL

DLT memperkenalkan beberapa kata kunci dan fungsi SQL baru untuk mendefinisikan tampilan materialisasi dan tabel streaming dalam alur. Dukungan SQL untuk mengembangkan alur dibangun berdasarkan dasar-dasar Spark SQL dan menambahkan dukungan untuk fungsionalitas Streaming Terstruktur.

Pengguna yang terbiasa dengan PySpark DataFrames mungkin lebih suka mengembangkan kode alur dengan Python. Python mendukung pengujian dan operasi yang lebih luas yang menantang untuk diterapkan dengan SQL, seperti operasi metaprogram. Lihat Mengembangkan kode alur dengan Python.

Untuk referensi lengkap sintaks DLT SQL, lihat referensi bahasa DLT SQL.

Dasar-dasar SQL untuk pengembangan alur

Kode SQL yang membuat himpunan data DLT menggunakan sintaks CREATE OR REFRESH untuk menentukan tampilan materialisasi dan tabel streaming terhadap hasil kueri.

Kata kunci STREAM menunjukkan apakah sumber data yang dirujuk dalam klausa SELECT harus dibaca dengan semantik streaming.

Aksi membaca dan menulis secara default dilakukan pada katalog dan skema yang ditentukan selama konfigurasi alur. Lihat Atur katalog dan skema target.

Kode sumber DLT sangat berbeda dari skrip SQL: DLT mengevaluasi semua definisi himpunan data di semua file kode sumber yang dikonfigurasi dalam alur dan membangun grafik aliran data sebelum kueri apa pun dijalankan. Urutan kueri yang muncul di buku catatan atau skrip menentukan urutan evaluasi kode, tetapi bukan urutan eksekusi kueri.

Membuat tampilan materialisasi dengan SQL

Contoh kode berikut menunjukkan sintaks dasar untuk membuat tampilan materialisasi dengan SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Membuat tabel streaming dengan SQL

Contoh kode berikut menunjukkan sintaks dasar untuk membuat tabel streaming dengan SQL:

Nota

Tidak semua sumber data mendukung pembacaan streaming, dan beberapa sumber data harus selalu diproses dengan semantik streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Memuat data dari penyimpanan objek

DLT mendukung pemuatan data dari semua format yang didukung oleh Azure Databricks. Lihat opsi format data .

Nota

Contoh-contoh ini menggunakan data yang tersedia di bawah /databricks-datasets yang secara otomatis dipasang ke ruang kerja Anda. Databricks merekomendasikan penggunaan jalur volume atau URI cloud untuk mereferensikan data yang disimpan dalam penyimpanan objek cloud. Lihat Apa yang dimaksud dengan volume Katalog Unity?.

Databricks merekomendasikan penggunaan Auto Loader dan tabel streaming saat mengonfigurasi beban kerja penyerapan inkremental terhadap data yang disimpan di penyimpanan objek cloud. Lihat Apa itu Auto Loader?.

SQL menggunakan fungsi read_files untuk memanggil fungsionalitas Auto Loader. Anda juga harus menggunakan kata kunci STREAM untuk mengonfigurasi pembacaan streaming dengan read_files.

Contoh berikut membuat tabel streaming dari file JSON menggunakan Auto Loader:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Fungsi read_files juga mendukung semantik batch untuk membuat tampilan materialisasi. Contoh berikut menggunakan semantik batch untuk membaca direktori JSON dan membuat tampilan materialisasi:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Memvalidasi data sesuai dengan ekspetasi

Anda dapat menggunakan ekspektasi untuk mengatur dan menerapkan batasan kualitas data. Lihat Mengelola kualitas data dengan ekspektasi alur.

Kode berikut mendefinisikan ekspektasi bernama valid_data yang menghilangkan rekaman yang null selama penyerapan data:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Kueri tampilan materialisasi dan tabel streaming yang ditentukan dalam alur Anda

Contoh berikut mendefinisikan empat himpunan data:

  • Tabel streaming bernama orders yang memuat data JSON.
  • Tampilan materialisasi bernama customers yang memuat data CSV.
  • Tampilan materialisasi bernama customer_orders yang menggabungkan rekaman dari himpunan data orders dan customers, mengonversi tanda waktu pesanan menjadi tanggal, dan memilih bidang customer_id, order_number, state, dan order_date.
  • Tampilan materialisasi bernama daily_orders_by_state yang menggabungkan jumlah pesanan harian untuk setiap negara bagian.

Nota

Saat mengkueri tampilan atau tabel di alur, Anda dapat menentukan katalog dan skema secara langsung, atau Anda bisa menggunakan default yang dikonfigurasi di alur Anda. Dalam contoh ini, tabel orders, customers, dan customer_orders ditulis dan dibaca dari katalog default dan skema yang dikonfigurasi untuk alur Anda.

Mode penerbitan lama menggunakan skema LIVE untuk menjalankan kueri pada tampilan materialisasi lainnya dan tabel streaming yang ditentukan dalam pipeline Anda. Dalam pipeline baru, sintaks skema LIVE diabaikan tanpa disadari. Lihat skema LANGSUNG (warisan) .

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;