Bagikan melalui


Memuat dan memproses data secara bertahap dengan alur DLT

Artikel ini menjelaskan apa itu alur dan bagaimana Anda dapat menggunakan alur di alur DLT untuk memproses data secara bertahap dari sumber ke tabel streaming target. Di DLT, alur didefinisikan dengan dua cara:

  1. Alur ditentukan secara otomatis saat Anda membuat kueri yang memperbarui tabel streaming.
  2. DLT juga menyediakan fungsionalitas untuk secara eksplisit menentukan alur untuk pemrosesan yang lebih kompleks seperti menambahkan ke tabel streaming dari beberapa sumber streaming.

Artikel ini membahas alur implisit yang dibuat saat Anda menentukan kueri untuk memperbarui tabel streaming, lalu menyediakan detail tentang sintaks untuk menentukan alur yang lebih kompleks.

Apa itu alur?

Di DLT, alur adalah kueri streaming yang memproses data sumber secara bertahap untuk memperbarui tabel streaming target. Sebagian besar dataset DLT yang Anda buat dalam pipeline menentukan arus sebagai bagian dari kueri dan tidak memerlukan pendefinisian arus secara eksplisit. Misalnya, Anda membuat tabel streaming di DLT dalam satu perintah DDL alih-alih menggunakan tabel terpisah dan pernyataan alur untuk membuat tabel streaming:

Nota

Contoh CREATE FLOW ini disediakan hanya untuk tujuan ilustrasi dan menyertakan kata kunci yang tidak valid sintaks DLT.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Selain alur default yang ditentukan oleh kueri, antarmuka DLT Python dan SQL menyediakan fungsionalitas alur penambahan. Alur penambahan mendukung pemrosesan yang memerlukan pembacaan data dari beberapa sumber streaming untuk memperbarui satu tabel streaming. Misalnya, Anda dapat menggunakan fungsionalitas alur penambahan saat Anda memiliki tabel dan alur streaming yang ada dan ingin menambahkan sumber streaming baru yang menulis ke tabel streaming yang ada ini.

Menggunakan alur tambahan untuk menulis ke tabel streaming dari beberapa aliran sumber

Gunakan dekorator @append_flow di antarmuka Python atau klausa CREATE FLOW di antarmuka SQL untuk menulis ke tabel streaming dari beberapa sumber streaming. Gunakan alur tambahan untuk memproses tugas seperti berikut ini:

  • Tambahkan sumber streaming yang menambahkan data ke tabel streaming yang ada tanpa memerlukan refresh penuh. Misalnya, Anda mungkin memiliki tabel yang menggabungkan data regional dari setiap wilayah tempat Anda beroperasi. Saat wilayah baru diluncurkan, Anda dapat menambahkan data wilayah baru ke tabel tanpa melakukan refresh penuh. Lihat Contoh : Menulis ke tabel streaming dari beberapa topik Kafka.
  • Perbarui tabel streaming dengan menambahkan data historis yang hilang (backfilling). Misalnya, Anda memiliki tabel streaming yang sudah ada yang ditulis oleh topik Apache Kafka. Anda juga memiliki data historis yang disimpan dalam tabel yang perlu Anda sisipkan tepat sekali ke dalam tabel streaming, dan Anda tidak dapat mengalirkan data karena pemrosesan Anda mencakup melakukan agregasi kompleks sebelum menyisipkan data. Lihat Contoh : Menjalankan pengisian ulang data satu kali.
  • Gabungkan data dari beberapa sumber dan tulis ke satu tabel streaming alih-alih menggunakan klausa UNION dalam kueri. Menggunakan pemrosesan alur tambahan alih-alih UNION memungkinkan Anda memperbarui tabel target secara bertahap tanpa menjalankan pembaruan refresh penuh . Lihat Contoh : Gunakan pemrosesan alur tambahan alih-alih UNION.

Target untuk output rekaman oleh pemrosesan alur tambahan dapat berupa tabel yang sudah ada atau tabel baru. Untuk kueri Python, gunakan fungsi create_streaming_table() untuk membuat tabel target.

Penting

  • Jika Anda perlu menentukan batasan kualitas data dengan ekspektasi , tentukan harapan pada tabel target sebagai bagian dari fungsi create_streaming_table() atau pada definisi tabel yang ada. Anda tidak dapat menentukan ekspektasi dalam definisi @append_flow.
  • Alur diidentifikasi dengan nama alur , dan nama ini digunakan untuk mengidentifikasi titik pemeriksaan streaming. Penggunaan nama alur untuk mengidentifikasi titik pemeriksaan berarti sebagai berikut:
    • Jika alur yang ada dalam alur diganti namanya, titik pemeriksaan tidak dilakukan, dan alur yang diganti namanya secara efektif merupakan alur yang sama sekali baru.
    • Anda tidak dapat menggunakan kembali nama alur proses dalam rangkaian proses, karena checkpoint yang ada tidak akan cocok dengan definisi alur proses yang baru.

Berikut ini adalah sintaks untuk @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  append_target BY NAME
SELECT * FROM
  source;

Contoh: Menulis ke tabel streaming dari berbagai topik Kafka

Contoh berikut membuat tabel streaming bernama kafka_target dan menulis ke tabel streaming tersebut dari dua topik Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Untuk mempelajari selengkapnya tentang fungsi read_kafka() bernilai tabel yang digunakan dalam kueri SQL, lihat read_kafka dalam referensi bahasa SQL.

Di Python, Anda dapat secara terprogram membuat beberapa alur yang menargetkan satu tabel. Contoh berikut menunjukkan pola ini untuk daftar topik Kafka.

Nota

Pola ini memiliki persyaratan yang sama seperti menggunakan perulangan for untuk membuat tabel. Anda harus secara eksplisit meneruskan nilai Python ke fungsi yang menentukan alur. Lihat Membuat tabel dalam for perulangan.

import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Contoh : Menjalankan isi ulang data satu kali

Contoh berikut menjalankan kueri untuk menambahkan data historis ke tabel streaming:

Nota

Untuk memastikan isi ulang satu kali yang benar saat kueri isi ulang adalah bagian dari alur yang berjalan secara terjadwal atau terus menerus, hapus kueri setelah menjalankan alur sekali. Untuk menambahkan data baru jika tiba di direktori pengisian ulang, pertahankan kueri pada posisinya.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  STREAM read_files(
    "path/to/sourceDir",
    format => "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  STREAM read_files(
    "path/to/backfill/data/dir",
    format => "csv"
  );

Contoh : Gunakan pemrosesan alur tambahan alih-alih UNION

Alih-alih menggunakan kueri dengan klausul UNION, Anda bisa menggunakan kueri alur tambahan untuk menggabungkan beberapa sumber dan menulis ke satu tabel streaming. Menggunakan kueri alur tambahan alih-alih UNION memungkinkan Anda menambahkan ke tabel streaming dari beberapa sumber tanpa menjalankan refresh penuh .

Contoh Python berikut menyertakan kueri yang menggabungkan beberapa sumber data dengan klausa UNION:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Contoh berikut mengganti kueri UNION dengan kueri alur tambahan:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );