Az adatminőség kezelése adatrendszer-elvárásokkal
Az elvárásokkal olyan minőségi korlátozásokat alkalmazzon, amelyek érvényesítik az adatokat, amint azok az ETL folyamatokon áthaladnak. Az elvárások nagyobb betekintést nyújtanak az adatminőségi metrikákba, és lehetővé teszik a sikertelen frissítéseket vagy a rekordok elvetését az érvénytelen rekordok észlelésekor.
Ez a cikk áttekintést nyújt az elvárásokról, beleértve a szintaxisra vonatkozó példákat és a viselkedési beállításokat. A fejlettebb használati esetekért és ajánlott gyakorlatokért tekintse meg az elvárási ajánlásokat és a fejlett mintákat.
Mik az elvárások?
Az elvárások nem kötelező záradékok a folyamat materializált nézetében, a streamelési táblában vagy a nézetlétrehozási utasításokban, amelyek adatminőség-ellenőrzést alkalmaznak a lekérdezésen áthaladó minden rekordon. Az elvárások szabványos SQL logikai utasításokat használnak a korlátozások megadásához. Egyetlen adathalmazra vonatkozó több elvárást is kombinálhat, és a folyamat összes adathalmaz-deklarációja esetében beállíthat elvárásokat.
A következő szakaszok egy elvárás három összetevőjét mutatják be, és szintaxisbeli példákat nyújtanak.
Várakozás neve
Minden várakozásnak rendelkeznie kell egy névvel, amelyet azonosítóként használnak a várakozás nyomon követéséhez és figyeléséhez. Válasszon egy nevet, amely közli az érvényesítendő metrikákat. Az alábbi példa meghatározza az valid_customer_age
elvárást, hogy az életkor 0 és 120 év között legyen.
Fontos
A várakozási névnek egyedinek kell lennie egy adott adathalmazhoz. Több adathalmaz esetében a folyamatban újra felhasználhatja az elvárásokat. Lásd a hordozható és újrafelhasználható elvárásokat és.
Piton
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Kiértékelendő kényszer
A kényszer záradék egy SQL feltételes utasítás, amelyet minden rekord esetében igaz vagy hamis értékre kell kiértékelni. A kényszer tartalmazza az érvényesítés tényleges logikáját. Ha egy rekord nem felel meg ennek a feltételnek, a várakozás aktiválódik.
A kényszereknek érvényes SQL-szintaxist kell használniuk, és nem tartalmazhatják a következőket:
- Egyéni Python-függvények
- Külső szolgáltatáshívások
- Más táblákra hivatkozó albekérdezések
Az alábbi példák olyan korlátozásokra mutatnak be példákat, amelyek hozzáadhatók az adathalmaz-létrehozási utasításokhoz:
Piton
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Érvénytelen bejegyzésen végrehajtott művelet
Meg kell adnia egy műveletet annak meghatározásához, hogy mi történik, ha egy rekord nem felel meg az érvényesítési ellenőrzésnek. Az alábbi táblázat az elérhető műveleteket ismerteti:
Akció | SQL-szintaxis | Python-szintaxis | Eredmény |
---|---|---|---|
figyelmeztetés (alapértelmezett) | EXPECT |
dlt.expect |
A rendszer érvénytelen rekordokat ír a célhoz. A rendszer az érvényes és érvénytelen rekordok számát más adathalmaz-metrikák mellett naplózza. |
ejtés | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
A céladatbázisba írás előtt az érvénytelen rekordok törlésre kerülnek. Az elvetett rekordok száma más adathalmaz-metrikákkal együtt van naplózva. |
sikertelen | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Érvénytelen rekordok megakadályozzák a frissítés sikerességét. Az újrafeldolgozás előtt manuális beavatkozásra van szükség. Ez az elvárás egyetlen folyamat meghibásodását okozza, és nem okozza a folyamat többi folyamatának meghiúsulását. |
Speciális logikát is implementálhat az érvénytelen rekordok karanténba helyezéséhez anélkül, hogy az adatokat elveszítené vagy a folyamat leállna. Lásd érvénytelen rekordok karanténba helyezését.
Várakozáskövetési metrikák
A folyamat felhasználói felületén nyomon követési metrikákat láthat warn
vagy drop
műveletekhez. Mivel fail
érvénytelen rekord észlelésekor a frissítés meghiúsul, a metrikák nem lesznek rögzítve.
A várakozási metrikák megtekintéséhez hajtsa végre a következő lépéseket:
- Kattintson az oldalsávon a DLT elemre.
- Kattintson a(z) nevű folyamatra.
- Kattintson egy definiált várakozással rendelkező adatkészletre.
- Válassza az Adatminőség lapot a jobb oldali oldalsávon.
Az adatminőségi metrikákat a DLT-eseménynapló lekérdezésével tekintheti meg. Lásd: Lekérdezési adatok minősége az eseménynaplóból.
Érvénytelen rekordok megőrzése
Az érvénytelen rekordok megőrzése az elvárások alapértelmezett viselkedése. A expect
operátort akkor használja, ha olyan rekordokat szeretne megőrizni, amelyek megsértik az elvárást, de metrikák gyűjtésére kerül sor arról, hogy hány rekord felel meg vagy szeg meg egy korlátozást. A várakozást sértő rekordok hozzáadódnak a céladatkészlethez az érvényes rekordokkal együtt:
Piton
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Érvénytelen rekordok elvetése
Az érvénytelen rekordok további feldolgozásának megakadályozásához használja a expect_or_drop
operátort. A várakozást sértő rekordokat a rendszer elveti a céladatkészletből:
Piton
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Hibázik érvénytelen rekordok esetén
Ha érvénytelen rekordok nem fogadhatók el, a expect_or_fail
operátorral azonnal leállíthatja a végrehajtást, ha egy rekord érvényesítése sikertelen. Ha a művelet táblafrissítés, a rendszer atomi módon visszaállítja a tranzakciót:
Piton
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Fontos
Ha egy folyamatban több párhuzamos folyamat van definiálva, egyetlen folyamat meghibásodása nem okoz más folyamatok meghiúsulását.
Sikertelen frissítések hibaelhárítása az elvárásokkal szemben
Ha egy folyamat egy várakozási szabálysértés miatt meghiúsul, a folyamat újrafuttatása előtt ki kell javítania a folyamat kódját, hogy megfelelően kezelje az érvénytelen adatokat.
A sikertelen folyamatokra konfigurált elvárások módosítják az átalakítások Spark-lekérdezési tervét a szabálysértések észleléséhez és jelentéséhez szükséges információk nyomon követéséhez. Ezekkel az adatokkal megállapíthatja, hogy melyik bemeneti rekord eredményezte a sok lekérdezés megsértését. Az alábbiakban egy példa várakozást mutatunk be:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
Több elvárás kezelése
Jegyzet
Bár az SQL és a Python is több elvárást támogat egyetlen adatkészleten belül, csak a Python teszi lehetővé több különálló elvárás csoportosítását és kollektív műveletek megadását.
Több elvárást is csoportosíthat, és a függvények expect_all
, expect_all_or_drop
és expect_all_or_fail
használatával adhat meg kollektív műveleteket.
Ezek a dekorátorok argumentumként elfogadnak egy Python-szótárat, ahol a kulcs a várakozás neve, az érték pedig a várakozási kényszer. Ugyanazokat az elvárásokat újra felhasználhatja a folyamat több adathalmazában is. Az alábbi példák az egyes expect_all
Python-operátorokra mutatnak be példákat:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset