Condividi tramite


Raccomandazioni per le aspettative e modelli avanzati

Questo articolo contiene raccomandazioni per implementare le aspettative su larga scala ed esempi di modelli avanzati supportati dalle aspettative. Questi modelli usano più set di dati in combinazione con le aspettative e richiedono che gli utenti comprendano la sintassi e la semantica delle viste materializzate, delle tabelle di streaming e delle aspettative.

Per una panoramica di base del comportamento e della sintassi delle aspettative, vedere Gestire la qualità dei dati con le aspettative della pipeline.

aspettative portabili e riutilizzabili

Databricks consiglia le procedure consigliate seguenti quando si implementano aspettative per migliorare la portabilità e ridurre i carichi di lavoro di manutenzione:

Raccomandazione Impatto
Archiviare le definizioni delle aspettative separatamente dalla logica della pipeline. Applicare facilmente le aspettative a più set di dati o pipeline. Aggiornare, controllare e mantenere le aspettative senza modificare il codice sorgente della pipeline.
Aggiungere tag personalizzati per creare gruppi di aspettative correlate. Filtra le aspettative in base ai tag.
Applicare le aspettative in modo coerente tra set di dati simili. Usare le stesse aspettative in più set di dati e pipeline per valutare la logica identica.

Gli esempi seguenti illustrano l'uso di una tabella o di un dizionario Delta per creare un repository di aspettative centrale. Le funzioni Python personalizzate applicano quindi queste aspettative ai set di dati in una pipeline di esempio:

Tabella delta

Nell'esempio seguente viene creata una tabella denominata rules per gestire le regole:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

L'esempio python seguente definisce le aspettative sulla qualità dei dati in base alle regole nella tabella rules. La funzione get_rules() legge le regole dalla tabella rules e restituisce un dizionario Python contenente regole corrispondenti all'argomento tag passato alla funzione.

In questo esempio il dizionario viene applicato usando @dlt.expect_all_or_drop() decorator per applicare vincoli di qualità dei dati.

Ad esempio, tutti i record che hanno esito negativo sulle regole contrassegnate con validity verranno eliminati dalla tabella raw_farmers_market:

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("rules").filter(col("tag") == tag).collect()
  return {
      row['name']: row['constraint']
      for row in df
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Modulo Python

L'esempio seguente crea un modulo Python per gestire le regole. Per questo esempio, archiviare questo codice in un file denominato rules_module.py nella stessa cartella del notebook usato come codice sorgente per la pipeline:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

L'esempio python seguente definisce le aspettative sulla qualità dei dati in base alle regole definite nel file rules_module.py. La funzione get_rules() restituisce un dizionario Python contenente regole corrispondenti all'argomento tag passato.

In questo esempio il dizionario viene applicato usando @dlt.expect_all_or_drop() decorator per applicare vincoli di qualità dei dati.

Ad esempio, tutti i record che hanno esito negativo sulle regole contrassegnate con validity verranno eliminati dalla tabella raw_farmers_market:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  return {
    row['name']: row['constraint']
    for row in get_rules_as_list_of_dict()
    if row['tag'] == tag
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Convalida del conteggio delle righe

Nell'esempio seguente viene convalidata l'uguaglianza del numero di righe tra table_a e table_b per verificare che non vengano persi dati durante le trasformazioni:

grafico di convalida del numero di righe DLT con aspettative di utilizzo

Pitone

@dlt.view(
  name="count_verification",
  comment="Validates equal row counts between tables"
)
@dlt.expect_or_fail("no_rows_dropped", "a_count == b_count")
def validate_row_counts():
  return spark.sql("""
    SELECT * FROM
      (SELECT COUNT(*) AS a_count FROM table_a),
      (SELECT COUNT(*) AS b_count FROM table_b)""")

SQL

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM table_a),
  (SELECT COUNT(*) AS b_count FROM table_b)

Rilevamento di record mancanti

Nell'esempio seguente viene verificato che tutti i record previsti siano presenti nella tabella report:

grafico di rilevamento delle righe mancanti DLT con aspettative di utilizzo

Pitone

@dlt.view(
  name="report_compare_tests",
  comment="Validates no records are missing after joining"
)
@dlt.expect_or_fail("no_missing_records", "r_key IS NOT NULL")
def validate_report_completeness():
  return (
    dlt.read("validation_copy").alias("v")
      .join(
        dlt.read("report").alias("r"),
        on="key",
        how="left_outer"
      )
      .select(
        "v.*",
        "r.key as r_key"
      )
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r_key IS NOT NULL)
)
AS SELECT v.*, r.key as r_key FROM validation_copy v
  LEFT OUTER JOIN report r ON v.key = r.key

univocità della chiave primaria

Nell'esempio seguente vengono convalidati i vincoli di chiave primaria tra le tabelle:

grafico di univocità della chiave primaria DLT con aspettative di utilizzo

Pitone

@dlt.view(
  name="report_pk_tests",
  comment="Validates primary key uniqueness"
)
@dlt.expect_or_fail("unique_pk", "num_entries = 1")
def validate_pk_uniqueness():
  return (
    dlt.read("report")
      .groupBy("pk")
      .count()
      .withColumnRenamed("count", "num_entries")
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
  FROM report
  GROUP BY pk

modello di evoluzione dello schema

Nell'esempio seguente viene illustrato come gestire l'evoluzione dello schema per colonne aggiuntive. Usare questo modello quando si esegue la migrazione di origini dati o si gestiscono più versioni di dati upstream, garantendo la compatibilità con le versioni precedenti, applicando al tempo stesso la qualità dei dati:

convalida dell'evoluzione dello schema DLT con aspettative di utilizzo

Pitone

@dlt.table
@dlt.expect_all_or_fail({
  "required_columns": "col1 IS NOT NULL AND col2 IS NOT NULL",
  "valid_col3": "CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END"
})
def evolving_table():
  # Legacy data (V1 schema)
  legacy_data = spark.read.table("legacy_source")

  # New data (V2 schema)
  new_data = spark.read.table("new_source")

  # Combine both sources
  return legacy_data.unionByName(new_data, allowMissingColumns=True)

SQL

CREATE OR REFRESH MATERIALIZED VIEW evolving_table(
  -- Merging multiple constraints into one as expect_all is Python-specific API
  CONSTRAINT valid_migrated_data EXPECT (
    (col1 IS NOT NULL AND col2 IS NOT NULL) AND (CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END)
  ) ON VIOLATION FAIL UPDATE
) AS
  SELECT * FROM new_source
  UNION
  SELECT *, NULL as col3 FROM legacy_source;

modello di convalida basato su intervalli

L'esempio seguente illustra come convalidare i nuovi punti dati rispetto agli intervalli statistici cronologici, consentendo di identificare outlier e anomalie nel flusso di dati:

convalida basata su intervalli DLT con utilizzo delle aspettative

Pitone

@dlt.view
def stats_validation_view():
  # Calculate statistical bounds from historical data
  bounds = spark.sql("""
    SELECT
      avg(amount) - 3 * stddev(amount) as lower_bound,
      avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE
      date >= CURRENT_DATE() - INTERVAL 30 DAYS
  """)

  # Join with new data and apply bounds
  return spark.read.table("new_data").crossJoin(bounds)

@dlt.table
@dlt.expect_or_drop(
  "within_statistical_range",
  "amount BETWEEN lower_bound AND upper_bound"
)
def validated_amounts():
  return dlt.read("stats_validation_view")

SQL

CREATE OR REFRESH MATERIALIZED VIEW stats_validation_view AS
  WITH bounds AS (
    SELECT
    avg(amount) - 3 * stddev(amount) as lower_bound,
    avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE date >= CURRENT_DATE() - INTERVAL 30 DAYS
  )
  SELECT
    new_data.*,
    bounds.*
  FROM new_data
  CROSS JOIN bounds;

CREATE OR REFRESH MATERIALIZED VIEW validated_amounts (
  CONSTRAINT within_statistical_range EXPECT (amount BETWEEN lower_bound AND upper_bound)
)
AS SELECT * FROM stats_validation_view;

Mettere in quarantena i record non validi

Questo modello combina le aspettative con tabelle e viste temporanee per tenere traccia delle metriche della qualità dei dati durante gli aggiornamenti della pipeline e abilitare percorsi di elaborazione separati per record validi e non validi nelle operazioni downstream.

modello di quarantena dei dati DLT con aspettative di utilizzo

Pitone

import dlt
from pyspark.sql.functions import expr

rules = {
  "valid_pickup_zip": "(pickup_zip IS NOT NULL)",
  "valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.view
def raw_trips_data():
  return spark.readStream.table("samples.nyctaxi.trips")

@dlt.table(
  temporary=True,
  partition_cols=["is_quarantined"],
)
@dlt.expect_all(rules)
def trips_data_quarantine():
  return (
    dlt.readStream("raw_trips_data").withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view
def valid_trips_data():
  return dlt.read("trips_data_quarantine").filter("is_quarantined=false")

@dlt.view
def invalid_trips_data():
  return dlt.read("trips_data_quarantine").filter("is_quarantined=true")

SQL

CREATE TEMPORARY STREAMING LIVE VIEW raw_trips_data AS
  SELECT * FROM STREAM(samples.nyctaxi.trips);

CREATE OR REFRESH TEMPORARY STREAMING TABLE trips_data_quarantine(
  -- Option 1 - merge all expectations to have a single name in the pipeline event log
  CONSTRAINT quarantined_row EXPECT (pickup_zip IS NOT NULL OR dropoff_zip IS NOT NULL),
  -- Option 2 - Keep the expectations separate, resulting in multiple entries under different names
  CONSTRAINT invalid_pickup_zip EXPECT (pickup_zip IS NOT NULL),
  CONSTRAINT invalid_dropoff_zip EXPECT (dropoff_zip IS NOT NULL)
)
PARTITIONED BY (is_quarantined)
AS
  SELECT
    *,
    NOT ((pickup_zip IS NOT NULL) and (dropoff_zip IS NOT NULL)) as is_quarantined
  FROM STREAM(raw_trips_data);

CREATE TEMPORARY LIVE VIEW valid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=FALSE;

CREATE TEMPORARY LIVE VIEW invalid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=TRUE;