다음을 통해 공유


예상 권장 사항 및 고급 패턴

이 문서에는 대규모로 기대치를 구현하기 위한 권장 사항과 기대에서 지원하는 고급 패턴의 예가 포함되어 있습니다. 이러한 패턴은 예상과 함께 여러 데이터 세트를 사용하며 사용자가 구체화된 뷰, 스트리밍 테이블 및 기대치의 구문과 의미 체계를 이해하도록 요구합니다.

기대 동작 및 구문에 대한 기본 개요는 파이프라인 기대치를 통한 데이터 품질 관리을 참조하세요.

이식 가능하고 재사용 가능한 기대치

Databricks는 이식성을 개선하고 유지 관리 부담을 줄이기 위해 기대치를 구현할 때 다음과 같은 모범 사례를 권장합니다.

추천 영향
파이프라인 논리와 별도로 예상 정의를 저장합니다. 여러 데이터 세트 또는 파이프라인에 기대치를 쉽게 적용할 수 있습니다. 파이프라인 소스 코드를 수정하지 않고 기대치를 업데이트, 감사 및 유지 관리합니다.
사용자 지정 태그를 추가하여 관련 예상 그룹을 만듭니다. 태그를 기준으로 기대치를 필터링합니다.
유사한 데이터 세트 간에 지속적으로 기대치를 적용합니다. 여러 데이터 세트 및 파이프라인에서 동일한 기대치를 사용하여 동일한 논리를 평가합니다.

다음 예제에서는 델타 테이블 또는 사전을 사용하여 중앙 예상 리포지토리를 만드는 방법을 보여 줍니다. 그런 다음 사용자 지정 Python 함수는 예제 파이프라인의 데이터 세트에 이러한 기대치를 적용합니다.

델타 테이블

다음 예제에서는 규칙을 유지하기 위해 rules 테이블을 만듭니다.

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

다음 Python 예제에서는 rules 테이블의 규칙을 기반으로 데이터 품질 기대치를 정의합니다. get_rules() 함수는 rules 테이블에서 규칙을 읽고 함수에 전달된 tag 인수와 일치하는 규칙이 포함된 Python 사전을 반환합니다.

이 예제에서는 @dlt.expect_all_or_drop() 데코레이터를 사용하여 사전을 적용하여 데이터 품질 제약 조건을 적용합니다.

예를 들어 validity 태그가 지정된 규칙에 실패한 레코드는 raw_farmers_market 테이블에서 삭제됩니다.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("rules").filter(col("tag") == tag).collect()
  return {
      row['name']: row['constraint']
      for row in df
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Python 모듈

다음 예제에서는 규칙을 유지 관리하는 Python 모듈을 만듭니다. 이 예제에서는 이 코드를 파이프라인의 소스 코드로 사용되는 Notebook과 동일한 폴더의 rules_module.py 파일에 저장합니다.

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

다음 Python 예제에서는 rules_module.py 파일에 정의된 규칙에 따라 데이터 품질 기대치를 정의합니다. get_rules() 함수는 전달된 tag 인수와 일치하는 규칙을 포함하는 Python 사전을 반환합니다.

이 예제에서는 @dlt.expect_all_or_drop() 데코레이터를 사용하여 사전을 적용하여 데이터 품질 제약 조건을 적용합니다.

예를 들어 validity 태그가 지정된 규칙에 실패한 레코드는 raw_farmers_market 테이블에서 삭제됩니다.

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  return {
    row['name']: row['constraint']
    for row in get_rules_as_list_of_dict()
    if row['tag'] == tag
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

행 개수 유효성 검사

다음 예제에서는 table_atable_b 간의 행 개수 같음의 유효성을 검사하여 변환 중에 데이터가 손실되지 않는지 확인합니다.

DLT 행 개수 유효성 검사 그래프 예상 활용

파이썬

@dlt.view(
  name="count_verification",
  comment="Validates equal row counts between tables"
)
@dlt.expect_or_fail("no_rows_dropped", "a_count == b_count")
def validate_row_counts():
  return spark.sql("""
    SELECT * FROM
      (SELECT COUNT(*) AS a_count FROM table_a),
      (SELECT COUNT(*) AS b_count FROM table_b)""")

SQL

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM table_a),
  (SELECT COUNT(*) AS b_count FROM table_b)

누락된 레코드 검색

다음 예제에서는 모든 예상 레코드가 report 테이블에 있는지 확인합니다.

DLT 누락된 행 탐지 그래프와 예상 사용량

파이썬

@dlt.view(
  name="report_compare_tests",
  comment="Validates no records are missing after joining"
)
@dlt.expect_or_fail("no_missing_records", "r_key IS NOT NULL")
def validate_report_completeness():
  return (
    dlt.read("validation_copy").alias("v")
      .join(
        dlt.read("report").alias("r"),
        on="key",
        how="left_outer"
      )
      .select(
        "v.*",
        "r.key as r_key"
      )
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r_key IS NOT NULL)
)
AS SELECT v.*, r.key as r_key FROM validation_copy v
  LEFT OUTER JOIN report r ON v.key = r.key

기본 키 고유성

다음 예제에서는 여러 테이블에서 기본 키 제약 조건의 유효성을 검사합니다.

DLT 기본 키 고유성 그래프 예상 사용량 사용하여

파이썬

@dlt.view(
  name="report_pk_tests",
  comment="Validates primary key uniqueness"
)
@dlt.expect_or_fail("unique_pk", "num_entries = 1")
def validate_pk_uniqueness():
  return (
    dlt.read("report")
      .groupBy("pk")
      .count()
      .withColumnRenamed("count", "num_entries")
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
  FROM report
  GROUP BY pk

스키마 진화 패턴

다음 예제에서는 추가 열에 대한 스키마 진화를 처리하는 방법을 보여 줍니다. 데이터 원본을 마이그레이션하거나 여러 버전의 업스트림 데이터를 처리할 때 이 패턴을 사용하여 데이터 품질을 적용하면서 이전 버전과의 호환성을 보장합니다.

기대 사용을 통한 DLT 스키마 진화 유효성 검사

파이썬

@dlt.table
@dlt.expect_all_or_fail({
  "required_columns": "col1 IS NOT NULL AND col2 IS NOT NULL",
  "valid_col3": "CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END"
})
def evolving_table():
  # Legacy data (V1 schema)
  legacy_data = spark.read.table("legacy_source")

  # New data (V2 schema)
  new_data = spark.read.table("new_source")

  # Combine both sources
  return legacy_data.unionByName(new_data, allowMissingColumns=True)

SQL

CREATE OR REFRESH MATERIALIZED VIEW evolving_table(
  -- Merging multiple constraints into one as expect_all is Python-specific API
  CONSTRAINT valid_migrated_data EXPECT (
    (col1 IS NOT NULL AND col2 IS NOT NULL) AND (CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END)
  ) ON VIOLATION FAIL UPDATE
) AS
  SELECT * FROM new_source
  UNION
  SELECT *, NULL as col3 FROM legacy_source;

범위 기반 유효성 검사 패턴

다음 예제에서는 기록 통계 범위에 대해 새 데이터 요소의 유효성을 검사하여 데이터 흐름에서 이상값 및 변칙을 식별하는 방법을 보여 줍니다.

DLT 범위 기반 유효성 검사 예상 사용량

파이썬

@dlt.view
def stats_validation_view():
  # Calculate statistical bounds from historical data
  bounds = spark.sql("""
    SELECT
      avg(amount) - 3 * stddev(amount) as lower_bound,
      avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE
      date >= CURRENT_DATE() - INTERVAL 30 DAYS
  """)

  # Join with new data and apply bounds
  return spark.read.table("new_data").crossJoin(bounds)

@dlt.table
@dlt.expect_or_drop(
  "within_statistical_range",
  "amount BETWEEN lower_bound AND upper_bound"
)
def validated_amounts():
  return dlt.read("stats_validation_view")

SQL

CREATE OR REFRESH MATERIALIZED VIEW stats_validation_view AS
  WITH bounds AS (
    SELECT
    avg(amount) - 3 * stddev(amount) as lower_bound,
    avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE date >= CURRENT_DATE() - INTERVAL 30 DAYS
  )
  SELECT
    new_data.*,
    bounds.*
  FROM new_data
  CROSS JOIN bounds;

CREATE OR REFRESH MATERIALIZED VIEW validated_amounts (
  CONSTRAINT within_statistical_range EXPECT (amount BETWEEN lower_bound AND upper_bound)
)
AS SELECT * FROM stats_validation_view;

잘못된 레코드 격리

이 패턴은 예상을 임시 테이블 및 뷰와 결합하여 파이프라인 업데이트 중에 데이터 품질 메트릭을 추적하고 다운스트림 작업에서 유효하고 잘못된 레코드에 대해 별도의 처리 경로를 사용하도록 설정합니다.

DLT 데이터 격리 패턴 , 예상 사용량

파이썬

import dlt
from pyspark.sql.functions import expr

rules = {
  "valid_pickup_zip": "(pickup_zip IS NOT NULL)",
  "valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.view
def raw_trips_data():
  return spark.readStream.table("samples.nyctaxi.trips")

@dlt.table(
  temporary=True,
  partition_cols=["is_quarantined"],
)
@dlt.expect_all(rules)
def trips_data_quarantine():
  return (
    dlt.readStream("raw_trips_data").withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view
def valid_trips_data():
  return dlt.read("trips_data_quarantine").filter("is_quarantined=false")

@dlt.view
def invalid_trips_data():
  return dlt.read("trips_data_quarantine").filter("is_quarantined=true")

SQL

CREATE TEMPORARY STREAMING LIVE VIEW raw_trips_data AS
  SELECT * FROM STREAM(samples.nyctaxi.trips);

CREATE OR REFRESH TEMPORARY STREAMING TABLE trips_data_quarantine(
  -- Option 1 - merge all expectations to have a single name in the pipeline event log
  CONSTRAINT quarantined_row EXPECT (pickup_zip IS NOT NULL OR dropoff_zip IS NOT NULL),
  -- Option 2 - Keep the expectations separate, resulting in multiple entries under different names
  CONSTRAINT invalid_pickup_zip EXPECT (pickup_zip IS NOT NULL),
  CONSTRAINT invalid_dropoff_zip EXPECT (dropoff_zip IS NOT NULL)
)
PARTITIONED BY (is_quarantined)
AS
  SELECT
    *,
    NOT ((pickup_zip IS NOT NULL) and (dropoff_zip IS NOT NULL)) as is_quarantined
  FROM STREAM(raw_trips_data);

CREATE TEMPORARY LIVE VIEW valid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=FALSE;

CREATE TEMPORARY LIVE VIEW invalid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=TRUE;