Megosztás a következőn keresztül:


Az adatminőség kezelése adatrendszer-elvárásokkal

Az elvárásokkal olyan minőségi korlátozásokat alkalmazzon, amelyek érvényesítik az adatokat, amint azok az ETL folyamatokon áthaladnak. Az elvárások nagyobb betekintést nyújtanak az adatminőségi metrikákba, és lehetővé teszik a sikertelen frissítéseket vagy a rekordok elvetését az érvénytelen rekordok észlelésekor.

Ez a cikk áttekintést nyújt az elvárásokról, beleértve a szintaxisra vonatkozó példákat és a viselkedési beállításokat. A fejlettebb használati esetekért és ajánlott gyakorlatokért tekintse meg az elvárási ajánlásokat és a fejlett mintákat.

DLT-várakozások folyamatábra

Mik az elvárások?

Az elvárások nem kötelező záradékok a folyamat materializált nézetében, a streamelési táblában vagy a nézetlétrehozási utasításokban, amelyek adatminőség-ellenőrzést alkalmaznak a lekérdezésen áthaladó minden rekordon. Az elvárások szabványos SQL logikai utasításokat használnak a korlátozások megadásához. Egyetlen adathalmazra vonatkozó több elvárást is kombinálhat, és a folyamat összes adathalmaz-deklarációja esetében beállíthat elvárásokat.

A következő szakaszok egy elvárás három összetevőjét mutatják be, és szintaxisbeli példákat nyújtanak.

Várakozás neve

Minden várakozásnak rendelkeznie kell egy névvel, amelyet azonosítóként használnak a várakozás nyomon követéséhez és figyeléséhez. Válasszon egy nevet, amely közli az érvényesítendő metrikákat. Az alábbi példa meghatározza az valid_customer_age elvárást, hogy az életkor 0 és 120 év között legyen.

Fontos

A várakozási névnek egyedinek kell lennie egy adott adathalmazhoz. Több adathalmaz esetében a folyamatban újra felhasználhatja az elvárásokat. Lásd a hordozható és újrafelhasználható elvárásokat és.

Piton

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Kiértékelendő kényszer

A kényszer záradék egy SQL feltételes utasítás, amelyet minden rekord esetében igaz vagy hamis értékre kell kiértékelni. A kényszer tartalmazza az érvényesítés tényleges logikáját. Ha egy rekord nem felel meg ennek a feltételnek, a várakozás aktiválódik.

A kényszereknek érvényes SQL-szintaxist kell használniuk, és nem tartalmazhatják a következőket:

  • Egyéni Python-függvények
  • Külső szolgáltatáshívások
  • Más táblákra hivatkozó albekérdezések

Az alábbi példák olyan korlátozásokra mutatnak be példákat, amelyek hozzáadhatók az adathalmaz-létrehozási utasításokhoz:

Piton

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Érvénytelen bejegyzésen végrehajtott művelet

Meg kell adnia egy műveletet annak meghatározásához, hogy mi történik, ha egy rekord nem felel meg az érvényesítési ellenőrzésnek. Az alábbi táblázat az elérhető műveleteket ismerteti:

Akció SQL-szintaxis Python-szintaxis Eredmény
figyelmeztetés (alapértelmezett) EXPECT dlt.expect A rendszer érvénytelen rekordokat ír a célhoz. A rendszer az érvényes és érvénytelen rekordok számát más adathalmaz-metrikák mellett naplózza.
ejtés EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop A céladatbázisba írás előtt az érvénytelen rekordok törlésre kerülnek. Az elvetett rekordok száma más adathalmaz-metrikákkal együtt van naplózva.
sikertelen EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Érvénytelen rekordok megakadályozzák a frissítés sikerességét. Az újrafeldolgozás előtt manuális beavatkozásra van szükség. Ez az elvárás egyetlen folyamat meghibásodását okozza, és nem okozza a folyamat többi folyamatának meghiúsulását.

Speciális logikát is implementálhat az érvénytelen rekordok karanténba helyezéséhez anélkül, hogy az adatokat elveszítené vagy a folyamat leállna. Lásd érvénytelen rekordok karanténba helyezését.

Várakozáskövetési metrikák

A folyamat felhasználói felületén nyomon követési metrikákat láthat warn vagy drop műveletekhez. Mivel fail érvénytelen rekord észlelésekor a frissítés meghiúsul, a metrikák nem lesznek rögzítve.

A várakozási metrikák megtekintéséhez hajtsa végre a következő lépéseket:

  1. Kattintson az oldalsávon a DLT elemre.
  2. Kattintson a(z) nevű folyamatra.
  3. Kattintson egy definiált várakozással rendelkező adatkészletre.
  4. Válassza az Adatminőség lapot a jobb oldali oldalsávon.

Az adatminőségi metrikákat a DLT-eseménynapló lekérdezésével tekintheti meg. Lásd: Lekérdezési adatok minősége az eseménynaplóból.

Érvénytelen rekordok megőrzése

Az érvénytelen rekordok megőrzése az elvárások alapértelmezett viselkedése. A expect operátort akkor használja, ha olyan rekordokat szeretne megőrizni, amelyek megsértik az elvárást, de metrikák gyűjtésére kerül sor arról, hogy hány rekord felel meg vagy szeg meg egy korlátozást. A várakozást sértő rekordok hozzáadódnak a céladatkészlethez az érvényes rekordokkal együtt:

Piton

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Érvénytelen rekordok elvetése

Az érvénytelen rekordok további feldolgozásának megakadályozásához használja a expect_or_drop operátort. A várakozást sértő rekordokat a rendszer elveti a céladatkészletből:

Piton

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Hibázik érvénytelen rekordok esetén

Ha érvénytelen rekordok nem fogadhatók el, a expect_or_fail operátorral azonnal leállíthatja a végrehajtást, ha egy rekord érvényesítése sikertelen. Ha a művelet táblafrissítés, a rendszer atomi módon visszaállítja a tranzakciót:

Piton

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Fontos

Ha egy folyamatban több párhuzamos folyamat van definiálva, egyetlen folyamat meghibásodása nem okoz más folyamatok meghiúsulását.

DLT-folyamat hiba magyarázatának gráfja

Sikertelen frissítések hibaelhárítása az elvárásokkal szemben

Ha egy folyamat egy várakozási szabálysértés miatt meghiúsul, a folyamat újrafuttatása előtt ki kell javítania a folyamat kódját, hogy megfelelően kezelje az érvénytelen adatokat.

A sikertelen folyamatokra konfigurált elvárások módosítják az átalakítások Spark-lekérdezési tervét a szabálysértések észleléséhez és jelentéséhez szükséges információk nyomon követéséhez. Ezekkel az adatokkal megállapíthatja, hogy melyik bemeneti rekord eredményezte a sok lekérdezés megsértését. Az alábbiakban egy példa várakozást mutatunk be:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

Több elvárás kezelése

Jegyzet

Bár az SQL és a Python is több elvárást támogat egyetlen adatkészleten belül, csak a Python teszi lehetővé több különálló elvárás csoportosítását és kollektív műveletek megadását.

DLT több elvárással fLow graph

Több elvárást is csoportosíthat, és a függvények expect_all, expect_all_or_dropés expect_all_or_failhasználatával adhat meg kollektív műveleteket.

Ezek a dekorátorok argumentumként elfogadnak egy Python-szótárat, ahol a kulcs a várakozás neve, az érték pedig a várakozási kényszer. Ugyanazokat az elvárásokat újra felhasználhatja a folyamat több adathalmazában is. Az alábbi példák az egyes expect_all Python-operátorokra mutatnak be példákat:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset