Referensi bahasa DLT Python
Artikel ini memiliki detail untuk antarmuka pemrograman DLT Python.
Untuk informasi tentang SQL API, lihat referensi bahasa DLT SQL.
Untuk detail khusus untuk mengonfigurasi Auto Loader, lihat Apa itu Auto Loader?.
Sebelum Anda mulai
Berikut ini adalah pertimbangan penting saat Anda menerapkan alur dengan antarmuka DLT Python:
- Karena fungsi Python
table()
danview()
dipanggil beberapa kali selama perencanaan dan berjalannya pembaruan alur, jangan sertakan kode dalam salah satu fungsi ini yang mungkin memiliki efek samping (misalnya, kode yang memodifikasi data atau mengirim email). Untuk menghindari perilaku tak terduga, fungsi Python Anda yang menentukan himpunan data hanya boleh menyertakan kode yang diperlukan untuk menentukan tabel atau tampilan. - Untuk melakukan operasi seperti mengirim email atau mengintegrasikan dengan layanan pemantauan eksternal, terutama dalam fungsi yang menentukan himpunan data, gunakan kait peristiwa. Menerapkan operasi ini dalam fungsi yang menentukan himpunan data Anda akan menyebabkan perilaku yang tidak terduga.
- Fungsi Python
table
danview
harus mengembalikan DataFrame. Beberapa fungsi yang beroperasi pada DataFrames tidak mengembalikan DataFrames dan tidak boleh digunakan. Operasi ini mencakup fungsi seperticollect()
,count()
,toPandas()
,save()
, dansaveAsTable()
. Karena transformasi DataFrame dijalankan setelah grafik aliran data lengkap telah diselesaikan, menggunakan operasi tersebut mungkin memiliki efek samping yang tidak diinginkan.
Mengimpor modul Python dlt
Fungsi DLT Python didefinisikan dalam modul dlt
. Alur Anda yang diimplementasikan dengan Python API harus mengimpor modul ini:
import dlt
Membuat tampilan terwujud DLT atau tabel streaming
Di Python, DLT menentukan apakah akan memperbarui himpunan data sebagai tampilan materialisasi atau tabel streaming berdasarkan kueri yang menentukan. Dekorator @table
dapat digunakan untuk menentukan tampilan materialisasi dan tabel streaming.
Untuk menentukan tampilan materialisasi di Python, terapkan @table
ke kueri yang melakukan baca statis terhadap sumber data. Untuk menentukan tabel streaming, terapkan @table
ke kueri yang melakukan pembacaan streaming terhadap sumber data atau gunakan fungsi create_streaming_table(). Kedua jenis himpunan data memiliki spesifikasi sintaks yang sama seperti berikut:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Membuat tampilan DLT
Untuk menentukan tampilan di Python, terapkan dekorator @view
. Seperti dekorator @table
, Anda dapat menggunakan tampilan di DLT untuk himpunan data statis atau streaming. Berikut ini adalah sintaks untuk menentukan tampilan dengan Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Contoh: Menentukan tabel dan tampilan
Untuk menentukan tabel atau tampilan di Python, terapkan dekorator @dlt.view
atau @dlt.table
ke fungsi. Anda dapat menggunakan nama fungsi atau parameter name
untuk menetapkan nama tabel atau tampilan. Contoh berikut mendefinisikan dua himpunan data yang berbeda: tampilan yang disebut taxi_raw
yang mengambil file JSON sebagai sumber input dan tabel yang disebut filtered_data
yang mengambil tampilan taxi_raw
sebagai input:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Contoh: Mengakses himpunan data yang ditentukan dalam alur yang sama
Nota
Meskipun fungsi dlt.read()
dan dlt.read_stream()
masih tersedia dan didukung sepenuhnya oleh antarmuka DLT Python, Databricks merekomendasikan untuk selalu menggunakan fungsi spark.read.table()
dan spark.readStream.table()
karena hal berikut:
- Fungsi
spark
mendukung pembacaan himpunan data internal dan eksternal, termasuk himpunan data di penyimpanan eksternal atau ditentukan dalam alur lain. Fungsidlt
hanya mendukung pembacaan himpunan data internal. - Fungsi
spark
mendukung penentuan opsi, sepertiskipChangeCommits
, untuk operasi pembacaan. Menentukan opsi tidak didukung oleh fungsidlt
.
Untuk mengakses himpunan data yang ditentukan dalam alur yang sama, gunakan fungsi spark.read.table()
atau spark.readStream.table()
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Nota
Saat mengkueri tampilan atau tabel di alur, Anda dapat menentukan katalog dan skema secara langsung, atau Anda bisa menggunakan default yang dikonfigurasi di alur Anda. Dalam contoh ini, tabel customers
ditulis dan dibaca dari katalog dan skema default yang dikonfigurasi untuk alur Anda.
Contoh: Membaca dari tabel yang terdaftar di metastore
Untuk membaca data dari tabel yang terdaftar di Hive metastore, dalam argumen fungsi, Anda dapat menyertakan nama basis data di depan nama tabel:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Untuk contoh membaca dari tabel Katalog Unity, lihat Menyerap data ke dalam alur Katalog Unity.
Contoh : Mengakses himpunan data menggunakan spark.sql
Anda juga bisa mengembalikan himpunan data menggunakan ekspresi spark.sql
dalam fungsi kueri. Untuk membaca dari himpunan data internal, Anda dapat membiarkan nama tidak memenuhi syarat untuk menggunakan katalog dan skema default, atau Anda dapat menambahkannya sebelumnya:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Menghapus rekaman secara permanen dari tampilan terwujud atau tabel streaming
Untuk menghapus rekaman secara permanen dari tampilan materialisasi atau tabel streaming dengan vektor penghapusan diaktifkan, seperti untuk kepatuhan GDPR, operasi tambahan harus dilakukan pada tabel Delta dasar objek. Untuk memastikan penghapusan rekaman dari tampilan materialisasi, lihat Menghapus rekaman secara permanen dari tampilan materialisasi dengan vektor penghapusan diaktifkan. Untuk memastikan penghapusan rekaman dari tabel streaming, lihat Menghapus rekaman secara permanen dari tabel streaming.
Tulis ke layanan streaming peristiwa eksternal atau tabel Delta dengan API DLT sink
Penting
API sink
DLT berada dalam Pratinjau Umum.
Nota
- Menjalankan pembaruan penyegaran penuh tidak menghapus data dari sink. Setiap data yang diolah ulang akan ditambahkan ke sink, dan data yang ada tidak akan diubah.
- Harapan DLT tidak didukung dengan API
sink
.
Untuk menulis ke layanan streaming peristiwa seperti Apache Kafka atau Azure Event Hubs atau ke tabel Delta dari alur DLT, gunakan fungsi create_sink()
yang disertakan dalam modul Python dlt
. Setelah membuat sink dengan fungsi create_sink()
, Anda menggunakan sink dalam append flow untuk menulis data ke sink. alur tambahan adalah satu-satunya jenis alur yang didukung oleh fungsi create_sink()
. Jenis alur lainnya, seperti apply_changes
, tidak didukung.
Berikut ini adalah sintaks untuk membuat sink dengan fungsi create_sink()
:
create_sink(<sink_name>, <format>, <options>)
Argumen |
---|
name Jenis: str String yang mengidentifikasi sink dan digunakan untuk mereferensikan dan mengelola sink. Nama sink harus unik untuk pipeline, termasuk di semua kode sumber seperti notebook atau modul yang merupakan bagian dari pipeline. Parameter ini diperlukan. |
format Jenis: str String yang menentukan format output, baik kafka atau delta .Parameter ini diperlukan. |
options Jenis: dict Daftar opsi sink yang opsional, diformat sebagai {"key": "value"} , di mana kunci dan nilainya keduanya adalah string. Semua opsi Databricks Runtime yang didukung oleh sink Kafka dan Delta tersedia. Untuk opsi Kafka, lihat Mengonfigurasi penulis Streaming Terstruktur Kafka. Untuk opsi Delta, lihat tabel Delta sebagai sink. |
Contoh : Membuat sink Kafka dengan fungsi create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
Contoh : Membuat sink Delta dengan fungsi create_sink()
dan jalur sistem file
Contoh berikut membuat sebuah komponen sink yang menulis ke tabel Delta dengan cara meneruskan jalur sistem file ke tabel tersebut:
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
Contoh : Membuat "sink" Delta dengan fungsi create_sink()
dan nama tabel di Katalog Unity
Nota
Delta sink mendukung tabel eksternal dan terkelola dalam Unity Catalog serta tabel terkelola dalam metastore Hive. Nama tabel harus sepenuhnya memenuhi syarat. Misalnya, tabel Katalog Unity harus menggunakan pengidentifikasi tiga tingkat: <catalog>.<schema>.<table>
. Di tabel metastore Apache Hive harus menggunakan <schema>.<table>
.
Contoh berikut membuat sink yang menulis ke tabel Delta dengan memberikan nama tabel di Katalog Unity.
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Contoh : Gunakan alur penamangan untuk menulis ke sink Delta
Contoh berikut membuat sink yang menulis ke tabel Delta lalu membuat alur tambahan untuk menulis ke sink tersebut:
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Contoh: Gunakan alur tambahan untuk menulis ke sink Kafka
Contoh berikut membuat sink yang menulis ke topik Kafka lalu membuat alur penambahan untuk menulis ke sink tersebut:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Skema DataFrame yang ditulis ke Kafka harus menyertakan kolom yang ditentukan dalam Mengonfigurasi penulis Streaming Terstruktur Kafka.
Membuat tabel untuk digunakan sebagai target operasi streaming
Gunakan fungsi create_streaming_table()
untuk membuat tabel target yang digunakan untuk output rekaman dari operasi streaming, termasuk apply_changes(), apply_changes_from_snapshot(), dan rekaman keluaran @append_flow.
Nota
Fungsi create_target_table()
dan create_streaming_live_table()
tidak digunakan lagi. Databricks merekomendasikan pembaruan kode yang ada untuk menggunakan fungsi create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumen |
---|
name Jenis: str Nama tabel. Parameter ini diperlukan. |
comment Jenis: str Deskripsi opsional untuk tabel. |
spark_conf Jenis: dict Daftar opsional konfigurasi Spark untuk eksekusi kueri ini. |
table_properties Jenis: dict Daftar opsional properti tabel untuk tabel. |
partition_cols Jenis: array Daftar opsional dari satu atau beberapa kolom yang akan digunakan untuk mempartisi tabel. |
cluster_by Jenis: array Secara opsional aktifkan pengklusteran cair pada tabel dan tentukan kolom yang akan digunakan sebagai kunci pengklusteran. Lihat Menggunakan pengklusteran cair untuk tabel Delta. |
path Jenis: str Lokasi penyimpanan opsional untuk data tabel. Jika tidak ditentukan, sistem secara otomatis menggunakan lokasi penyimpanan pipeline. |
schema Jenis: str atau StructType Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL atau dengan Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Jenis: dict Batasan kualitas data opsional untuk tabel. Lihat beragam ekspektasi. |
row_filter (Pratinjau Umum)Jenis: str Klausa filter baris opsional untuk tabel. Lihat Menerbitkan tabel dengan filter baris dan masker kolom. |
Mengontrol bagaimana tabel diwujudkan
Tabel juga menawarkan kontrol tambahan atas materialisasinya:
- Tentukan cara mengelompokkan tabel menggunakan
cluster_by
. Anda dapat menggunakan pemodelan kluster cair (liquid clustering) untuk mempercepat kueri. Lihat Menggunakan pengklusteran cair untuk tabel Delta. - Tentukan bagaimana tabel dipartisi dengan menggunakan
partition_cols
. - Anda bisa mengatur properti tabel saat menentukan tampilan atau tabel. Lihat properti tabel DLT.
- Atur lokasi penyimpanan untuk data tabel menggunakan pengaturan
path
. Secara default, data tabel disimpan di lokasi penyimpanan alur jikapath
tidak diatur. - Anda dapat menggunakan kolom yang dihasilkan dalam definisi skema Anda. Lihat Contoh : Tentukan skema dan kolom kluster.
Nota
Untuk tabel berukuran kurang dari 1 TB, Databricks merekomendasikan untuk mengizinkan DLT mengontrol organisasi data. Anda sebaiknya tidak menentukan kolom partisi kecuali jika Anda mengharapkan tabel Anda tumbuh melebihi satu terabyte.
Contoh : Tentukan skema dan kolom kluster
Anda dapat secara opsional menentukan skema tabel menggunakan StructType
Python atau string SQL DDL. Ketika ditentukan dengan string DDL, definisi dapat menyertakan kolom yang dihasilkan.
Contoh berikut membuat tabel yang disebut sales
dengan skema yang ditentukan menggunakan Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Contoh berikut menentukan skema untuk tabel menggunakan string DDL, menentukan kolom yang dihasilkan, dan menentukan kolom pengklusteran:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Secara default, DLT menyimpulkan skema dari definisi table
jika Anda tidak menentukan skema.
Contoh : Tentukan kolom partisi
Contoh berikut menentukan skema untuk tabel menggunakan string DDL, menentukan kolom yang dihasilkan, dan menentukan kolom partisi:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Contoh: Menentukan batasan tabel
Penting
Batasan tabel ada di pratinjau publik.
Saat menentukan skema, Anda dapat menentukan kunci primer dan asing. Batasan bersifat informasi dan tidak diberlakukan. Lihat klausa CONSTRAINT dalam referensi bahasa SQL.
Contoh berikut mendefinisikan tabel dengan batasan kunci primer dan asing:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Contoh: Menentukan filter baris dan masker kolom
Untuk membuat tampilan materialisasi atau Tabel streaming dengan filter baris dan masker kolom, gunakan klausa ROW FILTER dan klausa MASK . Contoh berikut menunjukkan cara menentukan tampilan materialisasi dan tabel Streaming dengan filter baris dan masker kolom:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Untuk informasi selengkapnya tentang filter baris dan masker kolom, lihat Terbitkan tabel dengan filter baris dan masker kolom.
Mengonfigurasi tabel streaming untuk mengabaikan perubahan dalam tabel streaming sumber
Nota
- Bendera
skipChangeCommits
hanya berfungsi ketika digunakan bersamaspark.readStream
melalui fungsioption()
. Anda tidak dapat menggunakan bendera ini dalam fungsidlt.read_stream()
. - Anda tidak dapat menggunakan bendera
skipChangeCommits
saat tabel streaming sumber didefinisikan sebagai target fungsi apply_changes().
Secara default, tabel streaming memerlukan sumber khusus tambahan. Saat tabel streaming menggunakan tabel streaming lain sebagai sumber, dan tabel streaming sumber memerlukan pembaruan atau penghapusan, misalnya, pemrosesan GDPR "right to be forgotten", flag skipChangeCommits
dapat diatur saat membaca tabel streaming sumber untuk mengabaikan perubahan tersebut. Untuk informasi selengkapnya tentang bendera ini, lihat Mengabaikan pembaruan dan menghapus.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Sifat Python DLT
Tabel berikut ini menjelaskan opsi dan properti yang bisa Anda tentukan saat menentukan tabel dan tampilan dengan DLT:
@table atau @view |
---|
name Jenis: str Nama opsional untuk tabel atau tampilan. Jika tidak ditentukan, nama fungsi digunakan sebagai nama tabel atau tampilan. |
comment Jenis: str Deskripsi opsional untuk tabel. |
spark_conf Jenis: dict Daftar opsional konfigurasi Spark untuk eksekusi kueri ini. |
table_properties Jenis: dict Daftar opsional properti tabel untuk tabel. |
path Jenis: str Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem akan menggunakan lokasi penyimpanan pipeline. |
partition_cols Jenis: a collection of str Koleksi opsional, misalnya, list dari satu atau beberapa kolom yang akan digunakan untuk mempartisi tabel. |
cluster_by Jenis: array Secara opsional aktifkan pengklusteran cair pada tabel dan tentukan kolom yang akan digunakan sebagai kunci pengklusteran. Lihat Menggunakan pengklusteran cair untuk tabel Delta. |
schema Jenis: str atau StructType Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL atau dengan Python StructType . |
temporary Jenis: bool Buat tabel, tetapi jangan terbitkan metadata untuk tabel. Kata kunci temporary menginstruksikan DLT untuk membuat tabel yang tersedia untuk digunakan oleh jalur tetapi tidak boleh diakses di luar jalur. Untuk mengurangi waktu pemrosesan, tabel sementara bertahan selama masa operasi jalur pemrosesan (pipeline) yang membuatnya, dan bukan hanya untuk satu pembaruan saja.Defaultnya adalah 'False'. |
row_filter (Pratinjau Umum)Jenis: str Klausa filter baris opsional untuk tabel. Lihat Menerbitkan tabel dengan filter baris dan masker kolom. |
Definisi tabel atau tampilan |
---|
def <function-name>() Fungsi Python yang menentukan himpunan data. Jika parameter name tidak diatur, maka <function-name> digunakan sebagai nama himpunan data target. |
query Pernyataan Spark SQL yang mengembalikan Himpunan Data Spark atau Koalas DataFrame. Gunakan dlt.read() atau spark.read.table() untuk melakukan bacaan lengkap dari himpunan data yang ditentukan dalam alur yang sama. Untuk membaca himpunan data eksternal, gunakan fungsi spark.read.table() . Anda tidak dapat menggunakan dlt.read() untuk membaca himpunan data eksternal. Karena spark.read.table() dapat digunakan untuk membaca himpunan data internal, himpunan data yang ditentukan di luar alur saat ini, dan memungkinkan Anda menentukan opsi untuk membaca data, Databricks merekomendasikan untuk menggunakannya alih-alih fungsi dlt.read() .Saat Anda menentukan himpunan data dalam alur, secara default akan menggunakan katalog dan skema yang ditentukan dalam konfigurasi alur. Anda dapat menggunakan fungsi spark.read.table() untuk membaca pada dataset yang ditentukan dalam alur dengan tidak menggunakan kualifikasi. Misalnya, untuk membaca dari himpunan data bernama customers :spark.read.table("customers") Anda juga dapat menggunakan fungsi spark.read.table() untuk membaca dari tabel yang terdaftar di metastore dengan secara opsional memenuhi syarat nama tabel dengan nama database:spark.read.table("sales.customers") Gunakan dlt.read_stream() atau spark.readStream.table() untuk melakukan pembacaan streaming dari himpunan data yang ditentukan dalam alur yang sama. Untuk melakukan pembacaan streaming dari himpunan data eksternal, gunakanFungsi spark.readStream.table() . Karena spark.readStream.table() dapat digunakan untuk membaca himpunan data internal, himpunan data yang ditentukan di luar alur saat ini, dan memungkinkan Anda menentukan opsi untuk membaca data, Databricks merekomendasikan untuk menggunakannya alih-alih fungsi dlt.read_stream() .Untuk menentukan kueri dalam fungsi table DLT menggunakan sintaks SQL, gunakan fungsi spark.sql . Lihat Contoh : Mengakses himpunan data menggunakan spark.sql . Untuk menentukan kueri dalam fungsi table DLT menggunakan Python, gunakan sintaks PySpark. |
Harapan |
---|
@expect("description", "constraint") Menyatakan batasan kualitas data yang diidentifikasi oleh description . Jika baris melanggar harapan, sertakan baris dalam himpunan data target. |
@expect_or_drop("description", "constraint") Menyatakan batasan kualitas data yang diidentifikasi oleh description . Jika sebuah baris tidak memenuhi ekspektasi, hapus baris tersebut dari himpunan data target. |
@expect_or_fail("description", "constraint") Menyatakan batasan kualitas data yang diidentifikasi oleh description . Jika sebuah baris tidak memenuhi ekspektasi, segera hentikan eksekusi. |
@expect_all(expectations) Nyatakan satu atau beberapa batasan kualitas data. expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu ekspektasi, sertakan baris dalam himpunan data target. |
@expect_all_or_drop(expectations) Nyatakan satu atau beberapa batasan kualitas data. expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu ekspektasi, hilangkan baris dari himpunan data target. |
@expect_all_or_fail(expectations) Nyatakan satu atau beberapa batasan kualitas data. expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu ketentuan yang diharapkan, segera hentikan pelaksanaan. |
Mengubah pengambilan data dari umpan perubahan dengan Python di DLT
Gunakan fungsi apply_changes()
di API Python untuk menggunakan fungsionalitas DLT change data capture (CDC) untuk memproses data sumber dari umpan data perubahan (CDF).
Penting
Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target apply_changes()
, Anda harus menyertakan kolom __START_AT
dan __END_AT
dengan tipe data yang sama dengan bidang sequence_by
.
Untuk membuat tabel target yang diperlukan, Anda dapat menggunakan fungsi create_streaming_table() di antarmuka DLT Python.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Nota
Untuk pemrosesan APPLY CHANGES
, event INSERT
dan UPDATE
memiliki perilaku default untuk upsert event CDC dari sumber: memperbarui baris dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru jika catatan yang cocok tidak ada di tabel tersebut. Penanganan untuk peristiwa DELETE
dapat ditentukan dengan kondisi APPLY AS DELETE WHEN
.
Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan umpan perubahan, lihat API APPLY CHANGES: Menyederhanakan perubahan pengambilan data dengan DLT. Untuk contoh penggunaan fungsi apply_changes()
, lihat contoh : pemrosesan SCD tipe 1 dan SCD tipe 2 dengan data sumber CDF.
Penting
Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target apply_changes
, Anda harus menyertakan kolom __START_AT
dan __END_AT
dengan tipe data yang sama dengan bidang sequence_by
.
Lihat API TERAPKAN PERUBAHAN: Menyederhanakan pengambilan data perubahan dengan DLT.
Mengubah pengambilan data dari rekam jepret database dengan Python di DLT
Penting
API APPLY CHANGES FROM SNAPSHOT
berada dalam Pratinjau Umum.
Gunakan fungsi apply_changes_from_snapshot()
di API Python untuk menggunakan fungsionalitas DLT change data capture (CDC) untuk memproses data sumber dari rekam jepret database.
Penting
Anda harus menentukan tabel streaming target agar dapat menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target apply_changes_from_snapshot()
, Anda juga harus menyertakan kolom __START_AT
dan __END_AT
dengan tipe data yang sama dengan bidang sequence_by
.
Untuk membuat tabel target yang diperlukan, Anda dapat menggunakan fungsi create_streaming_table() di antarmuka DLT Python.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Nota
Untuk pemrosesan APPLY CHANGES FROM SNAPSHOT
, perilaku defaultnya adalah menyisipkan baris baru saat rekaman yang cocok dengan kunci yang sama tidak ada di target. Jika rekaman yang cocok memang ada, rekaman tersebut hanya diperbarui jika salah satu nilai dalam baris telah berubah. Baris yang memiliki kunci yang ada di target tetapi sudah tidak ada di sumber akan dihapus.
Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan rekam jepret, lihat API APPLY CHANGES: Menyederhanakan perubahan pengambilan data dengan DLT. Untuk contoh penggunaan fungsi apply_changes_from_snapshot()
, lihat contoh penyerapan cuplikan berkala dan penyerapan cuplikan historis .
Argumen |
---|
target Jenis: str Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan fungsi apply_changes() .Parameter ini diperlukan. |
source Jenis: str atau lambda function Baik nama tabel atau tampilan untuk rekam jepret secara berkala atau fungsi lambda Python yang mengembalikan DataFrame rekam jepret yang akan diproses dan versi rekam jepret. Lihat Menerapkan argumen source .Parameter ini diperlukan. |
keys Jenis: list Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target. Anda dapat menentukan:
Argumen untuk fungsi col() tidak dapat menyertakan kualifikator. Misalnya, Anda dapat menggunakan col(userId) , tetapi Anda tidak dapat menggunakan col(source.userId) .Parameter ini diperlukan. |
stored_as_scd_type Jenis: str atau int Apakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2. Atur ke 1 untuk SCD tipe 1 atau 2 untuk SCD tipe 2.Klausa ini bersifat opsional. Defaultnya adalah SCD tipe 1. |
track_history_column_list track_history_except_column_list Jenis: list Sekumpulan kolom keluaran yang akan dilacak untuk sejarah dalam tabel target. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Pakaitrack_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :
Argumen untuk fungsi col() tidak dapat menyertakan pengklasifikasian. Misalnya, Anda dapat menggunakan col(userId) , tetapi Anda tidak dapat menggunakan col(source.userId) .Parameter ini bersifat opsional. Defaultnya adalah menyertakan semua kolom dalam tabel target jika tidak ada track_history_column_list atauargumen track_history_except_column_list diteruskan ke fungsi . |
Terapkan argumen source
Fungsi apply_changes_from_snapshot()
menyertakan argumen source
. Untuk memproses rekam jepret historis, argumen source
diharapkan menjadi fungsi lambda Python yang mengembalikan dua nilai ke fungsi apply_changes_from_snapshot()
: Python DataFrame yang berisi data rekam jepret yang akan diproses dan versi rekam jepret.
Berikut ini adalah tanda tangan fungsi lambda:
lambda Any => Optional[(DataFrame, Any)]
- Argumen untuk fungsi lambda adalah versi rekam jepret yang terakhir diproses.
- Nilai pengembalian dari fungsi lambda adalah
None
atau tuple dari dua nilai: Nilai pertama dari tuple adalah DataFrame yang berisi cuplikan yang akan diproses. Nilai kedua dari tupel adalah versi cuplikan yang mewakili urutan logis dari cuplikan tersebut.
Contoh yang mengimplementasikan dan memanggil fungsi lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Runtime DLT melakukan langkah-langkah berikut setiap kali alur yang berisi fungsi apply_changes_from_snapshot()
dipicu:
- Menjalankan fungsi
next_snapshot_and_version
untuk memuat DataFrame rekam jepret berikutnya dan versi rekam jepret yang sesuai. - Jika tidak ada DataFrame yang dikembalikan, eksekusi dihentikan dan pembaruan alur ditandai sebagai selesai.
- Mendeteksi perubahan dalam rekam jepret baru dan menerapkannya secara bertahap ke tabel target.
- Kembali ke langkah #1 untuk memuat rekam jepret berikutnya dan versinya.
Batasan
Antarmuka DLT Python memiliki batasan berikut:
Fungsi pivot()
tidak didukung. Operasi pivot
di Spark memerlukan pemrosesan awal data input untuk menghitung skema output. Kemampuan ini tidak didukung di DLT.