Bagikan melalui


Mengubah data dengan alur

Artikel ini menjelaskan cara menggunakan DLT untuk mendeklarasikan transformasi pada himpunan data dan menentukan bagaimana rekaman diproses melalui logika kueri. Ini juga berisi contoh pola transformasi umum untuk membangun alur DLT.

Anda dapat menentukan himpunan data terhadap kueri apa pun yang mengembalikan DataFrame. Anda dapat menggunakan operasi bawaan Apache Spark, UDF, logika kustom, dan model MLflow sebagai transformasi dalam alur DLT Anda. Setelah data diimpor ke dalam alur DLT, Anda dapat menentukan himpunan data baru berdasarkan sumber hulu untuk membuat tabel streaming baru, tampilan materialisasi, dan tampilan.

Untuk mempelajari cara melakukan pemrosesan stateful secara efektif dengan DLT, lihat Cara mengoptimalkan pemrosesan stateful pada DLT menggunakan marka air.

Kapan menggunakan tampilan, tampilan materialisasi, dan tabel streaming

Saat menerapkan kueri alur Anda, pilih jenis himpunan data terbaik untuk memastikannya efisien dan dapat dipertahankan.

Pertimbangkan untuk menggunakan tampilan untuk melakukan hal berikut:

  • Pecahkan kueri besar atau kompleks yang Anda inginkan menjadi kueri yang lebih mudah dikelola.
  • Validasi hasil perantara menggunakan ekspektasi.
  • Kurangi biaya penyimpanan dan komputasi untuk hasil yang tidak perlu Anda pertahankan. Karena tabel terwujud, tabel memerlukan sumber daya komputasi dan penyimpanan tambahan.

Pertimbangkan untuk menggunakan tampilan materialisasi saat:

  • Beberapa kueri lanjutan memproses tabel. Karena tampilan dihitung sesuai permintaan, tampilan dihitung ulang setiap kali tampilan dikueri.
  • Alur, pekerjaan, atau kueri lainnya memanfaatkan tabel. Karena tampilan tidak terwujud, Anda hanya dapat menggunakannya dalam alur yang sama.
  • Anda ingin menampilkan hasil kueri selama pengembangan. Karena tabel terwujud dan dapat dilihat dan dikueri di luar alur, menggunakan tabel selama pengembangan dapat membantu memvalidasi kebenaran komputasi. Setelah memvalidasi, konversi kueri yang tidak memerlukan materialisasi menjadi view.

Pertimbangkan untuk menggunakan tabel streaming saat:

  • Kueri didefinisikan terhadap sumber data yang terus menerus atau bertambah secara bertahap.
  • Hasil kueri harus dihitung secara bertahap.
  • Alur membutuhkan throughput tinggi dan latensi rendah.

Nota

Tabel streaming selalu didefinisikan berdasarkan sumber streaming. Anda juga dapat menggunakan sumber streaming dengan APPLY CHANGES INTO untuk menerapkan pembaruan dari umpan CDC. Lihat API TERAPKAN PERUBAHAN: Menyederhanakan penangkapan data perubahan dengan DLT.

Mengecualikan tabel dari skema target

Jika Anda harus menghitung tabel perantara yang tidak ditujukan untuk konsumsi eksternal, Anda dapat mencegahnya diterbitkan ke skema menggunakan kata kunci TEMPORARY. Tabel sementara masih menyimpan dan memproses data sesuai dengan semantik DLT tetapi tidak boleh diakses di luar alur saat ini. Tabel sementara bertahan selama masa pakai pipeline yang dibuat. Gunakan sintaks berikut untuk mendeklarasikan tabel sementara:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Menggabungkan tabel streaming dan tampilan materialisasi dalam satu alur

Tabel streaming mewarisi jaminan pemrosesan Apache Spark Structured Streaming dan dikonfigurasi untuk memproses kueri dari sumber data khusus tambahan, di mana baris baru selalu dimasukkan ke dalam tabel sumber daripada dimodifikasi.

Nota

Meskipun, secara default, tabel streaming memerlukan sumber data khusus tambahan, ketika sumber streaming adalah tabel streaming lain yang memerlukan pembaruan atau penghapusan, Anda dapat mengambil alih perilaku ini dengan bendera skipChangeCommits.

Pola streaming umum melibatkan penyerapan data sumber untuk membuat himpunan data awal dalam alur. Himpunan data awal ini biasanya disebut tabel perunggu dan sering melakukan transformasi sederhana.

Sebaliknya, tabel akhir dalam alur, umumnya disebut tabel emas, sering memerlukan agregasi atau pembacaan yang rumit dari target operasi APPLY CHANGES INTO. Karena operasi ini secara inheren membuat pembaruan daripada penampingan, operasi ini tidak didukung sebagai input ke tabel streaming. Transformasi ini lebih cocok untuk tampilan materialisasi.

Dengan mencampur tabel streaming dan tampilan materialisasi ke dalam satu alur, Anda dapat menyederhanakan alur Anda, menghindari penyerapan ulang atau pemrosesan ulang data mentah yang mahal, dan memiliki kekuatan penuh SQL untuk menghitung agregasi kompleks melalui himpunan data yang dikodekan dan difilter secara efisien. Contoh berikut mengilustrasikan jenis pemrosesan campuran ini:

Nota

Contoh-contoh ini menggunakan Auto Loader untuk memuat file dari penyimpanan cloud. Untuk memuat file dengan Auto Loader dalam alur Katalog Unity yang diaktifkan, Anda harus menggunakan lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Unity Catalog dengan DLT, lihat Menggunakan Unity Catalog dengan alur DLT Anda.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

Pelajari selengkapnya tentang menggunakan Auto Loader untuk menyerap file JSON secara bertahap dari penyimpanan Azure.

Gabungan Stream-statis

Gabungan statis aliran adalah pilihan yang baik saat mendenormalisasi aliran berkelanjutan data khusus tambahan dengan tabel dimensi statis utamanya.

Dengan setiap pembaruan alur, rekaman baru dari aliran digabungkan dengan rekam jepret terbaru dari tabel statis. Jika rekaman ditambahkan atau diperbarui dalam tabel statis setelah data terkait dari tabel streaming diproses, rekaman yang dihasilkan tidak dihitung ulang kecuali refresh penuh dilakukan.

Dalam alur yang dikonfigurasi untuk eksekusi yang dipicu, tabel statis mengembalikan hasil pada saat pembaruan dimulai. Dalam alur yang dikonfigurasi untuk eksekusi berkelanjutan, versi terbaru tabel statis dikueri setiap kali tabel memproses pembaruan.

Berikut ini adalah contoh penggabungan aliran-statis:

Python

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Menghitung agregat secara efisien

Anda dapat menggunakan tabel streaming untuk menghitung agregat distributif sederhana secara bertahap seperti jumlah, min, maks, atau jumlah, dan agregat aljabar seperti rata-rata atau simpangan baku. Databricks merekomendasikan agregasi bertahap untuk kueri dengan jumlah grup terbatas, seperti kueri dengan klausa GROUP BY country. Hanya data input baru yang dibaca dengan setiap pembaruan.

Untuk mempelajari selengkapnya tentang menulis kueri DLT yang melakukan agregasi inkremental, lihat Melakukan agregasi berbasis jendela dengan penanda batas waktu.

Menggunakan model MLflow dalam alur DLT

Nota

Untuk menggunakan model MLflow dalam alur yang mendukung Katalog Unity, alur Anda harus dikonfigurasi untuk menggunakan saluran preview. Untuk menggunakan saluran current, Anda harus mengonfigurasi alur Anda untuk menerbitkan ke metastore Hive.

Anda dapat menggunakan model yang dilatih oleh MLflow dalam pipeline DLT. Model MLflow diperlakukan sebagai transformasi di Azure Databricks, yang berarti model tersebut bertindak berdasarkan input Spark DataFrame dan mengembalikan hasil sebagai Spark DataFrame. Karena DLT mendefinisikan himpunan data terhadap DataFrames, Anda dapat mengonversi beban kerja Apache Spark yang menggunakan MLflow ke DLT hanya dengan beberapa baris kode. Untuk informasi selengkapnya tentang MLflow, lihat MLflow untuk agen AI gen dan siklus hidup model ML.

Jika Anda sudah memiliki notebook Python yang memanggil model MLflow, Anda dapat mengadaptasi kode ini ke DLT dengan menggunakan dekorator @dlt.table dan memastikan fungsi didefinisikan untuk mengembalikan hasil transformasi. DLT tidak menginstal MLflow secara default, jadi konfirmasikan bahwa Anda telah menginstal pustaka MLFlow dengan %pip install mlflow dan telah mengimpor mlflow dan dlt di bagian atas buku catatan Anda. Untuk pengenalan sintaks DLT, lihat Mengembangkan kode alur dengan Python.

Untuk menggunakan model MLflow di DLT, selesaikan langkah-langkah berikut:

  1. Dapatkan run ID dan nama model MLflow. ID eksekusi dan nama model digunakan untuk membangun URI model MLflow.
  2. Gunakan URI untuk menentukan Spark UDF untuk memuat model MLflow.
  3. Panggil UDF dalam definisi tabel Anda untuk menggunakan model MLflow.

Contoh berikut menunjukkan sintaks dasar untuk pola ini:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Sebagai contoh lengkap, kode berikut mendefinisikan Spark UDF bernama loaded_model_udf yang memuat model MLflow yang dilatih pada data risiko pinjaman. Kolom data yang digunakan untuk membuat prediksi diteruskan sebagai argumen ke UDF. Tabel loan_risk_predictions menghitung prediksi untuk setiap baris dalam loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Pertahankan penghapusan atau pembaruan manual

DLT memungkinkan Anda menghapus atau memperbarui rekaman secara manual dari tabel dan melakukan operasi refresh untuk mengolah ulang tabel hilir.

Secara default, DLT mengolah ulang hasil tabel berdasarkan data input setiap kali alur diperbarui, jadi Anda harus memastikan rekaman yang dihapus tidak dimuat ulang dari data sumber. Mengatur properti tabel pipelines.reset.allowed ke false mencegah refresh ke tabel tetapi tidak mencegah penulisan bertahap ke tabel atau data baru mengalir ke tabel.

Diagram berikut mengilustrasikan contoh menggunakan dua tabel streaming:

  • raw_user_table menyerap data pengguna mentah dari sumber.
  • bmi_table menghitung skor BMI secara bertahap menggunakan berat dan tinggi dari raw_user_table.

Anda ingin menghapus atau memperbarui catatan pengguna secara manual dari raw_user_table dan menghitung ulang bmi_table.

Mempertahankan diagram data

Kode berikut menunjukkan pengaturan properti tabel pipelines.reset.allowed ke false untuk menonaktifkan refresh penuh untuk raw_user_table sehingga perubahan yang dimaksudkan dipertahankan dari waktu ke waktu, tetapi tabel hilir dikomputasi ulang saat pembaruan alur dijalankan:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);