Megosztás a következőn keresztül:


Folyamatkód fejlesztése a Pythonnal

A DLT számos új Python-kódszerkezetet vezet be a folyamat materializált nézeteinek és streamtábláinak meghatározásához. A csővezetékek fejlesztésének Python-támogatása a PySpark DataFrame és a strukturált adatfolyam API-k alapjaira épül.

A Pythonnal és DataFrame-ekkel nem ismert felhasználók számára a Databricks az SQL-felület használatát javasolja. Lásd: Folyamatkód fejlesztése AZ SQLhasználatával.

A DLT Python szintaxisának teljes referenciáját DLT Python nyelvi referenciacímű témakörben talál.

A Python alapjai folyamatfejlesztéshez

A DLT-adathalmazokat létrehozó Python-kódnak DataFrame-eket kell visszaadnia.

Az összes DLT Python API a dlt modulban van implementálva. A Pythonnal implementált DLT-folyamatkódnak explicit módon importálnia kell a dlt modult a Python-jegyzetfüzetek és -fájlok tetején.

A folyamatkonfiguráció során megadott katalógusba és sémába beolvassa és beírja az alapértelmezett értéket. Lásd: Célkatalógus és sémabeállítása.

A DLT-specifikus Python-kód egy kritikus módon különbözik a Többi Python-kódtípustól: a Python-folyamatkód nem hívja meg közvetlenül az adatbetöltést és átalakítást végző függvényeket DLT-adathalmazok létrehozásához. Ahelyett, a DLT értelmezi a dlt modul dekorátorfüggvényeit az összes forráskódfájlban, amelyek folyamatba vannak konfigurálva, és egy adatfolyam gráfot hoz létre.

Fontos

A folyamat futtatásakor nem várt viselkedés elkerülése érdekében ne tartalmazzon olyan kódot, amely az adathalmazokat definiáló függvényekben esetlegesen mellékhatásokat okoz. A részletekért lásd a Python-referenciát.

Materializált nézet vagy streamelési tábla létrehozása a Pythonnal

A @dlt.table dekoratőr arra utasítja a DLT-t, hogy a függvény által visszaadott eredmények alapján hozzon létre materializált nézetet vagy streamelési táblát. A kötegelt olvasás eredményei materializált nézetet hoznak létre, míg a folyamatos olvasás eredményei folyamatos táblázatot hoznak létre.

Alapértelmezés szerint a materializált nézetek és a streamelő táblák neveit a függvénynevekből következtetik. Az alábbi példakód a materializált nézet és a streamelési tábla létrehozásának alapszintaxisát mutatja be:

Jegyzet

Mindkét függvény ugyanarra a táblára hivatkozik a samples katalógusban, és ugyanazt a dekorátor függvényt használja. Ezek a példák kiemelik, hogy a materializált nézetek és streamelési táblák alapszintaxisának egyetlen különbsége a spark.read és a spark.readStreamhasználata.

Nem minden adatforrás támogatja a streamelési olvasásokat. Egyes adatforrásokat mindig streamelési szemantikával kell feldolgozni.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Igény szerint megadhatja a tábla nevét a @dlt.table dekorátor name argumentumával. Az alábbi példa ezt a mintát mutatja be egy materializált nézethez és streamelési táblához:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Adatok betöltése objektumtárolóból

A DLT az Azure Databricks által támogatott összes formátumból támogatja az adatok betöltését. Lásd: Adatformátum beállításai.

Jegyzet

Ezek a példák a munkaterületre automatikusan csatlakoztatott /databricks-datasets alatt elérhető adatokat használják. A Databricks kötetútvonalak vagy felhőalapú URI-k használatát javasolja a felhőobjektum-tárolóban tárolt adatokra való hivatkozáshoz. Lásd Mik azok a Unity Catalog-kötetek?.

A Databricks az automatikus betöltő és a streamelő táblák használatát javasolja, amikor növekményes betöltési számítási feladatokat konfigurál a felhőobjektum-tárolóban tárolt adatokhoz. Lásd Mi az automatikus betöltő?.

Az alábbi példa létrehoz egy streamelési táblát JSON-fájlokból az Automatikus betöltő használatával:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Az alábbi példa kötegelt szemantika alkalmazásával olvas be egy JSON-könyvtárat és hoz létre egy materializált nézetet.

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Adatok validálása az elvárások alapján

Az elvárásokkal adatminőségi korlátozásokat állíthat be és kényszeríthet ki. Lásd: Az adatminőség kezelése a folyamatok elvárásaival.

Az alábbi kód @dlt.expect_or_drop használ egy valid_data nevű elvárás definiálásához, amely az adatbetöltés során null értékű rekordokat ad vissza:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

A folyamat során definiált materializált nézetek és streamelési táblák lekérdezése

Az alábbi példa négy adatkészletet határoz meg:

  • Egy JSON-adatokat betöltő orders nevű streamelési tábla.
  • A customers nevű materializált nézet, amely betölti a CSV-adatokat.
  • A orders és customers adathalmazok rekordjait összekapcsoló, customer_orders nevű materializált nézet dátumra veti a rendelési időbélyeget, és kiválasztja a customer_id, order_number, stateés order_date mezőket.
  • Egy daily_orders_by_state nevű materializált nézet, amely összesíti az egyes államok napi rendeléseinek számát.

Jegyzet

A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a orders, customersés customer_orders táblák a folyamathoz konfigurált alapértelmezett katalógusból és sémából vannak megírva és olvasva.

Az örökölt közzétételi mód a LIVE sémával kérdezi le a folyamatban definiált egyéb materializált nézeteket és streamtáblákat. Az új folyamatokban a LIVE sémaszintaxisa csendben figyelmen kívül lesz hagyva. Lásd: LIVE séma (örökölt).

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Táblák létrehozása for ciklusban

A Python for ciklusokkal programozott módon hozhat létre több táblát. Ez akkor lehet hasznos, ha számos adatforrással vagy céladatkészletkel rendelkezik, amelyek csak néhány paramétertől függően változnak, így kevesebb teljes kóddal kell karbantartani és kevesebb kódredundanciát eredményezni.

A for ciklus soros sorrendben értékeli ki a logikát, de ha az adathalmazok tervezése befejeződött, a folyamat párhuzamosan futtatja a logikát.

Fontos

Ha ezt a mintát használja adathalmazok definiálásához, győződjön meg arról, hogy a for ciklusnak átadott értékek listája mindig additív. Ha egy folyamatban lévő munkafolyamatban korábban meghatározott adathalmazt kihagy egy későbbi folyamatfuttatásból, akkor az adathalmazt automatikusan eltávolítják a célsémából.

Az alábbi példa öt táblát hoz létre, amelyek régiónként szűrik az ügyfélrendeléseket. Itt a régiónév a cél materializált nézeteinek nevének beállítására és a forrásadatok szűrésére szolgál. Az ideiglenes nézetek a végleges materializált nézetek létrehozásához használt forrástáblákból származó illesztések definiálására szolgálnak.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Az alábbiakban egy példa látható a folyamat adatfolyam-grafikonjára:

Két nézetből álló adatfolyam-diagram, amely öt regionális táblába vezet.

Hibaelhárítás: for hurok számos táblát hoz létre ugyanazokkal az értékekkel

A Python-kód kiértékelésére használt lusta végrehajtási modell megköveteli, hogy a logika közvetlenül hivatkozik az egyes értékekre a @dlt.table() által dekorált függvény meghívásakor.

Az alábbi példa két helyes módszert mutat be a táblák for hurokkal való definiálásához. Mindkét példában a tables listából származó összes táblanévre kifejezetten hivatkozik a @dlt.table()által dekorált függvény.

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Az alábbi példa nem helyesen hivatkozik a értékekre. Ez a példa különböző nevű táblákat hoz létre, de az összes tábla betölti az adatokat a for ciklus utolsó értékéből:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)