Megosztás a következőn keresztül:


Oktatóanyag: Az első DLT-folyamat futtatása

Ez az oktatóanyag végigvezeti az első DLT-folyamat konfigurálásához, az alapszintű ETL-kód írásához és egy folyamatfrissítés futtatásához szükséges lépéseken.

Az oktatóanyag minden lépése olyan munkaterületekhez készült, amelyeken engedélyezve van a Unity Catalog. A DLT-folyamatokat úgy is konfigurálhatja, hogy működjenek az örökölt Hive-metaadattárral. Lásd: DLT-folyamatok használata örökölt Hive-metaadattárral.

Jegyzet

Ez az oktatóanyag útmutatást tartalmaz az új folyamatkód Databricks-jegyzetfüzetek használatával történő fejlesztéséhez és érvényesítéséhez. A folyamatokat forráskód használatával is konfigurálhatja Python- vagy SQL-fájlokban.

Konfigurálhat egy folyamatot a kód futtatására, ha már rendelkezik DLT-szintaxissal írt forráskóddal. Lásd a DLT-folyamat konfigurálását a következő oldalon: .

A Databricks SQL-ben teljes mértékben deklaratív SQL-szintaxissal regisztrálhatja és beállíthatja a materializált nézetek és streamelési táblák frissítési ütemezését Unity Catalog által felügyelt objektumként. Lásd: Materializált nézetek használata a Databricks SQL- és Adatok betöltése streamtáblákkal a Databricks SQL.

Példa: A New York-i babanevek adatainak betöltése és feldolgozása

A cikkben szereplő példa egy nyilvánosan elérhető adatkészletet használ, amely New York állambeli babanevekrekordjait tartalmazza. Ez a példa egy DLT-folyamat használatát mutatja be a következő módon:

  • Kötetből nyers CSV-adatokat olvasni egy táblába.
  • Olvassa el a rekordokat a betöltési táblából, és használja a DLT elvárásokat egy új, megtisztított adatokat tartalmazó tábla létrehozásához.
  • Használja a megtisztított rekordokat a származtatott adathalmazokat létrehozó DLT-lekérdezések bemeneteként.

Ez a kód a medallion architektúra egyszerűsített példáját mutatja be. Lásd Mi a medallion lakehouse architektúrája?.

A példa implementációi a Pythonhoz és az SQL-hez tartoznak. A lépéseket követve hozzon létre egy új folyamatot és jegyzetfüzetet, majd másolja be a megadott kódot.

Például jegyzetfüzetek teljes kóddal is biztosítottak.

Követelmények

  • A folyamat elindításához rendelkeznie kell fürtlétrehozási engedéllyel vagy hozzáféréssel egy DLT-fürtöt meghatározó fürtszabályzathoz. A DLT futtatási környezete létrehoz egy fürtöt a csővezeték futtatása előtt, és sikertelenül lefut, ha nem rendelkezik a megfelelő engedéllyel.
  • Alapértelmezés szerint minden felhasználó elindíthat frissítéseket kiszolgáló nélküli folyamatokkal. A kiszolgáló nélküli kiszolgálót a fiók szintjén kell engedélyezni, és előfordulhat, hogy nem érhető el a munkaterület régiójában. Lásd a Kiszolgáló nélküli számítás engedélyezését.
  • Az ebben az oktatóanyagban szereplő példák a Unity katalógusthasználják. A Databricks azt javasolja, hogy hozzon létre egy új sémát az oktatóanyag futtatásához, mivel több adatbázis-objektum jön létre a célsémában.

    • Új séma katalógusban való létrehozásához ALL PRIVILEGES vagy USE CATALOG és CREATE SCHEMA jogosultsággal kell rendelkeznie.
    • Ha nem tud új sémát létrehozni, futtassa ezt az oktatóanyagot egy meglévő sémán. A következő jogosultságokkal kell rendelkeznie:
      • USE CATALOG a főkatalógushoz.
      • ALL PRIVILEGES vagy USE SCHEMA, CREATE MATERIALIZED VIEWés CREATE TABLE jogosultságokat a célséma számára.
    • Ez az oktatóanyag kötetet használ a mintaadatok tárolására. A Databricks azt javasolja, hogy hozzon létre egy új kötetet ehhez az oktatóanyaghoz. Ha új sémát hoz létre ehhez az oktatóanyaghoz, létrehozhat egy új kötetet ebben a sémában.
      • Ha új kötetet szeretne létrehozni egy meglévő sémában, a következő jogosultságokkal kell rendelkeznie:
        • USE CATALOG a szülőkatalógus számára.
        • ALL PRIVILEGES vagy USE SCHEMA és CREATE VOLUME jogosultságok a célsémán.
      • Használhat meglévő kötetet is. A következő jogosultságokkal kell rendelkeznie:
        • USE CATALOG a szülőkatalógushoz.
        • USE SCHEMA a szülőséma számára.
        • ALL PRIVILEGES vagy READ VOLUME és WRITE VOLUME a célköteten.

    Az engedélyek beállításához forduljon a Databricks rendszergazdájához. A Unity Catalog jogosultságairól további információt a Unity Catalog jogosultságai és védhető objektumokcímű résznél talál.

0. lépés: Adatok letöltése

Ez a példa adatokat tölt be egy Unity Catalog-kötetből. Az alábbi kód letölt egy CSV-fájlt, és a megadott köteten tárolja. Nyisson meg egy új jegyzetfüzetet, és futtassa a következő kódot az adatok a megadott kötetre való letöltéséhez:

import urllib

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

urllib.request.urlretrieve(download_url, volume_path + filename)

Cserélje le a <catalog-name>, <schema-name>és <volume-name> címkéket a Unity Katalógus kötetének katalógus-, séma- és kötetneveire. A megadott kód megkísérli létrehozni a megadott sémát és kötetet, ha ezek az objektumok nem léteznek. Megfelelő jogosultságokkal kell rendelkeznie ahhoz, hogy objektumokat hozzon létre és írjon a Unity Catalogban. Lásd követelmények.

Jegyzet

Mielőtt folytatná az oktatóanyagot, győződjön meg arról, hogy a jegyzetfüzet sikeresen lefutott. Ne konfigurálja ezt a jegyzetfüzetet az adatfolyam részeként.

1. lépés: Folyamat létrehozása

A DLT a jegyzetfüzetekben vagy fájlokban definiált függőségek (forráskód) DLT szintaxist használva történő feloldásával hozza létre az adatfolyamokat. Minden forráskódfájl csak egy nyelvet tartalmazhat, de több nyelvspecifikus jegyzetfüzetet vagy fájlt is hozzáadhat a folyamathoz.

Fontos

Ne konfiguráljon semmilyen objektumot a Forráskód mezőben. Ha ezt a mezőt fekete színben hagyja, létrehoz és konfigurál egy jegyzetfüzetet a forráskódok létrehozásához.

Az oktatóanyag utasításai kiszolgáló nélküli számítást és Unity-katalógust használnak. Az utasításokban nem megadott összes konfigurációs beállítás alapértelmezett beállításait használja.

Jegyzet

Ha a kiszolgáló nélküli szolgáltatás nincs engedélyezve vagy támogatott a munkaterületen, az oktatóanyagot az alapértelmezett számítási beállítások használatával írott módon végezheti el. A Folyamat létrehozása felhasználói felület Cél szakaszában a Tárolási beállítások alatt manuálisan kell kiválasztania a Unity Catalog elemet.

Új folyamat konfigurálásához tegye a következőket:

  1. Az oldalsávon kattintson DLT.
  2. Kattintson a Folyamat létrehozásaelemre.
  3. A Folyamat nevemezőbe írjon be egy egyedi folyamatnevet.
  4. Jelölje be a kiszolgáló nélküli jelölőnégyzetet.
  5. A(z) Célkonfigurálásához, ahol a táblák a Unity-katalógus helyén vannak közzétéve, válasszon ki egy katalógust és egy sémát.
  6. A Haladólehetőségre kattintva válassza a Konfiguráció hozzáadása opciót, majd adja meg a katalógus, séma és kötet folyamatparamétereit, amelyekbe adatokat töltött le, a következő paraméternevek használatával:
    • my_catalog
    • my_schema
    • my_volume
  7. Kattintson létrehozása gombra.

Az új csővezeték felhasználói felülete megjelenik. A rendszer automatikusan létrehoz és konfigurál egy forráskódfüzetet a folyamathoz.

A jegyzettömb egy új mappában jön létre a felhasználói könyvtárban. Az új könyvtár és fájl neve megegyezik a folyamat nevével. Például /Users/your.username@databricks.com/my_pipeline/my_pipeline.

A jegyzetfüzet elérésére szolgáló hivatkozás a Folyamat részletei panel Forráskód mezőjében található. Kattintson a hivatkozásra a jegyzetfüzet megnyitásához, mielőtt továbblép a következő lépésre.

2. lépés: Materializált nézetek és streamelési táblák deklarálása egy jegyzetfüzetben Python vagy SQL használatával

A Datbricks-jegyzetfüzetekkel interaktívan fejlesztheti és ellenőrizheti a DLT-folyamatok forráskódját. A funkció használatához csatolnia kell a jegyzetfüzetet a pipeline-hoz. Az újonnan létrehozott jegyzetfüzet csatolása az imént létrehozott folyamathoz:

  1. Kattintson a Csatlakozás elemre a jobb felső sarokban a számítási konfigurációs menü megnyitásához.
  2. Vigye a kurzort az első lépésben létrehozott folyamat nevére.
  3. Kattintson a Csatlakozásgombra.

A felhasználói felület a jobb felső sarokban lévő Érvényesítési és Start gombokat is tartalmazza. A folyamatkódok fejlesztésének jegyzetfüzet-támogatásáról további információt a DLT-folyamatok fejlesztése és hibakeresése jegyzetfüzetekbencímű témakörben talál.

Fontos

  • A DLT-folyamatok a tervezés során kiértékelik a jegyzetfüzet összes celláját. Az általános célú számításokkal futtatott vagy feladatként ütemezett jegyzetfüzetekkel ellentétben a pipeline-ok nem garantálják, hogy a cellák a megadott sorrendben fussanak.
  • A jegyzetfüzetek csak egyetlen programozási nyelvet tartalmazhatnak. Ne keverje a Python- és SQL-kódot a folyamat forráskód-jegyzetfüzeteibe.

A Kód Pythonnal vagy SQL-lel való fejlesztéséről további információt Folyamatkód fejlesztése Python- vagy Folyamatkód fejlesztése AZ SQLhasználatával című témakörben talál.

Példa folyamatkódra

Az oktatóanyagban szereplő példa implementálásához másolja és illessze be a következő kódot a folyamat forráskódjaként konfigurált jegyzetfüzet egyik cellájába.

A megadott kód a következőket teszi:

  • Importálja a szükséges modulokat (csak Python esetén).
  • A folyamatkonfiguráció során definiált paraméterekre hivatkozik.
  • Definiál egy baby_names_raw nevű streamelési táblát, amely adatokat gyűjt be egy kötetből.
  • Egy baby_names_prepared nevű materializált nézetet határoz meg, amely ellenőrzi a betöltött adatokat.
  • Egy top_baby_names_2021 nevű materializált nézetet definiál, amely az adatok rendkívül kifinomult nézetével rendelkezik.

Piton

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

3. lépés: Folyamatfrissítés indítása

Folyamatfrissítés indításához kattintson a jegyzetfüzet felhasználói felületének jobb felső részén található Start gombra.

példajegyzetfüzetek

A következő jegyzetfüzetek ugyanazokat a kód példákat tartalmazzák, amelyek ebben a cikkben szerepelnek. Ezek a jegyzetfüzetek ugyanazokat a követelményeket támasztják, mint a jelen cikkben ismertetett lépések. Lásd a követelményeket.

Jegyzetfüzet importálásához hajtsa végre a következő lépéseket:

  1. Nyissa meg a jegyzetfüzet felhasználói felületét.
    • Kattintson a + Új>Jegyzetfüzetelemre.
    • Megnyílik egy üres jegyzetfüzet.
  2. Kattintson Fájl>Importálás...elemre. Megjelenik a Importálás párbeszédpanel.
  3. Válassza a URL lehetőséget a importálás céljából a-ból.
  4. Adja meg a jegyzetfüzet URL-címét.
  5. Kattintson Importálásgombra.

Ehhez az oktatóanyaghoz egy adatbeállítási jegyzetfüzetet kell futtatnia a DLT-folyamat konfigurálása és futtatása előtt. Importálja a következő jegyzetfüzetet, csatolja a jegyzetfüzetet egy számítási erőforráshoz, töltse ki a szükséges változót my_catalog, my_schemaés my_volume, majd kattintson az Az összesfuttatása parancsra.

Adatletöltés pipelines oktatóanyaghoz

Jegyzetfüzet lekérése

Az alábbi jegyzetfüzetek példákat mutatnak be a Pythonban vagy az SQL-ben. Amikor importál egy jegyzetfüzetet, az a felhasználói kezdőlapra lesz mentve.

Az alábbi jegyzetfüzetek importálása után végezze el a folyamatlánc létrehozásának lépéseit, de a letöltött jegyzetfüzet kiválasztásához használja a Source code fájlválasztót. Miután létrehozott egy csővezetéket egy forráskódként konfigurált jegyzetfüzettel, kattintson a Indítás a csővezeték felhasználói felületén a frissítés aktiválásához.

A DLT Python-jegyzetfüzet használatának első lépései

Jegyzetfüzet lekérése

A DLT SQL-jegyzetfüzet használatának első lépései

Jegyzetfüzet lekérése

További erőforrások