Mengembangkan kode alur dengan Python
DLT memperkenalkan beberapa konstruksi kode Python baru untuk menentukan tampilan materialisasi dan tabel streaming dalam alur. Dukungan Python untuk mengembangkan alur dibangun berdasarkan dasar-dasar PySpark DataFrame dan API Streaming Terstruktur.
Untuk pengguna yang tidak terbiasa dengan Python dan DataFrames, Databricks merekomendasikan penggunaan antarmuka SQL. Lihat Mengembangkan kode alur dengan SQL.
Untuk referensi lengkap sintaks DLT Python, lihat referensi bahasa DLT Python.
Dasar-dasar Python untuk pengembangan alur
Kode Python yang membuat himpunan data DLT harus mengembalikan DataFrames.
Semua API DLT Python diimplementasikan dalam modul dlt
. Kode alur DLT Anda yang diterapkan dengan Python harus secara eksplisit mengimpor modul dlt
di bagian atas buku catatan dan file Python.
Membaca dan menulis default ke katalog dan skema yang ditentukan selama konfigurasi alur. Lihat Mengatur katalog target dan mengatur skema.
Kode Python khusus DLT berbeda dari jenis kode Python lainnya dengan satu cara penting: Kode alur Python tidak secara langsung memanggil fungsi yang melakukan penyerapan dan transformasi data untuk membuat himpunan data DLT. Sebagai gantinya, DLT menginterpretasikan fungsi dekorator dari modul dlt
di semua file kode sumber yang dikonfigurasi dalam alur dan membangun grafik aliran data.
Penting
Untuk menghindari perilaku tak terduga saat alur Anda berjalan, jangan sertakan kode yang mungkin memiliki efek samping dalam fungsi Anda yang menentukan himpunan data. Untuk mempelajari lebih lanjut, lihat referensi Python.
Membuat tampilan materialisasi atau tabel streaming dengan Python
Dekorator @dlt.table
memberi tahu DLT untuk membuat tampilan terwujud atau tabel streaming berdasarkan hasil yang dikembalikan oleh fungsi. Hasil baca batch membuat tampilan materialisasi, sementara hasil baca streaming membuat tabel streaming.
Secara default, tampilan materialisasi dan nama tabel streaming disimpulkan dari nama fungsi. Contoh kode berikut menunjukkan sintaks dasar untuk membuat tampilan materialisasi dan tabel streaming:
Nota
Kedua fungsi mereferensikan tabel yang sama dalam katalog samples
dan menggunakan fungsi dekorator yang sama. Contoh-contoh ini menyoroti bahwa satu-satunya perbedaan dalam sintaks dasar untuk tampilan materialisasi dan tabel streaming adalah menggunakan spark.read
versus spark.readStream
.
Tidak semua sumber data mendukung pembacaan streaming. Beberapa sumber data harus selalu diproses dengan semantik streaming.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Secara opsional, Anda dapat menentukan nama tabel menggunakan argumen name
di dekorator @dlt.table
. Contoh berikut menunjukkan pola ini untuk tampilan materialisasi dan tabel streaming:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Memuat data dari penyimpanan objek
DLT mendukung pemuatan data dari semua format yang didukung oleh Azure Databricks. Lihat opsi format data .
Nota
Contoh-contoh ini menggunakan data yang tersedia di bawah /databricks-datasets
, yang dipasang secara otomatis ke ruang kerja Anda. Databricks merekomendasikan penggunaan jalur volume atau URI cloud untuk mereferensikan data yang disimpan dalam penyimpanan objek cloud. Pelajari , Apa itu Volume Katalog Unity?.
Databricks merekomendasikan penggunaan Auto Loader dan tabel streaming saat mengonfigurasi beban kerja penyerapan inkremental terhadap data yang disimpan di penyimpanan objek cloud. Lihat Apa itu Auto Loader?.
Contoh berikut membuat tabel streaming dari file JSON menggunakan Auto Loader:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Contoh berikut menggunakan semantik batch untuk membaca direktori JSON dan membuat tampilan materialisasi:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Memvalidasi data dengan ekspektasi
Anda dapat menggunakan ekspektasi untuk mengatur dan menerapkan batasan kualitas data. Lihat Mengelola kualitas data dengan ekspektasi alur.
Kode berikut menggunakan @dlt.expect_or_drop
untuk menentukan ekspektasi bernama valid_data
yang menghilangkan rekaman yang null selama penyerapan data:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Kueri tampilan materialisasi dan tabel streaming yang ditentukan dalam alur Anda
Contoh berikut mendefinisikan empat himpunan data:
- Tabel streaming bernama
orders
yang memuat data JSON. - Tampilan materialisasi bernama
customers
yang memuat data CSV. - Tampilan materialisasi bernama
customer_orders
yang menggabungkan rekaman dari himpunan dataorders
dancustomers
, mengubah tanda waktu pesanan menjadi tanggal, dan memilih bidangcustomer_id
,order_number
,state
, danorder_date
. - Tampilan materialisasi bernama
daily_orders_by_state
yang menggabungkan jumlah pesanan harian untuk setiap negara bagian.
Nota
Saat mengkueri tampilan atau tabel di alur, Anda dapat menentukan katalog dan skema secara langsung, atau Anda bisa menggunakan default yang dikonfigurasi di alur Anda. Dalam contoh ini, tabel orders
, customers
, dan customer_orders
ditulis dan dibaca dari katalog default dan skema yang dikonfigurasi untuk alur Anda.
Mode penerbitan lama menggunakan skema LIVE
untuk melakukan kueri tampilan materialisasi lain dan tabel streaming yang ditentukan dalam alur Anda. Dalam alur baru, sintaks skema LIVE
diabaikan secara diam-diam. Lihat skema LANGSUNG (warisan) .
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Membuatlah tabel dalam perulangan for
Anda dapat menggunakan perulangan Python for
untuk membuat beberapa tabel secara terprogram. Ini dapat berguna ketika Anda memiliki banyak sumber data atau himpunan data target yang bervariasi hanya dengan beberapa parameter, menghasilkan lebih sedikit total kode untuk dipertahankan dan lebih sedikit redundansi kode.
Perulangan for
mengevaluasi logika dalam urutan serial, tetapi setelah perencanaan selesai untuk himpunan data, alur menjalankan logika secara paralel.
Penting
Saat menggunakan pola ini untuk menentukan himpunan data, pastikan bahwa daftar nilai yang diteruskan ke perulangan for
selalu aditif. Jika himpunan data yang sebelumnya ditentukan dalam alur dihilangkan dari eksekusi alur di masa mendatang, himpunan data tersebut dihilangkan secara otomatis dari skema target.
Contoh berikut membuat lima tabel yang memfilter pesanan pelanggan menurut wilayah. Di sini, nama kawasan digunakan untuk menetapkan nama tampilan materialisasi target dan untuk memfilter data sumber. Tampilan sementara digunakan untuk menentukan gabungan dari tabel sumber yang digunakan dalam membangun tampilan materialisasi akhir.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Berikut ini adalah contoh grafik aliran data untuk alur ini:
Pemecahan masalah: perulangan for
membuat banyak tabel dengan nilai yang sama
Model eksekusi malas yang digunakan oleh pipeline untuk mengevaluasi kode Python mengharuskan logika Anda secara langsung mengacu pada nilai individual ketika fungsi yang dihiasi dengan @dlt.table()
dipanggil.
Contoh berikut menunjukkan dua pendekatan yang benar untuk menentukan tabel dengan perulangan for
. Dalam kedua contoh, setiap nama tabel dari daftar tables
secara eksplisit direferensikan dalam fungsi yang dihiasi oleh @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Contoh berikut tidak merujuk referensi nilai secara benar. Contoh ini membuat tabel dengan nama yang berbeda, tetapi semua tabel memuat data dari nilai terakhir dalam perulangan for
:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)