Memuat dan memproses data secara bertahap dengan alur DLT
Artikel ini menjelaskan apa itu alur dan bagaimana Anda dapat menggunakan alur di alur DLT untuk memproses data secara bertahap dari sumber ke tabel streaming target. Di DLT, alur didefinisikan dengan dua cara:
- Alur ditentukan secara otomatis saat Anda membuat kueri yang memperbarui tabel streaming.
- DLT juga menyediakan fungsionalitas untuk secara eksplisit menentukan alur untuk pemrosesan yang lebih kompleks seperti menambahkan ke tabel streaming dari beberapa sumber streaming.
Artikel ini membahas alur implisit yang dibuat saat Anda menentukan kueri untuk memperbarui tabel streaming, lalu menyediakan detail tentang sintaks untuk menentukan alur yang lebih kompleks.
Apa itu alur?
Di DLT, alur adalah kueri streaming yang memproses data sumber secara bertahap untuk memperbarui tabel streaming target. Sebagian besar dataset DLT yang Anda buat dalam pipeline menentukan arus sebagai bagian dari kueri dan tidak memerlukan pendefinisian arus secara eksplisit. Misalnya, Anda membuat tabel streaming di DLT dalam satu perintah DDL alih-alih menggunakan tabel terpisah dan pernyataan alur untuk membuat tabel streaming:
Nota
Contoh CREATE FLOW
ini disediakan hanya untuk tujuan ilustrasi dan menyertakan kata kunci yang tidak valid sintaks DLT.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Selain alur default yang ditentukan oleh kueri, antarmuka DLT Python dan SQL menyediakan fungsionalitas alur penambahan. Alur penambahan mendukung pemrosesan yang memerlukan pembacaan data dari beberapa sumber streaming untuk memperbarui satu tabel streaming. Misalnya, Anda dapat menggunakan fungsionalitas alur penambahan saat Anda memiliki tabel dan alur streaming yang ada dan ingin menambahkan sumber streaming baru yang menulis ke tabel streaming yang ada ini.
Menggunakan alur tambahan untuk menulis ke tabel streaming dari beberapa aliran sumber
Gunakan dekorator @append_flow
di antarmuka Python atau klausa CREATE FLOW
di antarmuka SQL untuk menulis ke tabel streaming dari beberapa sumber streaming. Gunakan alur tambahan untuk memproses tugas seperti berikut ini:
- Tambahkan sumber streaming yang menambahkan data ke tabel streaming yang ada tanpa memerlukan refresh penuh. Misalnya, Anda mungkin memiliki tabel yang menggabungkan data regional dari setiap wilayah tempat Anda beroperasi. Saat wilayah baru diluncurkan, Anda dapat menambahkan data wilayah baru ke tabel tanpa melakukan refresh penuh. Lihat Contoh : Menulis ke tabel streaming dari beberapa topik Kafka.
- Perbarui tabel streaming dengan menambahkan data historis yang hilang (backfilling). Misalnya, Anda memiliki tabel streaming yang sudah ada yang ditulis oleh topik Apache Kafka. Anda juga memiliki data historis yang disimpan dalam tabel yang perlu Anda sisipkan tepat sekali ke dalam tabel streaming, dan Anda tidak dapat mengalirkan data karena pemrosesan Anda mencakup melakukan agregasi kompleks sebelum menyisipkan data. Lihat Contoh : Menjalankan pengisian ulang data satu kali.
- Gabungkan data dari beberapa sumber dan tulis ke satu tabel streaming alih-alih menggunakan klausa
UNION
dalam kueri. Menggunakan pemrosesan alur tambahan alih-alihUNION
memungkinkan Anda memperbarui tabel target secara bertahap tanpa menjalankan pembaruan refresh penuh . Lihat Contoh : Gunakan pemrosesan alur tambahan alih-alihUNION
.
Target untuk output rekaman oleh pemrosesan alur tambahan dapat berupa tabel yang sudah ada atau tabel baru. Untuk kueri Python, gunakan fungsi create_streaming_table() untuk membuat tabel target.
Penting
- Jika Anda perlu menentukan batasan kualitas data dengan ekspektasi , tentukan harapan pada tabel target sebagai bagian dari fungsi
create_streaming_table()
atau pada definisi tabel yang ada. Anda tidak dapat menentukan ekspektasi dalam definisi@append_flow
. - Alur diidentifikasi dengan nama alur , dan nama ini digunakan untuk mengidentifikasi titik pemeriksaan streaming. Penggunaan nama alur untuk mengidentifikasi titik pemeriksaan berarti sebagai berikut:
- Jika alur yang ada dalam alur diganti namanya, titik pemeriksaan tidak dilakukan, dan alur yang diganti namanya secara efektif merupakan alur yang sama sekali baru.
- Anda tidak dapat menggunakan kembali nama alur proses dalam rangkaian proses, karena checkpoint yang ada tidak akan cocok dengan definisi alur proses yang baru.
Berikut ini adalah sintaks untuk @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
Contoh: Menulis ke tabel streaming dari berbagai topik Kafka
Contoh berikut membuat tabel streaming bernama kafka_target
dan menulis ke tabel streaming tersebut dari dua topik Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Untuk mempelajari selengkapnya tentang fungsi read_kafka()
bernilai tabel yang digunakan dalam kueri SQL, lihat read_kafka dalam referensi bahasa SQL.
Di Python, Anda dapat secara terprogram membuat beberapa alur yang menargetkan satu tabel. Contoh berikut menunjukkan pola ini untuk daftar topik Kafka.
Nota
Pola ini memiliki persyaratan yang sama seperti menggunakan perulangan for
untuk membuat tabel. Anda harus secara eksplisit meneruskan nilai Python ke fungsi yang menentukan alur. Lihat Membuat tabel dalam for
perulangan.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Contoh : Menjalankan isi ulang data satu kali
Contoh berikut menjalankan kueri untuk menambahkan data historis ke tabel streaming:
Nota
Untuk memastikan isi ulang satu kali yang benar saat kueri isi ulang adalah bagian dari alur yang berjalan secara terjadwal atau terus menerus, hapus kueri setelah menjalankan alur sekali. Untuk menambahkan data baru jika tiba di direktori pengisian ulang, pertahankan kueri pada posisinya.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
Contoh : Gunakan pemrosesan alur tambahan alih-alih UNION
Alih-alih menggunakan kueri dengan klausul UNION
, Anda bisa menggunakan kueri alur tambahan untuk menggabungkan beberapa sumber dan menulis ke satu tabel streaming. Menggunakan kueri alur tambahan alih-alih UNION
memungkinkan Anda menambahkan ke tabel streaming dari beberapa sumber tanpa menjalankan refresh penuh .
Contoh Python berikut menyertakan kueri yang menggabungkan beberapa sumber data dengan klausa UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Contoh berikut mengganti kueri UNION
dengan kueri alur tambahan:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);