Megosztás a következőn keresztül:


Adatok átalakítása adatfolyamokkal

Ez a cikk azt ismerteti, hogyan deklarálhatja az adathalmazok transzformációit a DLT használatával, és hogyan adhatja meg a rekordok lekérdezési logikával történő feldolgozását. Példákat is tartalmaz a DLT-folyamatok létrehozásához használt gyakori átalakítási mintákra.

Adathalmazt bármely olyan lekérdezéshez definiálhat, amely DataFrame-et ad vissza. Az Apache Spark beépített műveleteit, UDF-jeit, egyéni logikáját és MLflow-modelljeit használhatja átalakításként a DLT-folyamatban. Miután az adatokat betöltötte a DLT-folyamatba, új adatkészleteket definiálhat a felsőbb rétegbeli forrásokhoz új streamelési táblák, materializált nézetek és nézetek létrehozásához.

Az állapotalapú feldolgozás DLT használatával történő hatékony végrehajtásának megismeréséhez tekintse meg Az állapotalapú feldolgozás optimalizálása a DLT-ben vízjelekkel.

Mikor érdemes nézeteket, materializált nézeteket és streamelési táblákat használni?

A folyamat-lekérdezések megvalósításakor válassza ki a legjobb adathalmaztípust, hogy azok hatékonyak és karbantarthatók legyenek.

Fontolja meg egy nézet használatát a következők végrehajtásához:

  • A nagyobb vagy összetettebb lekérdezések könnyebben kezelhetővé alakíthatók.
  • Köztes eredmények ellenőrzése elvárások alapján.
  • Csökkentse a tárolási és számítási költségeket azokra az eredményekre, amelyeket nem szükséges megőrizni. Mivel a táblák materializáltak, további számítási és tárolási erőforrásokat igényelnek.

Érdemes materializált nézetet használni, ha:

  • Több alsóbb rétegbeli lekérdezés is használja a táblát. Mivel a nézetek igény szerint vannak kiszámítva, a nézetet a rendszer minden alkalommal újra kiszámítja, amikor lekérdezik a nézetet.
  • Más folyamatok, feladatok vagy lekérdezések is felhasználják a táblát. Mivel a nézetek nem materializáltak, csak ugyanabban a folyamatban használhatja őket.
  • A fejlesztés során meg szeretné tekinteni egy lekérdezés eredményeit. Mivel a táblák materializáltak, és megtekinthetők és lekérdezhetők a folyamaton kívül, a táblák használata a fejlesztés során segíthet a számítások helyességének ellenőrzésében. Az ellenőrzést követően konvertálja a materializálást nem igénylő lekérdezéseket nézetekké.

Érdemes lehet streamelési táblázatot használni a következő esetekben:

  • A lekérdezések egy folyamatosan vagy növekményesen növekvő adatforráson alapulnak.
  • A lekérdezési eredményeket növekményesen kell kiszámítani.
  • A folyamatnak nagy átviteli sebességre és alacsony késésre van szüksége.

Jegyzet

A streamelési táblák mindig a streamelési forrásokhoz vannak definiálva. Használhatja a streamforrásokat is a APPLY CHANGES INTO-val a CDC-hírcsatornák frissítéseinek alkalmazására. Lásd: A MÓDOSÍTÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése A DLThasználatával.

Táblák kizárása a célsémából

Ha nem külső használatra szánt köztes táblákat kell kiszámítania, megakadályozhatja, hogy közzétegyék őket egy sémában a TEMPORARY kulcsszóval. Az ideiglenes táblák továbbra is a DLT szemantikája szerint tárolják és dolgozzák fel az adatokat, de nem érhetők el az aktuális folyamaton kívül. Az ideiglenes tábla az azt létrehozó adatfolyam teljes élettartama alatt megmarad. Ideiglenes táblák deklarálásához használja az alábbi szintaxist:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Piton

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Adatfolyam-táblák és materializált nézetek egyesítése egyetlen adatfeldolgozási folyamatban.

A streamelési táblák öröklik az Apache Spark strukturált streamelés feldolgozási garanciáit, és úgy vannak konfigurálva, hogy csak hozzáfűző adatforrásokból származó lekérdezéseket dolgozzanak fel, ahol az új sorok mindig a forrástáblába kerülnek módosítás helyett.

Jegyzet

Bár alapértelmezés szerint a streamelési táblák csak hozzáfűző adatforrásokat igényelnek, ha a streamforrás egy másik, frissítéseket vagy törléseket igénylő streamelési tábla, ezt a viselkedést felülbírálhatja a skipChangeCommits jelölővel.

A gyakori adatstreamelési minta magában foglalja a forrásadatok betöltését a kezdeti adatkészletek létrehozásához egy folyamatban. Ezeket a kezdeti adatkészleteket gyakran bronztábláknak nevezik, és gyakran hajtanak végre egyszerű átalakításokat.

Ezzel szemben az adatfeldolgozási folyamat utolsó táblái, más néven aranytáblák, gyakran bonyolult összesítéseket igényelnek, vagy egy APPLY CHANGES INTO művelet célpontjaiból történő adatok olvasását. Mivel ezek a műveletek alapvetően frissítéseket hoznak létre a hozzáfűzések helyett, a streamelési táblák bemeneteként nem támogatottak. Ezek az átalakítások jobban megfelelnek a materializált nézeteknek.

A streamelési táblák és a materializált nézetek egyetlen folyamatba való keverésével egyszerűsítheti a folyamatot, elkerülheti a nyers adatok költséges újrabetöltését vagy újrafeldolgozását, és az SQL teljes mértékben képes összetett összesítéseket kiszámítani egy hatékonyan kódolt és szűrt adathalmazon keresztül. Az alábbi példa az ilyen típusú vegyes feldolgozást szemlélteti:

Jegyzet

Ezek a példák az Automatikus betöltő használatával töltik be a fájlokat a felhőbeli tárolóból. Ha unitykatalógus-kompatibilis folyamat automatikus betöltőjével szeretne fájlokat betölteni, külső helyeket kell használnia. A Unity Catalog DLT-sel való használatáról további információt A Unity katalógus használata DLT-folyamatokkalcímű témakörben talál.

Piton

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

További információ az Automatikus betöltő használatáról JSON-fájlok növekményes betöltéséhez az Azure Storage-ból.

stream-statikus illesztések

A stream-statikus illesztések jó választásnak számítanak a csak hozzáfűző adatok folyamatos adatfolyamának denormalizálásakor, elsősorban statikus dimenziótáblával.

Az egyes folyamatfrissítések során a streamből származó új rekordok a statikus tábla legfrissebb pillanatképével lesznek összekapcsolva. Ha a rekordok a streamelési tábla megfelelő adatainak feldolgozása után hozzáadódnak vagy frissülnek a statikus táblában, az eredményül kapott rekordok csak akkor lesznek újraszámolva, ha teljes frissítést hajtanak végre.

Az aktivált végrehajtáshoz konfigurált folyamatokban a statikus tábla a frissítés indításakor adja vissza az eredményeket. A folyamatos végrehajtásra konfigurált folyamatokban a rendszer a statikus tábla legújabb verzióját kérdezi le minden alkalommal, amikor a tábla feldolgoz egy frissítést.

A következő példa egy stream-statikus illesztésre:

Piton

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Aggregátumok hatékony kiszámítása

Streamelési táblák használatával növekményesen kiszámíthatja az olyan egyszerű eloszlási aggregátumokat, mint a darabszám, a min, a max vagy az összeg, valamint az algebrai aggregátumokat, például az átlagot vagy a szórást. A Databricks növekményes összesítést javasol korlátozott számú csoporttal rendelkező lekérdezésekhez, például GROUP BY country záradékkal rendelkező lekérdezésekhez. Minden frissítés csak új bemeneti adatokat olvas be.

Ha többet szeretne tudni a növekményes összesítéseket végrehajtó DLT-lekérdezések írásáról, olvassa el Ablakos aggregációk végrehajtása vízjelekkelcímű témakört.

MLflow-modellek használata DLT-folyamatban

Jegyzet

Ha MLflow-modelleket szeretne használni unitykatalógus-kompatibilis folyamatban, a folyamatot úgy kell konfigurálni, hogy az preview csatornát használja. A current csatorna használatához konfigurálnia kell a folyamatot a Hive metaadattárban való közzétételre.

A DLT-folyamatokban MLflow-betanított modelleket is használhat. Az MLflow-modellek átalakításként vannak kezelve az Azure Databricksben, ami azt jelenti, hogy Spark DataFrame-bemeneten működnek, és Spark DataFrame-ként adnak vissza eredményeket. Mivel a DLT adatkészleteket definiál a DataFrame-eken, az MLflow-t használó Apache Spark-számítási feladatokat csak néhány sornyi kóddal konvertálhatja DLT-vé. További információ az MLflow-ról: MLflow a generatív mesterséges intelligencia ügynökhöz és az ML-modell életciklusához.

Ha már van egy Python-jegyzetfüzete, amely MLflow-modellt hív meg, ezt a kódot a DLT-hez igazíthatja a @dlt.table dekorátor használatával, és biztosíthatja, hogy a függvények definiálva legyenek az átalakítási eredmények visszaadásához. A DLT alapértelmezés szerint nem telepíti az MLflow-t, ezért ellenőrizze, hogy telepítette-e az MLflow könyvtárakat a %pip install mlflow-lal, és importálta-e a mlflow-et és a dlt-t a jegyzetfüzet tetején. A DLT szintaxisának bemutatása: Folyamatkód fejlesztése Python-.

Az MLflow-modellek DLT-ben való használatához hajtsa végre a következő lépéseket:

  1. Szerezze be az MLflow-modell futtatási azonosítóját és modellnevét. A futtatási azonosító és a modell neve az MLflow-modell URI-jának létrehozásához használatos.
  2. Az URI használatával definiáljon egy Spark UDF-et az MLflow-modell betöltéséhez.
  3. Az MLflow-modell használatához hívja meg a tábladefiníciókban lévő UDF-et.

Az alábbi példa a minta alapszintaxisát mutatja be:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Teljes példaként az alábbi kód egy loaded_model_udf nevű Spark UDF-et határoz meg, amely betölti a hitelkockázati adatokra betanított MLflow-modellt. Az előrejelzéshez használt adatoszlopokat a rendszer argumentumként továbbítja a UDF-nek. A tábla loan_risk_predictions kiszámítja a loan_risk_input_dataegyes sorainak előrejelzéseit.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Manuális törlések vagy frissítések megőrzése

A DLT lehetővé teszi a rekordok manuális törlését vagy frissítését egy táblából, és frissítési műveletet hajthat végre az alsóbb rétegbeli táblák újrafordításához.

A DLT alapértelmezés szerint minden folyamat frissítésekor a bemeneti adatok alapján újrakomponálta a táblaeredményeket, ezért gondoskodnia kell arról, hogy a törölt rekord ne legyen újra betöltve a forrásadatokból. A pipelines.reset.allowed táblatulajdonság false beállítása megakadályozza a táblák frissítését, de nem akadályozza meg, hogy növekményes írások folyjanak a táblákba vagy új adatok a táblába.

Az alábbi ábra egy példát mutat be két streamelési táblázat használatával:

  • raw_user_table nyers felhasználói adatokat használ fel egy forrásból.
  • bmi_table fokozatosan számítja ki a BMI-értékeket a raw_user_tablesúlyából és magasságából.

Manuálisan szeretné törölni vagy frissíteni a felhasználói rekordokat a raw_user_table-ban, és újraszámolni a bmi_table-et.

Adatdiagram megőrzése

Az alábbi kód bemutatja, hogyan állítsuk be a pipelines.reset.allowed táblatulajdonságot false-re a raw_user_table teljes frissítésének letiltásához, hogy a tervezett módosítások idővel megmaradjanak, de a lejjebb elhelyezkedő táblák újraszámításra kerülnek egy összekötő frissítése futtatásakor.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);