Bagikan melalui


Mengembangkan kode alur dengan Python

DLT memperkenalkan beberapa konstruksi kode Python baru untuk menentukan tampilan materialisasi dan tabel streaming dalam alur. Dukungan Python untuk mengembangkan alur dibangun berdasarkan dasar-dasar PySpark DataFrame dan API Streaming Terstruktur.

Untuk pengguna yang tidak terbiasa dengan Python dan DataFrames, Databricks merekomendasikan penggunaan antarmuka SQL. Lihat Mengembangkan kode alur dengan SQL.

Untuk referensi lengkap sintaks DLT Python, lihat referensi bahasa DLT Python.

Dasar-dasar Python untuk pengembangan alur

Kode Python yang membuat himpunan data DLT harus mengembalikan DataFrames.

Semua API DLT Python diimplementasikan dalam modul dlt. Kode alur DLT Anda yang diterapkan dengan Python harus secara eksplisit mengimpor modul dlt di bagian atas buku catatan dan file Python.

Membaca dan menulis default ke katalog dan skema yang ditentukan selama konfigurasi alur. Lihat Mengatur katalog target dan mengatur skema.

Kode Python khusus DLT berbeda dari jenis kode Python lainnya dengan satu cara penting: Kode alur Python tidak secara langsung memanggil fungsi yang melakukan penyerapan dan transformasi data untuk membuat himpunan data DLT. Sebagai gantinya, DLT menginterpretasikan fungsi dekorator dari modul dlt di semua file kode sumber yang dikonfigurasi dalam alur dan membangun grafik aliran data.

Penting

Untuk menghindari perilaku tak terduga saat alur Anda berjalan, jangan sertakan kode yang mungkin memiliki efek samping dalam fungsi Anda yang menentukan himpunan data. Untuk mempelajari lebih lanjut, lihat referensi Python.

Membuat tampilan materialisasi atau tabel streaming dengan Python

Dekorator @dlt.table memberi tahu DLT untuk membuat tampilan terwujud atau tabel streaming berdasarkan hasil yang dikembalikan oleh fungsi. Hasil baca batch membuat tampilan materialisasi, sementara hasil baca streaming membuat tabel streaming.

Secara default, tampilan materialisasi dan nama tabel streaming disimpulkan dari nama fungsi. Contoh kode berikut menunjukkan sintaks dasar untuk membuat tampilan materialisasi dan tabel streaming:

Nota

Kedua fungsi mereferensikan tabel yang sama dalam katalog samples dan menggunakan fungsi dekorator yang sama. Contoh-contoh ini menyoroti bahwa satu-satunya perbedaan dalam sintaks dasar untuk tampilan materialisasi dan tabel streaming adalah menggunakan spark.read versus spark.readStream.

Tidak semua sumber data mendukung pembacaan streaming. Beberapa sumber data harus selalu diproses dengan semantik streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Secara opsional, Anda dapat menentukan nama tabel menggunakan argumen name di dekorator @dlt.table. Contoh berikut menunjukkan pola ini untuk tampilan materialisasi dan tabel streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Memuat data dari penyimpanan objek

DLT mendukung pemuatan data dari semua format yang didukung oleh Azure Databricks. Lihat opsi format data .

Nota

Contoh-contoh ini menggunakan data yang tersedia di bawah /databricks-datasets, yang dipasang secara otomatis ke ruang kerja Anda. Databricks merekomendasikan penggunaan jalur volume atau URI cloud untuk mereferensikan data yang disimpan dalam penyimpanan objek cloud. Pelajari , Apa itu Volume Katalog Unity?.

Databricks merekomendasikan penggunaan Auto Loader dan tabel streaming saat mengonfigurasi beban kerja penyerapan inkremental terhadap data yang disimpan di penyimpanan objek cloud. Lihat Apa itu Auto Loader?.

Contoh berikut membuat tabel streaming dari file JSON menggunakan Auto Loader:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Contoh berikut menggunakan semantik batch untuk membaca direktori JSON dan membuat tampilan materialisasi:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Memvalidasi data dengan ekspektasi

Anda dapat menggunakan ekspektasi untuk mengatur dan menerapkan batasan kualitas data. Lihat Mengelola kualitas data dengan ekspektasi alur.

Kode berikut menggunakan @dlt.expect_or_drop untuk menentukan ekspektasi bernama valid_data yang menghilangkan rekaman yang null selama penyerapan data:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Kueri tampilan materialisasi dan tabel streaming yang ditentukan dalam alur Anda

Contoh berikut mendefinisikan empat himpunan data:

  • Tabel streaming bernama orders yang memuat data JSON.
  • Tampilan materialisasi bernama customers yang memuat data CSV.
  • Tampilan materialisasi bernama customer_orders yang menggabungkan rekaman dari himpunan data orders dan customers, mengubah tanda waktu pesanan menjadi tanggal, dan memilih bidang customer_id, order_number, state, dan order_date.
  • Tampilan materialisasi bernama daily_orders_by_state yang menggabungkan jumlah pesanan harian untuk setiap negara bagian.

Nota

Saat mengkueri tampilan atau tabel di alur, Anda dapat menentukan katalog dan skema secara langsung, atau Anda bisa menggunakan default yang dikonfigurasi di alur Anda. Dalam contoh ini, tabel orders, customers, dan customer_orders ditulis dan dibaca dari katalog default dan skema yang dikonfigurasi untuk alur Anda.

Mode penerbitan lama menggunakan skema LIVE untuk melakukan kueri tampilan materialisasi lain dan tabel streaming yang ditentukan dalam alur Anda. Dalam alur baru, sintaks skema LIVE diabaikan secara diam-diam. Lihat skema LANGSUNG (warisan) .

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Membuatlah tabel dalam perulangan for

Anda dapat menggunakan perulangan Python for untuk membuat beberapa tabel secara terprogram. Ini dapat berguna ketika Anda memiliki banyak sumber data atau himpunan data target yang bervariasi hanya dengan beberapa parameter, menghasilkan lebih sedikit total kode untuk dipertahankan dan lebih sedikit redundansi kode.

Perulangan for mengevaluasi logika dalam urutan serial, tetapi setelah perencanaan selesai untuk himpunan data, alur menjalankan logika secara paralel.

Penting

Saat menggunakan pola ini untuk menentukan himpunan data, pastikan bahwa daftar nilai yang diteruskan ke perulangan for selalu aditif. Jika himpunan data yang sebelumnya ditentukan dalam alur dihilangkan dari eksekusi alur di masa mendatang, himpunan data tersebut dihilangkan secara otomatis dari skema target.

Contoh berikut membuat lima tabel yang memfilter pesanan pelanggan menurut wilayah. Di sini, nama kawasan digunakan untuk menetapkan nama tampilan materialisasi target dan untuk memfilter data sumber. Tampilan sementara digunakan untuk menentukan gabungan dari tabel sumber yang digunakan dalam membangun tampilan materialisasi akhir.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Berikut ini adalah contoh grafik aliran data untuk alur ini:

Grafik aliran data dari dua tampilan yang mengarah ke lima tabel regional.

Pemecahan masalah: perulangan for membuat banyak tabel dengan nilai yang sama

Model eksekusi malas yang digunakan oleh pipeline untuk mengevaluasi kode Python mengharuskan logika Anda secara langsung mengacu pada nilai individual ketika fungsi yang dihiasi dengan @dlt.table() dipanggil.

Contoh berikut menunjukkan dua pendekatan yang benar untuk menentukan tabel dengan perulangan for. Dalam kedua contoh, setiap nama tabel dari daftar tables secara eksplisit direferensikan dalam fungsi yang dihiasi oleh @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Contoh berikut tidak merujuk referensi nilai secara benar. Contoh ini membuat tabel dengan nama yang berbeda, tetapi semua tabel memuat data dari nilai terakhir dalam perulangan for:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)