Megosztás a következőn keresztül:


Adatok növekményes betöltése és feldolgozása DLT-folyamatokkal

Ez a cikk bemutatja, hogy mik azok a folyamatok, és hogyan használhatja a DLT-folyamatokban lévő folyamatokat az adatok forrásból egy célstreamelési táblába történő növekményes feldolgozásához. A DLT-ben a folyamatok kétféleképpen vannak definiálva:

  1. Amikor létrehoz egy streamelési táblát frissítő lekérdezést, a folyamat automatikusan meghatározásra kerül.
  2. A DLT emellett olyan funkciókat is biztosít, amelyek explicit módon definiálják a folyamatokat összetettebb feldolgozáshoz, például több streamforrásból származó streamtáblához való hozzáfűzést.

Ez a cikk azokat az implicit folyamatokat ismerteti, amelyek akkor jönnek létre, amikor lekérdezést határoz meg egy streamelési tábla frissítéséhez, majd részletesen ismerteti a szintaxist az összetettebb folyamatok definiálásához.

Mi az a folyamat?

A DLT-ben a folyamat egy streamelési lekérdezés, amely a forrásadatokat növekményesen dolgozza fel a célstreamelési tábla frissítéséhez. A folyamatokban létrehozott DLT-adathalmazok többsége a lekérdezés részeként határozza meg a folyamatot, és nincs szükség a folyamat explicit meghatározására. Létrehozhat például egy streamelési táblát a DLT-ben egyetlen DDL-parancsban ahelyett, hogy külön táblázatokat és folyamatutasságokat használ a streamelési tábla létrehozásához:

Jegyzet

Ez a CREATE FLOW példa csak szemléltető célokra szolgál, és olyan kulcsszavakat tartalmaz, amelyek nem érvényesek A DLT szintaxisa.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

A lekérdezés által definiált alapértelmezett folyamat mellett a DLT Python és SQL felületei hozzáfűzési folyamat funkciót is biztosítanak. A hozzáfűzési folyamat támogatja a feldolgozást, amelyhez több streamforrás adatainak beolvasása szükséges egyetlen streamelési tábla frissítéséhez. Használhatja például a hozzáfűzési folyamat funkciót, ha már rendelkezik egy meglévő streamelési táblával és -folyamatokkal, és szeretne hozzáadni egy új streamforrást, amely ebbe a meglévő streamelési táblába ír.

A hozzáfűzési folyamat használata több forrásstreamből származó streamelési táblázatba való íráshoz

A @append_flow dekoratőrrel a Python-felületen vagy az SQL-felületen található CREATE FLOW záradékkal több streamforrásból származó streamelési táblába írhat. Használja a hozzáfűzési folyamatot a következő feladatok feldolgozásához:

  • Olyan streamelési források hozzáadása, amelyek adatokat fűznek egy meglévő streamelési táblához teljes frissítés nélkül. Lehet például, hogy van egy táblázata, amely a regionális adatokat egyesíti minden régióból, amelyben dolgozik. Az új régiók bevezetésekor teljes frissítés nélkül hozzáadhatja az új régióadatokat a táblához. Lásd: Példa: Írás streamelési táblába több Kafka-témakörből.
  • Frissítsen egy streamelési táblát hiányzó előzményadatok hozzáfűzésével (backfilling). Például van egy meglévő folyamatosan frissülő táblázat, amelybe egy Apache Kafka téma ír. Olyan táblában tárolt előzményadatokat is tárol, amelyeket pontosan egyszer kell beszúrnia a streamelési táblába, és nem tudja streamelni az adatokat, mert a feldolgozás magában foglalja az adatok beszúrása előtt végzett összetett összesítést. Lásd : Példa: Egy egyszeri adatvisszatöltés futtatása.
  • Több forrásból származó adatok egyesítése és írás egyetlen streamelési táblába a lekérdezés UNION záradékának használata helyett. A hozzáfűzési folyamat használata UNION helyett lehetővé teszi, hogy fokozatosan frissítse a céltáblát anélkül, hogy lefuttatna egy teljes frissítést. Lásd Példa: Használja a hozzáfűzési folyamatfeldolgozást UNIONhelyett.

A hozzáfűzési folyamat feldolgozásának rekordkimenetének célja lehet egy meglévő tábla vagy egy új tábla. Python-lekérdezések esetén a create_streaming_table() függvénnyel hozzon létre egy céltáblát.

Fontos

  • Ha elvárásokkalkell meghatároznia az adatminőségre vonatkozó korlátozásokat, az elvárásokat a céltáblán a create_streaming_table() függvény részeként vagy egy meglévő tábladefinícióban határozhatja meg. A @append_flow definícióban nem definiálhat elvárásokat.
  • A folyamatokat egy folyamatnévazonosítja, és ez a név a streamelési ellenőrzőpontok azonosítására szolgál. A folyamatnév használata az ellenőrzőpont azonosításához a következőket jelenti:
    • Ha egy meglévő folyamatot egy folyamatláncban átneveznek, az ellenőrzőpont nem öröklődik, és az átnevezett folyamat gyakorlatilag teljesen új folyamatnak számít.
    • A folyamat neve nem használható újra, mert a meglévő ellenőrzőpont nem felel meg az új folyamatdefiníciónak.

A @append_flowszintaxisa a következő:

Piton

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  append_target BY NAME
SELECT * FROM
  source;

Példa: Írás streamelési táblába több Kafka-témakörből

Az alábbi példák létrehoznak egy kafka_target nevű streamelési táblát, és két Kafka-témakörből írnak a streamelési táblába:

Piton

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Az SQL-lekérdezésekben használt read_kafka() táblaértékű függvényről az SQL nyelvi referenciájában read_kafka talál további információt.

A Pythonban programozott módon hozhat létre több folyamatot, amelyek egyetlen táblát céloznak meg. Az alábbi példa ezt a mintát mutatja be a Kafka-témakörök listájához.

Jegyzet

Ez a minta ugyanazokkal a követelményekkel rendelkezik, mint egy for hurok használata táblák létrehozásához. Explicit módon át kell adnia egy Python-értéket a folyamatot meghatározó függvénynek. Lásd a -t: Táblázatok létrehozása egy for ciklusban.

import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Példa: Egyszeri adatvisszatöltés futtatása

Az alábbi példák egy lekérdezést futtatnak az előzményadatok streamelési táblához való hozzáfűzéséhez:

Jegyzet

Ha gondoskodni szeretne arról, hogy a visszatöltési lekérdezés egy ütemezett vagy folyamatosan futó folyamat része, akkor a folyamat egyszeri futtatása után távolítsa el a lekérdezést. Ha új adatokat szeretne hozzáfűzni, ha az a backfill könyvtárba érkezik, hagyja a lekérdezést helyben.

Piton

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  STREAM read_files(
    "path/to/sourceDir",
    format => "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  STREAM read_files(
    "path/to/backfill/data/dir",
    format => "csv"
  );

Példa: Használja a hozzáfűzési folyamat feldolgozását UNION helyett

Ahelyett, hogy UNION záradékkal rendelkező lekérdezést használ, a hozzáfűző folyamatlekérdezésekkel több forrást egyesíthet, és egyetlen streamelési táblába írhat. A hozzáfűzési művelet lekérdezések használata UNION helyett lehetővé teszi, hogy több forrásból is hozzáfűzzön egy folyamatosan frissített táblához anélkül, hogy egy teljes frissítést futtatna.

A következő Python-példa egy olyan lekérdezést tartalmaz, amely több adatforrást egyesít egy UNION záradékkal:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Az alábbi példák a UNION lekérdezést fűzőfolyamat-lekérdezésekre cserélik:

Piton

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );