İşlem hattı beklentileriyle veri kalitesini yönetme
ETL işlem hatlarında akan verileri doğrulayan kalite kısıtlamaları uygulamak için beklentileri kullanın. Beklentiler, veri kalitesi ölçümleri hakkında daha fazla içgörü sağlar ve geçersiz kayıtları algılarken güncelleştirmelerde başarısız olmanıza veya kayıtları bırakmanıza olanak sağlar.
Bu makalede, söz dizimi örnekleri ve davranış seçenekleri de dahil olmak üzere beklentilere genel bir bakış sağlanır. Daha gelişmiş kullanım örnekleri ve önerilen en iyi yöntemler için bkz. Beklenti önerileri ve gelişmiş desenler.
Beklentiler nelerdir?
Beklentiler, sorgudan geçen her kayda veri kalitesi denetimleri uygulayan, işlem hattı üzerinden gerçekleştirilen maddi görünümler, akış tabloları ve görünüm oluşturma deyimlerinde isteğe bağlı şartlardır. Beklentiler, kısıtlamaları belirtmek için standart SQL Boole deyimlerini kullanır. Tek bir veri kümesi için birden çok beklentiyi birleştirebilir ve bir işlem hattındaki tüm veri kümesi bildirimlerinde beklentileri ayarlayabilirsiniz.
Aşağıdaki bölümlerde, bir beklentinin üç bileşeni tanıtılarak söz dizimi örnekleri sağlanır.
Beklenti adı
Her beklentinin, takip ve izleme için kullanılan bir tanımlayıcı olan bir adı olmalıdır. Doğrulanan ölçümleri bildiren bir ad seçin. Aşağıdaki örnek, yaş değerinin 0 ile 120 arasında olduğunu onaylamak için beklenti valid_customer_age
tanımlar:
Önemli
Belirli bir veri kümesi için bir beklenti adı benzersiz olmalıdır. Bir işlem hattındaki birden çok veri kümesinde beklentileri yeniden kullanabilirsiniz. bkz. Taşınabilir ve yeniden kullanılabilir beklentiler.
Piton
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Değerlendirme kısıtlaması
Constraint yan tümcesi, her kayıt için doğru veya yanlış olarak değerlendirilmesi gereken bir SQL koşullu deyimidir. Kısıtlama, doğrulanan şeyin gerçek mantığını içerir. Bir kayıt bu koşulda başarısız olduğunda, beklenen durum tetiklenir.
Kısıtlamalar geçerli SQL söz dizimini kullanmalıdır ve aşağıdakileri içeremez:
- Özel Python işlevleri
- Dış hizmet çağrıları
- Diğer tablolara başvuran alt sorgular
Aşağıda, veri kümesi oluşturma deyimlerine eklenebilen kısıtlamaların örnekleri verilmiştir:
Piton
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Geçersiz kayıt üzerinde eylem
Kayıt doğrulama denetiminde başarısız olduğunda ne olacağını belirlemek için bir eylem belirtmeniz gerekir. Aşağıdaki tabloda kullanılabilir eylemler açıklanmaktadır:
Eylem | SQL söz dizimi | Python söz dizimi | Sonuç |
---|---|---|---|
uyarı (varsayılan) | EXPECT |
dlt.expect |
Hedefe geçersiz kayıtlar yazılır. Geçerli ve geçersiz kayıtların sayısı diğer veri kümesi ölçümleriyle birlikte günlüğe kaydedilir. |
bırak | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Veriler hedefe yazılmadan önce geçersiz kayıtlar atılır. Bırakılan kayıtların sayısı diğer veri kümesi ölçümleriyle birlikte günlüğe kaydedilir. |
başarısız | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Geçersiz kayıtlar güncelleştirmenin başarılı olmasını engelliyor. Yeniden işlemeden önce el ile müdahale gerekir. Bu beklenti tek bir akışın başarısız olmasına neden olur ve işlem hattınızdaki diğer akışların başarısız olmasına neden olmaz. |
Ayrıca, veri kaybetmeden veya süreci aksatmadan geçersiz kayıtları karantinaya almak için gelişmiş mantık uygulayabilirsiniz. geçersiz kayıtları karantinaya alınbakın.
Beklentiler izleme ölçümleri
İşlem hattı kullanıcı arabiriminden warn
veya drop
eylemleri için izleme ölçümlerini görebilirsiniz.
fail
geçersiz bir kayıt algılandığında güncelleştirmenin başarısız olmasına neden olduğundan ölçümler kaydedilmez.
Beklenti ölçümlerini görüntülemek için aşağıdaki adımları tamamlayın:
- Kenar çubuğunda DLT tıklayın.
- İşlem hattınızın Adı üzerine tıklayın.
- Beklentileri tanımlanmış bir veri kümesine tıklayın.
- Sağ kenar çubuğunda Veri kalitesi sekmesini seçin.
DLT olay günlüğünü sorgulayarak veri kalitesi ölçümlerini görüntüleyebilirsiniz. olay günlüğündenSorgu veri kalitesi bölümüne bakın.
Geçersiz kayıtları tutma
Beklentilerin varsayılan davranışı geçersiz kayıtların korunmasıdır. Beklentiyi ihlal eden kayıtları tutmak ancak kısıtlamayı geçen veya başarısız olan kayıtların sayısını toplamak istediğinizde expect
işlecini kullanın. Beklentiyi ihlal eden kayıtlar, geçerli kayıtlarla birlikte hedef veri kümesine eklenir:
Piton
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Geçersiz kayıtları bırakma
Geçersiz kayıtların daha fazla işlenmesini önlemek için expect_or_drop
işlecini kullanın. Beklentiyi ihlal eden kayıtlar hedef veri kümesinden bırakılır:
Piton
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Geçersiz kayıtlarda başarısız ol
Geçersiz kayıtlar kabul edilemez olduğunda, expect_or_fail
işlecini kullanarak kayıt doğrulama başarısız olduğunda yürütmeyi hemen durdurun. İşlem bir tablo güncelleştirmesiyse, sistem atomik olarak işlemi geri alır:
Piton
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Önemli
İşlem hattında tanımlanmış birden çok paralel akışınız varsa, tek bir akışın başarısız olması diğer akışların başarısız olmasına neden olmaz.
Beklenen başarısız güncelleştirmelerle ilgili sorunları giderme
İşlem hattı, bir beklenti ihlali nedeniyle başarısız olduğunda, işlem hattını yeniden çalıştırmadan önce geçersiz verileri doğru şekilde işlemek için işlem hattı kodunu düzeltmeniz gerekir.
İşlem hatlarının başarısız olması için yapılandırılan beklentiler, ihlalleri algılamak ve bildirmek için gereken bilgileri izlemek için dönüşümlerinizin Spark sorgu planını değiştirir. Bu bilgileri hangi giriş kaydının birçok sorgu için ihlale neden olduğunu belirlemek için kullanabilirsiniz. Aşağıda örnek bir beklenti verilmiştir:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
Birden çok beklenti yönetimi
Not
Hem SQL hem de Python tek bir veri kümesinde birden çok beklentiyi desteklese de, yalnızca Python birden çok ayrı beklentiyi birlikte gruplandırmanıza ve toplu eylemleri belirtmenize olanak tanır.
DLT
Birden çok beklentiyi birlikte gruplandırabilir ve expect_all
, expect_all_or_drop
ve expect_all_or_fail
işlevlerini kullanarak toplu eylemleri belirtebilirsiniz.
Bu dekoratörler bağımsız değişken olarak bir Python sözlüğü kabul eder; burada anahtar, beklenti adı ve değer de beklenti kısıtlamasıdır. İşlem hattınızdaki birden çok veri kümesinde aynı beklenti kümesini yeniden kullanabilirsiniz. Aşağıda, expect_all
Python işleçlerinin her birinin örnekleri gösterilmektedir:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset