Mengelola kualitas data dengan ekspektasi alur
Gunakan ekspektasi untuk menerapkan batasan kualitas yang memvalidasi data saat mengalir melalui alur ETL. Ekspektasi memberikan wawasan yang lebih besar tentang metrik kualitas data dan memungkinkan Anda untuk gagal memperbarui atau menghilangkan rekaman saat mendeteksi rekaman yang tidak valid.
Artikel ini menyediakan gambaran umum ekspektasi, termasuk contoh sintaks dan opsi perilaku. Untuk kasus penggunaan tingkat lanjut dan praktik terbaik yang direkomendasikan, lihat rekomendasi ekspektasi dan pola tingkat lanjut.
Apa itu ekspektasi?
Ekspektasi adalah klausul opsional dalam tampilan materialisasi jalur pengolahan, tabel streaming, atau pernyataan pembuatan tampilan yang menerapkan pemeriksaan kualitas data pada setiap catatan yang diproses oleh kueri. Ekspektasi menggunakan pernyataan SQL Boolean standar untuk menentukan batasan. Anda dapat menggabungkan beberapa harapan untuk satu himpunan data dan menetapkan harapan di semua deklarasi himpunan data dalam alur.
Bagian berikut memperkenalkan tiga komponen ekspektasi dan memberikan contoh sintaks.
Nama ekspektasi
Setiap harapan harus memiliki nama, yang digunakan sebagai pengidentifikasi untuk melacak dan memantau harapan. Pilih nama yang mengomunikasikan metrik yang sedang divalidasi. Contoh berikut mendefinisikan harapan valid_customer_age
untuk mengonfirmasi bahwa usia antara 0 dan 120 tahun:
Penting
Nama harapan harus unik untuk himpunan data tertentu. Anda dapat menggunakan kembali ekspektasi di beberapa himpunan data dalam proses. Lihat Ekspektasi portabel dan dapat digunakan kembali.
Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Batasan untuk mengevaluasi
Klausa batasan adalah pernyataan kondisional SQL yang harus dievaluasi ke true atau false untuk setiap rekaman. Batasan berisi logika aktual untuk apa yang sedang divalidasi. Ketika rekaman gagal memenuhi kondisi ini, ekspektasi diaktifkan.
Batasan harus menggunakan sintaks SQL yang valid dan tidak boleh berisi yang berikut ini:
- Fungsi Python kustom
- Panggilan layanan eksternal
- Subkueri yang mereferensikan tabel lain
Berikut ini adalah contoh batasan yang dapat ditambahkan ke pernyataan pembuatan himpunan data:
Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Tindakan pada rekaman yang tidak valid
Anda harus menentukan tindakan untuk menentukan apa yang terjadi ketika rekaman gagal dalam pemeriksaan validasi. Tabel berikut ini menjelaskan tindakan yang tersedia:
Perbuatan | Sintaks SQL | Sintaks Python | Hasil |
---|---|---|---|
memperingatkan (default) | EXPECT |
dlt.expect |
Rekaman yang tidak valid ditulis ke target. Jumlah rekaman yang valid dan tidak valid dicatat bersama metrik himpunan data lainnya. |
drop | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Rekaman yang tidak valid dihilangkan sebelum data ditulis ke target. Jumlah rekaman yang dihilangkan dicatat bersama metrik himpunan data lainnya. |
gagal | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Rekaman yang tidak valid mencegah pembaruan berhasil. Intervensi manual diperlukan sebelum pemrosesan ulang. Ekspektasi ini menyebabkan kegagalan satu alur dan tidak menyebabkan alur lain di alur Anda gagal. |
Anda juga dapat menerapkan logika tingkat lanjut untuk mengkarantina rekaman yang tidak valid tanpa gagal atau menghilangkan data. Lihat Rekaman tidak valid yang harus dikarantina.
Metrik pelacakan ekspektasi
Anda dapat melihat metrik pelacakan untuk tindakan warn
atau drop
dari antarmuka pengguna alur kerja. Karena fail
menyebabkan pembaruan gagal ketika rekaman yang tidak valid terdeteksi, metrik tidak direkam.
Untuk melihat metrik ekspektasi, selesaikan langkah-langkah berikut:
- Klik DLT di bilah samping.
- Klik Nama alur Anda.
- Klik himpunan data dengan harapan yang ditentukan.
- Pilih tab kualitas data di bar samping kanan.
Anda dapat melihat metrik kualitas data dengan mengkueri log peristiwa DLT. Periksa kualitas data kueri dari log peristiwa .
Menyimpan rekaman yang tidak valid
Menyimpan rekaman yang tidak valid adalah perilaku default untuk ekspektasi. Gunakan operator expect
saat Anda ingin menyimpan rekaman yang melanggar harapan tetapi mengumpulkan metrik tentang berapa banyak rekaman yang lolos atau gagal batasan. Rekaman yang melanggar harapan ditambahkan ke himpunan data target bersama dengan rekaman yang valid:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Hapus rekaman yang tidak valid
Gunakan operator expect_or_drop
untuk mencegah pemrosesan rekaman yang tidak valid lebih lanjut. Rekaman yang melanggar harapan dihilangkan dari himpunan data target:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Gagal pada rekaman yang tidak valid
Ketika rekaman yang tidak valid tidak dapat diterima, gunakan operator expect_or_fail
untuk segera menghentikan eksekusi ketika rekaman gagal validasi. Jika operasi adalah pembaruan tabel, sistem secara atomik membatalkan transaksi.
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Penting
Jika Anda memiliki beberapa alur paralel yang ditentukan dalam sebuah pipa, kegagalan satu alur tidak menyebabkan alur lainnya gagal.
Pemecahan masalah pembaruan yang gagal dari ekspektasi
Ketika alur gagal karena pelanggaran ekspektasi, Anda harus memperbaiki kode alur untuk menangani data yang tidak valid dengan benar sebelum menjalankan kembali alur.
Ekspektasi yang dikonfigurasi untuk pipeline yang gagal memodifikasi rencana kueri Spark pada transformasi Anda untuk melacak informasi yang diperlukan untuk mendeteksi dan melaporkan pelanggaran. Anda dapat menggunakan informasi ini untuk mengidentifikasi rekaman input mana yang mengakibatkan pelanggaran untuk berbagai kueri. Berikut ini adalah contoh harapan:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
Manajemen beberapa ekspektasi
Nota
Meskipun SQL dan Python mendukung beberapa harapan dalam satu himpunan data, hanya Python yang memungkinkan Anda mengelompokkan beberapa harapan terpisah bersama-sama dan menentukan tindakan kolektif.
Anda dapat mengelompokkan beberapa harapan bersama-sama dan menentukan tindakan kolektif menggunakan fungsi expect_all
, expect_all_or_drop
, dan expect_all_or_fail
.
Dekorator ini menerima kamus Python sebagai argumen, di mana kuncinya adalah nama harapan dan nilainya adalah batasan ekspektasi. Anda dapat menggunakan kembali serangkaian ekspektasi yang sama dalam beberapa himpunan data di alur Anda. Berikut ini menunjukkan contoh masing-masing operator Python expect_all
:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset