Aracılığıyla paylaş


İşlem hattı beklentileriyle veri kalitesini yönetme

ETL işlem hatlarında akan verileri doğrulayan kalite kısıtlamaları uygulamak için beklentileri kullanın. Beklentiler, veri kalitesi ölçümleri hakkında daha fazla içgörü sağlar ve geçersiz kayıtları algılarken güncelleştirmelerde başarısız olmanıza veya kayıtları bırakmanıza olanak sağlar.

Bu makalede, söz dizimi örnekleri ve davranış seçenekleri de dahil olmak üzere beklentilere genel bir bakış sağlanır. Daha gelişmiş kullanım örnekleri ve önerilen en iyi yöntemler için bkz. Beklenti önerileri ve gelişmiş desenler.

DLT beklentileri akış grafiği

Beklentiler nelerdir?

Beklentiler, sorgudan geçen her kayda veri kalitesi denetimleri uygulayan, işlem hattı üzerinden gerçekleştirilen maddi görünümler, akış tabloları ve görünüm oluşturma deyimlerinde isteğe bağlı şartlardır. Beklentiler, kısıtlamaları belirtmek için standart SQL Boole deyimlerini kullanır. Tek bir veri kümesi için birden çok beklentiyi birleştirebilir ve bir işlem hattındaki tüm veri kümesi bildirimlerinde beklentileri ayarlayabilirsiniz.

Aşağıdaki bölümlerde, bir beklentinin üç bileşeni tanıtılarak söz dizimi örnekleri sağlanır.

Beklenti adı

Her beklentinin, takip ve izleme için kullanılan bir tanımlayıcı olan bir adı olmalıdır. Doğrulanan ölçümleri bildiren bir ad seçin. Aşağıdaki örnek, yaş değerinin 0 ile 120 arasında olduğunu onaylamak için beklenti valid_customer_age tanımlar:

Önemli

Belirli bir veri kümesi için bir beklenti adı benzersiz olmalıdır. Bir işlem hattındaki birden çok veri kümesinde beklentileri yeniden kullanabilirsiniz. bkz. Taşınabilir ve yeniden kullanılabilir beklentiler.

Piton

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Değerlendirme kısıtlaması

Constraint yan tümcesi, her kayıt için doğru veya yanlış olarak değerlendirilmesi gereken bir SQL koşullu deyimidir. Kısıtlama, doğrulanan şeyin gerçek mantığını içerir. Bir kayıt bu koşulda başarısız olduğunda, beklenen durum tetiklenir.

Kısıtlamalar geçerli SQL söz dizimini kullanmalıdır ve aşağıdakileri içeremez:

  • Özel Python işlevleri
  • Dış hizmet çağrıları
  • Diğer tablolara başvuran alt sorgular

Aşağıda, veri kümesi oluşturma deyimlerine eklenebilen kısıtlamaların örnekleri verilmiştir:

Piton

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Geçersiz kayıt üzerinde eylem

Kayıt doğrulama denetiminde başarısız olduğunda ne olacağını belirlemek için bir eylem belirtmeniz gerekir. Aşağıdaki tabloda kullanılabilir eylemler açıklanmaktadır:

Eylem SQL söz dizimi Python söz dizimi Sonuç
uyarı (varsayılan) EXPECT dlt.expect Hedefe geçersiz kayıtlar yazılır. Geçerli ve geçersiz kayıtların sayısı diğer veri kümesi ölçümleriyle birlikte günlüğe kaydedilir.
bırak EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Veriler hedefe yazılmadan önce geçersiz kayıtlar atılır. Bırakılan kayıtların sayısı diğer veri kümesi ölçümleriyle birlikte günlüğe kaydedilir.
başarısız EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Geçersiz kayıtlar güncelleştirmenin başarılı olmasını engelliyor. Yeniden işlemeden önce el ile müdahale gerekir. Bu beklenti tek bir akışın başarısız olmasına neden olur ve işlem hattınızdaki diğer akışların başarısız olmasına neden olmaz.

Ayrıca, veri kaybetmeden veya süreci aksatmadan geçersiz kayıtları karantinaya almak için gelişmiş mantık uygulayabilirsiniz. geçersiz kayıtları karantinaya alınbakın.

Beklentiler izleme ölçümleri

İşlem hattı kullanıcı arabiriminden warn veya drop eylemleri için izleme ölçümlerini görebilirsiniz. fail geçersiz bir kayıt algılandığında güncelleştirmenin başarısız olmasına neden olduğundan ölçümler kaydedilmez.

Beklenti ölçümlerini görüntülemek için aşağıdaki adımları tamamlayın:

  1. Kenar çubuğunda DLT tıklayın.
  2. İşlem hattınızın Adı üzerine tıklayın.
  3. Beklentileri tanımlanmış bir veri kümesine tıklayın.
  4. Sağ kenar çubuğunda Veri kalitesi sekmesini seçin.

DLT olay günlüğünü sorgulayarak veri kalitesi ölçümlerini görüntüleyebilirsiniz. olay günlüğündenSorgu veri kalitesi bölümüne bakın.

Geçersiz kayıtları tutma

Beklentilerin varsayılan davranışı geçersiz kayıtların korunmasıdır. Beklentiyi ihlal eden kayıtları tutmak ancak kısıtlamayı geçen veya başarısız olan kayıtların sayısını toplamak istediğinizde expect işlecini kullanın. Beklentiyi ihlal eden kayıtlar, geçerli kayıtlarla birlikte hedef veri kümesine eklenir:

Piton

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Geçersiz kayıtları bırakma

Geçersiz kayıtların daha fazla işlenmesini önlemek için expect_or_drop işlecini kullanın. Beklentiyi ihlal eden kayıtlar hedef veri kümesinden bırakılır:

Piton

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Geçersiz kayıtlarda başarısız ol

Geçersiz kayıtlar kabul edilemez olduğunda, expect_or_fail işlecini kullanarak kayıt doğrulama başarısız olduğunda yürütmeyi hemen durdurun. İşlem bir tablo güncelleştirmesiyse, sistem atomik olarak işlemi geri alır:

Piton

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Önemli

İşlem hattında tanımlanmış birden çok paralel akışınız varsa, tek bir akışın başarısız olması diğer akışların başarısız olmasına neden olmaz.

DLT akış hatası açıklama grafiği

Beklenen başarısız güncelleştirmelerle ilgili sorunları giderme

İşlem hattı, bir beklenti ihlali nedeniyle başarısız olduğunda, işlem hattını yeniden çalıştırmadan önce geçersiz verileri doğru şekilde işlemek için işlem hattı kodunu düzeltmeniz gerekir.

İşlem hatlarının başarısız olması için yapılandırılan beklentiler, ihlalleri algılamak ve bildirmek için gereken bilgileri izlemek için dönüşümlerinizin Spark sorgu planını değiştirir. Bu bilgileri hangi giriş kaydının birçok sorgu için ihlale neden olduğunu belirlemek için kullanabilirsiniz. Aşağıda örnek bir beklenti verilmiştir:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

Birden çok beklenti yönetimi

Not

Hem SQL hem de Python tek bir veri kümesinde birden çok beklentiyi desteklese de, yalnızca Python birden çok ayrı beklentiyi birlikte gruplandırmanıza ve toplu eylemleri belirtmenize olanak tanır.

DLT birden çok beklentiye sahip fLow graph

Birden çok beklentiyi birlikte gruplandırabilir ve expect_all, expect_all_or_dropve expect_all_or_failişlevlerini kullanarak toplu eylemleri belirtebilirsiniz.

Bu dekoratörler bağımsız değişken olarak bir Python sözlüğü kabul eder; burada anahtar, beklenti adı ve değer de beklenti kısıtlamasıdır. İşlem hattınızdaki birden çok veri kümesinde aynı beklenti kümesini yeniden kullanabilirsiniz. Aşağıda, expect_all Python işleçlerinin her birinin örnekleri gösterilmektedir:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset