Python ile işlem hattı kodu geliştirme
DLT, işlem hatlarında gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamaya yönelik birkaç yeni Python kod yapısı sunar. PySpark DataFrame ve Yapılandırılmış Akış API'lerinin temellerini temel alan işlem hatları geliştirmeye yönelik Python desteği.
Python ve DataFrame'leri tanımayan kullanıcılar için Databricks, SQL arabiriminin kullanılmasını önerir. SQL kodu ile işlem hattı geliştirmebkz.
DLT Python dil söz dizimi referansının tamamı için bakınız DLT Python dil başvurusu.
İşlem hattı geliştirme için Python'ın temelleri
DLT veri kümeleri oluşturan Python kodu DataFrame'leri döndürmelidir.
Tüm DLT Python API'leri dlt
modülünde uygulanır. Python ile uygulanan DLT işlem hattı kodunuzun, Python not defterlerinin ve dosyalarının en üstündeki dlt
modülünü açıkça içeri aktarması gerekir.
İşlem hattı yapılandırması sırasında belirtilen kataloğa ve şemaya varsayılan olarak okur ve yazar. Bkz. Hedef kataloğu ve şemayı ayarla.
DLT'ye özgü Python kodu, diğer Python kodu türlerinden kritik bir şekilde farklıdır: Python işlem hattı kodu, DLT veri kümeleri oluşturmak için veri alımı ve dönüştürme gerçekleştiren işlevleri doğrudan çağırmaz. Bunun yerine DLT, bir işlem hattında yapılandırılan tüm kaynak kodu dosyalarında dlt
modülünden dekoratör işlevlerini yorumlar ve bir veri akışı grafiği oluşturur.
Önemli
İşlem hattınız çalışırken beklenmeyen davranışlardan kaçınmak için işlevlerinizde veri kümelerini tanımlayan yan etkileri olabilecek kodu eklemeyin. Daha fazla bilgi edinmek için bkz. Python referansı.
Python ile materyalize görünüm veya akış tablosu oluşturun.
@dlt.table
dekoratörü, DLT'ye bir işlevin döndürdüğü sonuçlara dayanarak bir materialized view veya akış tablosu oluşturmasını söyler. Toplu okumanın sonuçları maddileştirilmiş bir görünüm oluştururken, akış okumasının sonuçları bir akış tablosu oluşturur.
Varsayılan olarak, materialize edilmiş görünüm ve akış tablosu adları, işlev adlarından çıkarılır. Aşağıdaki kod örneği, gerçekleştirilmiş görünüm ve akış tablosu oluşturmaya yönelik temel söz dizimini gösterir:
Not
her iki işlev de samples
kataloğunda aynı tabloya başvurur ve aynı dekoratör işlevini kullanır. Bu örneklerde, gerçekleştirilmiş görünümler ve akış tabloları için temel söz dizimindeki tek farkın spark.read
ve spark.readStream
kullanmak olduğu vurgulanır.
Tüm veri kaynakları akış okumalarını desteklemez. Bazı veri kaynakları her zaman akış semantiğiyle işlenmelidir.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
İsteğe bağlı olarak, @dlt.table
dekoratördeki name
bağımsız değişkenini kullanarak tablo adını belirtebilirsiniz. Aşağıdaki örnek, materyalize edilmiş bir görünüm ve akış tablosu için bu modeli gösterir:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Nesne deposundan veri yükleme
DLT, Azure Databricks tarafından desteklenen tüm biçimlerden veri yüklemeyi destekler. bkz. Veri biçimi seçenekleri.
Not
Bu örneklerde, çalışma alanınıza otomatik olarak bağlanan /databricks-datasets
içindeki veriler kullanılır. Databricks, bulut nesne depolama alanında depolanan verilere başvurmak için birim yollarının veya bulut URI'lerinin kullanılmasını önerir. Bkz. Unity Kataloğu birimleri nedir?.
Databricks, artımlı alım iş yüklerini bulut nesne depolamasında depolanan verilere göre yapılandırırken Otomatik Yükleyici ve akış tablolarının kullanılmasını önerir. Bkz. Otomatik Yükleyici nedir?.
Aşağıdaki örnek, Otomatik Yükleyici kullanarak JSON dosyalarından bir akış tablosu oluşturur:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Aşağıdaki örnek, JSON dizinini okumak ve gerçekleştirilmiş bir görünüm oluşturmak için toplu semantiği kullanır:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Verileri beklentilerle doğrulama
Veri kalitesi kısıtlamalarını ayarlamak ve uygulamak için beklentileri kullanabilirsiniz. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.
Aşağıdaki kod, veri alımı sırasında null olan kayıtları düşüren valid_data
adlı bir beklenti tanımlamak için @dlt.expect_or_drop
kullanır:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
İşlem hattınızda tanımlanan gerçekleştirilmiş görünümleri ve akış tablolarını sorgulayın
Aşağıdaki örnek dört veri kümesini tanımlar:
- JSON verilerini yükleyen
orders
adlı bir akış tablosu. - CSV verilerini yükleyen "
customers
" adlı gerçekleştirilmiş görünüm. -
orders
vecustomers
veri kümelerindeki kayıtları birleştiren, sipariş zaman damgasını bir tarihe dönüştüren vecustomer_id
,order_number
,state
ileorder_date
alanlarını seçencustomer_orders
adlı gerçekleştirilmiş bir görünüm. - Her eyalet için günlük sipariş sayısını toplayan
daily_orders_by_state
adlı bir materyalize görünüm.
Noti
İşlem hattınızdaki görünümleri veya tabloları sorgularken, kataloğu ve şemayı doğrudan belirtebilir veya işlem hattınızda yapılandırılan varsayılanları kullanabilirsiniz. Bu örnekte, orders
, customers
ve customer_orders
tabloları, işlem hattınız için yapılandırılan varsayılan katalogdan ve şemadan yazılır ve okunur.
Eski yayımlama modu, işlem hattınızda tanımlanan diğer gerçekleştirilmiş görünümleri ve akış tablolarını sorgulamak için LIVE
şemasını kullanır. Yeni işlem hatlarında, LIVE
şema söz dizimi sessizce yoksayılır. Bkz. LIVE şeması (eski).
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
tabloları bir for
döngüsünde oluşturma
Program aracılığıyla birden çok tablo oluşturmak için Python for
döngülerini kullanabilirsiniz. Bu, yalnızca birkaç parametreye göre değişen çok sayıda veri kaynağınız veya hedef veri kümeniz olduğunda yararlı olabilir ve bu da toplam kodun korunmasına ve daha az kod yedekliliğine neden olur.
for
döngüsü, mantığı seri sırada değerlendirir, ancak veri kümeleri için planlama tamamlandıktan sonra işlem hattı mantığı paralel olarak çalıştırır.
Önemli
Veri kümelerini tanımlamak için bu deseni kullanırken, for
döngüsüne geçirilen değerlerin listesinin her zaman eklendiğinden emin olun. Daha önce işlem hattında tanımlanmış bir veri kümesi gelecekteki bir işlem hattı çalıştırmasından atlanırsa, bu veri kümesi hedef şemadan otomatik olarak bırakılır.
Aşağıdaki örnek, müşteri siparişlerini bölgeye göre filtreleyen beş tablo oluşturur. Burada, hedef gerçekleştirilmiş görünümlerin isimlerini belirlemek ve kaynak verileri filtrelemek için bölge adı kullanılır. Geçici görünümler, son gerçekleştirilmiş görünümleri oluştururken kullanılan kaynak tablolardan birleştirmeleri tanımlamak için kullanılır.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Bu işlem hattı için veri akışı grafiği örneği aşağıda verilmiştir:
Sorun giderme: for
döngüsü aynı değerlere sahip birçok tablo oluşturur
İşlem hatlarının Python kodunu değerlendirmek için kullandığı gecikmeli yürütme modeli, @dlt.table()
tarafından dekore edilen işlev çağrıldığında mantığınızın tek tek değerlere doğrudan başvurmasını gerektirir.
Aşağıdaki örnekte, for
döngüsüyle tablo tanımlamaya yönelik iki doğru yaklaşım gösterilmektedir. Her iki örnekte de, tables
listesindeki her tablo adına, @dlt.table()
tarafından süslenen fonksiyon içinde açıkça başvurulur.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Aşağıdaki örnek , başvuru değerlerini doğru. Bu örnek farklı adlara sahip tablolar oluşturur, ancak tüm tablolar for
döngüsündeki son değerden veri yükler:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)