Dela via


Förväntningarsrekommendationer och avancerade mönster

Den här artikeln innehåller rekommendationer för att implementera förväntningar i stor skala och exempel på avancerade mönster som stöds av förväntningar. Dessa mönster använder flera datauppsättningar tillsammans med förväntningar och kräver att användarna förstår syntaxen och semantiken för materialiserade vyer, strömmande tabeller och förväntningar.

En översikt över grundläggande förväntningar och syntax finns i Hantera datakvalitet med pipelineförväntningar.

bärbara och återanvändbara förväntningar

Databricks rekommenderar följande metodtips när du implementerar förväntningar för att förbättra portabiliteten och minska underhållsbelastningarna:

Rekommendation Effekt
Lagra förväntansdefinitioner separat från pipelinelogik. Tillämpa enkelt förväntningar på flera datauppsättningar eller pipelines. Uppdatera, granska och underhålla förväntningar utan att ändra pipeline-källkoden.
Lägg till anpassade taggar för att skapa grupper med relaterade förväntningar. Filtrera förväntningar baserat på taggar.
Tillämpa förväntningar konsekvent över liknande datauppsättningar. Använd samma förväntningar i flera datauppsättningar och pipelines för att utvärdera identisk logik.

Följande exempel visar hur du använder en Delta-tabell eller ordlista för att skapa en central lagringsplats för förväntningar. Anpassade Python-funktioner tillämpar sedan dessa förväntningar på datauppsättningar i en exempelpipeline:

Deltatabell

I följande exempel skapas en tabell med namnet rules för att underhålla regler:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

I följande Python-exempel definieras datakvalitetsförväntningar baserat på reglerna i tabellen rules. Funktionen get_rules() läser reglerna från tabellen rules och returnerar en Python-ordlista som innehåller regler som matchar det tag argument som skickas till funktionen.

I det här exemplet används ordlistan med hjälp av @dlt.expect_all_or_drop()-dekoratörer för att framtvinga datakvalitetskrav.

Till exempel tas alla poster som inte uppfyller reglerna märkta med validity bort från tabellen raw_farmers_market.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("rules").filter(col("tag") == tag).collect()
  return {
      row['name']: row['constraint']
      for row in df
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Python-modul

I följande exempel skapas en Python-modul för att underhålla regler. I det här exemplet ska du lagra koden i en fil med namnet rules_module.py i samma mapp som anteckningsboken som används som källkod för pipelinen:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

I följande Python-exempel definieras förväntningar på datakvalitet baserat på de regler som definierats i filen rules_module.py. Funktionen get_rules() returnerar en Python-ordlista som innehåller regler som matchar det tag argument som skickas till den.

I det här exemplet används ordlistan med hjälp av @dlt.expect_all_or_drop() dekoratörer för att framtvinga datakvalitetsbegränsningar.

Till exempel tas alla poster som inte uppfyller reglerna, taggade med validity, bort från tabellen raw_farmers_market.

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  return {
    row['name']: row['constraint']
    for row in get_rules_as_list_of_dict()
    if row['tag'] == tag
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Validering av antal rader

I följande exempel verifieras radantalets likhet mellan table_a och table_b för att kontrollera att inga data går förlorade under transformeringar:

valideringsdiagram för antal DLT-rader med förväntad användning

Python

@dlt.view(
  name="count_verification",
  comment="Validates equal row counts between tables"
)
@dlt.expect_or_fail("no_rows_dropped", "a_count == b_count")
def validate_row_counts():
  return spark.sql("""
    SELECT * FROM
      (SELECT COUNT(*) AS a_count FROM table_a),
      (SELECT COUNT(*) AS b_count FROM table_b)""")

SQL

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM table_a),
  (SELECT COUNT(*) AS b_count FROM table_b)

Postidentifiering saknas

I följande exempel verifieras att alla förväntade poster finns i tabellen report:

identifieringsdiagram för rader som saknas i DLT med förväntad användning

Python

@dlt.view(
  name="report_compare_tests",
  comment="Validates no records are missing after joining"
)
@dlt.expect_or_fail("no_missing_records", "r_key IS NOT NULL")
def validate_report_completeness():
  return (
    dlt.read("validation_copy").alias("v")
      .join(
        dlt.read("report").alias("r"),
        on="key",
        how="left_outer"
      )
      .select(
        "v.*",
        "r.key as r_key"
      )
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r_key IS NOT NULL)
)
AS SELECT v.*, r.key as r_key FROM validation_copy v
  LEFT OUTER JOIN report r ON v.key = r.key

primärnyckeln unikhet

I följande exempel verifieras primära nyckelbegränsningar mellan tabeller:

unikhetsdiagram för DLT-primärnyckel med förväntad användning

Python

@dlt.view(
  name="report_pk_tests",
  comment="Validates primary key uniqueness"
)
@dlt.expect_or_fail("unique_pk", "num_entries = 1")
def validate_pk_uniqueness():
  return (
    dlt.read("report")
      .groupBy("pk")
      .count()
      .withColumnRenamed("count", "num_entries")
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
  FROM report
  GROUP BY pk

mönster för schemautveckling

I följande exempel visas hur du hanterar schemautvecklingen för ytterligare kolumner. Använd det här mönstret när du migrerar datakällor eller hanterar flera versioner av överordnade data, vilket säkerställer bakåtkompatibilitet samtidigt som datakvaliteten upprätthålls:

DLT-schemautvecklingsverifiering med förväntad användning

Python

@dlt.table
@dlt.expect_all_or_fail({
  "required_columns": "col1 IS NOT NULL AND col2 IS NOT NULL",
  "valid_col3": "CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END"
})
def evolving_table():
  # Legacy data (V1 schema)
  legacy_data = spark.read.table("legacy_source")

  # New data (V2 schema)
  new_data = spark.read.table("new_source")

  # Combine both sources
  return legacy_data.unionByName(new_data, allowMissingColumns=True)

SQL

CREATE OR REFRESH MATERIALIZED VIEW evolving_table(
  -- Merging multiple constraints into one as expect_all is Python-specific API
  CONSTRAINT valid_migrated_data EXPECT (
    (col1 IS NOT NULL AND col2 IS NOT NULL) AND (CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END)
  ) ON VIOLATION FAIL UPDATE
) AS
  SELECT * FROM new_source
  UNION
  SELECT *, NULL as col3 FROM legacy_source;

Intervallbaserat valideringsmönster

I följande exempel visas hur du validerar nya datapunkter mot historiska statistiska intervall, vilket hjälper dig att identifiera avvikande värden och avvikelser i ditt dataflöde:

DLT-intervallbaserad validering med förväntad användning

Python

@dlt.view
def stats_validation_view():
  # Calculate statistical bounds from historical data
  bounds = spark.sql("""
    SELECT
      avg(amount) - 3 * stddev(amount) as lower_bound,
      avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE
      date >= CURRENT_DATE() - INTERVAL 30 DAYS
  """)

  # Join with new data and apply bounds
  return spark.read.table("new_data").crossJoin(bounds)

@dlt.table
@dlt.expect_or_drop(
  "within_statistical_range",
  "amount BETWEEN lower_bound AND upper_bound"
)
def validated_amounts():
  return dlt.read("stats_validation_view")

SQL

CREATE OR REFRESH MATERIALIZED VIEW stats_validation_view AS
  WITH bounds AS (
    SELECT
    avg(amount) - 3 * stddev(amount) as lower_bound,
    avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE date >= CURRENT_DATE() - INTERVAL 30 DAYS
  )
  SELECT
    new_data.*,
    bounds.*
  FROM new_data
  CROSS JOIN bounds;

CREATE OR REFRESH MATERIALIZED VIEW validated_amounts (
  CONSTRAINT within_statistical_range EXPECT (amount BETWEEN lower_bound AND upper_bound)
)
AS SELECT * FROM stats_validation_view;

Ogiltiga poster i karantän

Det här mönstret kombinerar förväntningar med tillfälliga tabeller och vyer för att spåra datakvalitetsmått under pipelineuppdateringar och aktivera separata bearbetningssökvägar för giltiga och ogiltiga poster i underordnade åtgärder.

DLT-datakarantänmönster med förväntad användning

Python

import dlt
from pyspark.sql.functions import expr

rules = {
  "valid_pickup_zip": "(pickup_zip IS NOT NULL)",
  "valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.view
def raw_trips_data():
  return spark.readStream.table("samples.nyctaxi.trips")

@dlt.table(
  temporary=True,
  partition_cols=["is_quarantined"],
)
@dlt.expect_all(rules)
def trips_data_quarantine():
  return (
    dlt.readStream("raw_trips_data").withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view
def valid_trips_data():
  return dlt.read("trips_data_quarantine").filter("is_quarantined=false")

@dlt.view
def invalid_trips_data():
  return dlt.read("trips_data_quarantine").filter("is_quarantined=true")

SQL

CREATE TEMPORARY STREAMING LIVE VIEW raw_trips_data AS
  SELECT * FROM STREAM(samples.nyctaxi.trips);

CREATE OR REFRESH TEMPORARY STREAMING TABLE trips_data_quarantine(
  -- Option 1 - merge all expectations to have a single name in the pipeline event log
  CONSTRAINT quarantined_row EXPECT (pickup_zip IS NOT NULL OR dropoff_zip IS NOT NULL),
  -- Option 2 - Keep the expectations separate, resulting in multiple entries under different names
  CONSTRAINT invalid_pickup_zip EXPECT (pickup_zip IS NOT NULL),
  CONSTRAINT invalid_dropoff_zip EXPECT (dropoff_zip IS NOT NULL)
)
PARTITIONED BY (is_quarantined)
AS
  SELECT
    *,
    NOT ((pickup_zip IS NOT NULL) and (dropoff_zip IS NOT NULL)) as is_quarantined
  FROM STREAM(raw_trips_data);

CREATE TEMPORARY LIVE VIEW valid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=FALSE;

CREATE TEMPORARY LIVE VIEW invalid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=TRUE;