Aracılığıyla paylaş


Python ile işlem hattı kodu geliştirme

DLT, işlem hatlarında gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamaya yönelik birkaç yeni Python kod yapısı sunar. PySpark DataFrame ve Yapılandırılmış Akış API'lerinin temellerini temel alan işlem hatları geliştirmeye yönelik Python desteği.

Python ve DataFrame'leri tanımayan kullanıcılar için Databricks, SQL arabiriminin kullanılmasını önerir. SQL kodu ile işlem hattı geliştirmebkz.

DLT Python dil söz dizimi referansının tamamı için bakınız DLT Python dil başvurusu.

İşlem hattı geliştirme için Python'ın temelleri

DLT veri kümeleri oluşturan Python kodu DataFrame'leri döndürmelidir.

Tüm DLT Python API'leri dlt modülünde uygulanır. Python ile uygulanan DLT işlem hattı kodunuzun, Python not defterlerinin ve dosyalarının en üstündeki dlt modülünü açıkça içeri aktarması gerekir.

İşlem hattı yapılandırması sırasında belirtilen kataloğa ve şemaya varsayılan olarak okur ve yazar. Bkz. Hedef kataloğu ve şemayı ayarla.

DLT'ye özgü Python kodu, diğer Python kodu türlerinden kritik bir şekilde farklıdır: Python işlem hattı kodu, DLT veri kümeleri oluşturmak için veri alımı ve dönüştürme gerçekleştiren işlevleri doğrudan çağırmaz. Bunun yerine DLT, bir işlem hattında yapılandırılan tüm kaynak kodu dosyalarında dlt modülünden dekoratör işlevlerini yorumlar ve bir veri akışı grafiği oluşturur.

Önemli

İşlem hattınız çalışırken beklenmeyen davranışlardan kaçınmak için işlevlerinizde veri kümelerini tanımlayan yan etkileri olabilecek kodu eklemeyin. Daha fazla bilgi edinmek için bkz. Python referansı.

Python ile materyalize görünüm veya akış tablosu oluşturun.

@dlt.table dekoratörü, DLT'ye bir işlevin döndürdüğü sonuçlara dayanarak bir materialized view veya akış tablosu oluşturmasını söyler. Toplu okumanın sonuçları maddileştirilmiş bir görünüm oluştururken, akış okumasının sonuçları bir akış tablosu oluşturur.

Varsayılan olarak, materialize edilmiş görünüm ve akış tablosu adları, işlev adlarından çıkarılır. Aşağıdaki kod örneği, gerçekleştirilmiş görünüm ve akış tablosu oluşturmaya yönelik temel söz dizimini gösterir:

Not

her iki işlev de samples kataloğunda aynı tabloya başvurur ve aynı dekoratör işlevini kullanır. Bu örneklerde, gerçekleştirilmiş görünümler ve akış tabloları için temel söz dizimindeki tek farkın spark.read ve spark.readStreamkullanmak olduğu vurgulanır.

Tüm veri kaynakları akış okumalarını desteklemez. Bazı veri kaynakları her zaman akış semantiğiyle işlenmelidir.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

İsteğe bağlı olarak, @dlt.table dekoratördeki name bağımsız değişkenini kullanarak tablo adını belirtebilirsiniz. Aşağıdaki örnek, materyalize edilmiş bir görünüm ve akış tablosu için bu modeli gösterir:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Nesne deposundan veri yükleme

DLT, Azure Databricks tarafından desteklenen tüm biçimlerden veri yüklemeyi destekler. bkz. Veri biçimi seçenekleri.

Not

Bu örneklerde, çalışma alanınıza otomatik olarak bağlanan /databricks-datasets içindeki veriler kullanılır. Databricks, bulut nesne depolama alanında depolanan verilere başvurmak için birim yollarının veya bulut URI'lerinin kullanılmasını önerir. Bkz. Unity Kataloğu birimleri nedir?.

Databricks, artımlı alım iş yüklerini bulut nesne depolamasında depolanan verilere göre yapılandırırken Otomatik Yükleyici ve akış tablolarının kullanılmasını önerir. Bkz. Otomatik Yükleyici nedir?.

Aşağıdaki örnek, Otomatik Yükleyici kullanarak JSON dosyalarından bir akış tablosu oluşturur:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Aşağıdaki örnek, JSON dizinini okumak ve gerçekleştirilmiş bir görünüm oluşturmak için toplu semantiği kullanır:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Verileri beklentilerle doğrulama

Veri kalitesi kısıtlamalarını ayarlamak ve uygulamak için beklentileri kullanabilirsiniz. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.

Aşağıdaki kod, veri alımı sırasında null olan kayıtları düşüren valid_data adlı bir beklenti tanımlamak için @dlt.expect_or_drop kullanır:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

İşlem hattınızda tanımlanan gerçekleştirilmiş görünümleri ve akış tablolarını sorgulayın

Aşağıdaki örnek dört veri kümesini tanımlar:

  • JSON verilerini yükleyen orders adlı bir akış tablosu.
  • CSV verilerini yükleyen "customers" adlı gerçekleştirilmiş görünüm.
  • orders ve customers veri kümelerindeki kayıtları birleştiren, sipariş zaman damgasını bir tarihe dönüştüren ve customer_id, order_number, stateile order_date alanlarını seçen customer_orders adlı gerçekleştirilmiş bir görünüm.
  • Her eyalet için günlük sipariş sayısını toplayan daily_orders_by_state adlı bir materyalize görünüm.

Noti

İşlem hattınızdaki görünümleri veya tabloları sorgularken, kataloğu ve şemayı doğrudan belirtebilir veya işlem hattınızda yapılandırılan varsayılanları kullanabilirsiniz. Bu örnekte, orders, customersve customer_orders tabloları, işlem hattınız için yapılandırılan varsayılan katalogdan ve şemadan yazılır ve okunur.

Eski yayımlama modu, işlem hattınızda tanımlanan diğer gerçekleştirilmiş görünümleri ve akış tablolarını sorgulamak için LIVE şemasını kullanır. Yeni işlem hatlarında, LIVE şema söz dizimi sessizce yoksayılır. Bkz. LIVE şeması (eski).

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

tabloları bir for döngüsünde oluşturma

Program aracılığıyla birden çok tablo oluşturmak için Python for döngülerini kullanabilirsiniz. Bu, yalnızca birkaç parametreye göre değişen çok sayıda veri kaynağınız veya hedef veri kümeniz olduğunda yararlı olabilir ve bu da toplam kodun korunmasına ve daha az kod yedekliliğine neden olur.

for döngüsü, mantığı seri sırada değerlendirir, ancak veri kümeleri için planlama tamamlandıktan sonra işlem hattı mantığı paralel olarak çalıştırır.

Önemli

Veri kümelerini tanımlamak için bu deseni kullanırken, for döngüsüne geçirilen değerlerin listesinin her zaman eklendiğinden emin olun. Daha önce işlem hattında tanımlanmış bir veri kümesi gelecekteki bir işlem hattı çalıştırmasından atlanırsa, bu veri kümesi hedef şemadan otomatik olarak bırakılır.

Aşağıdaki örnek, müşteri siparişlerini bölgeye göre filtreleyen beş tablo oluşturur. Burada, hedef gerçekleştirilmiş görünümlerin isimlerini belirlemek ve kaynak verileri filtrelemek için bölge adı kullanılır. Geçici görünümler, son gerçekleştirilmiş görünümleri oluştururken kullanılan kaynak tablolardan birleştirmeleri tanımlamak için kullanılır.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Bu işlem hattı için veri akışı grafiği örneği aşağıda verilmiştir:

Beş bölgesel tabloya yol açan iki görünümden oluşan bir veri akışı grafiği.

Sorun giderme: for döngüsü aynı değerlere sahip birçok tablo oluşturur

İşlem hatlarının Python kodunu değerlendirmek için kullandığı gecikmeli yürütme modeli, @dlt.table() tarafından dekore edilen işlev çağrıldığında mantığınızın tek tek değerlere doğrudan başvurmasını gerektirir.

Aşağıdaki örnekte, for döngüsüyle tablo tanımlamaya yönelik iki doğru yaklaşım gösterilmektedir. Her iki örnekte de, tables listesindeki her tablo adına, @dlt.table()tarafından süslenen fonksiyon içinde açıkça başvurulur.

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Aşağıdaki örnek , başvuru değerlerini doğru. Bu örnek farklı adlara sahip tablolar oluşturur, ancak tüm tablolar for döngüsündeki son değerden veri yükler:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)