Oktatóanyag: Az első DLT-folyamat futtatása
Ez az oktatóanyag végigvezeti az első DLT-folyamat konfigurálásához, az alapszintű ETL-kód írásához és egy folyamatfrissítés futtatásához szükséges lépéseken.
Az oktatóanyag minden lépése olyan munkaterületekhez készült, amelyeken engedélyezve van a Unity Catalog. A DLT-folyamatokat úgy is konfigurálhatja, hogy működjenek az örökölt Hive-metaadattárral. Lásd: DLT-folyamatok használata örökölt Hive-metaadattárral.
Jegyzet
Ez az oktatóanyag útmutatást tartalmaz az új folyamatkód Databricks-jegyzetfüzetek használatával történő fejlesztéséhez és érvényesítéséhez. A folyamatokat forráskód használatával is konfigurálhatja Python- vagy SQL-fájlokban.
Konfigurálhat egy folyamatot a kód futtatására, ha már rendelkezik DLT-szintaxissal írt forráskóddal. Lásd a DLT-folyamat konfigurálását a következő oldalon: .
A Databricks SQL-ben teljes mértékben deklaratív SQL-szintaxissal regisztrálhatja és beállíthatja a materializált nézetek és streamelési táblák frissítési ütemezését Unity Catalog által felügyelt objektumként. Lásd: Materializált nézetek használata a Databricks SQL- és Adatok betöltése streamtáblákkal a Databricks SQL.
Példa: A New York-i babanevek adatainak betöltése és feldolgozása
A cikkben szereplő példa egy nyilvánosan elérhető adatkészletet használ, amely New York állambeli babanevekrekordjait tartalmazza. Ez a példa egy DLT-folyamat használatát mutatja be a következő módon:
- Kötetből nyers CSV-adatokat olvasni egy táblába.
- Olvassa el a rekordokat a betöltési táblából, és használja a DLT elvárásokat egy új, megtisztított adatokat tartalmazó tábla létrehozásához.
- Használja a megtisztított rekordokat a származtatott adathalmazokat létrehozó DLT-lekérdezések bemeneteként.
Ez a kód a medallion architektúra egyszerűsített példáját mutatja be. Lásd Mi a medallion lakehouse architektúrája?.
A példa implementációi a Pythonhoz és az SQL-hez tartoznak. A lépéseket követve hozzon létre egy új folyamatot és jegyzetfüzetet, majd másolja be a megadott kódot.
Például jegyzetfüzetek teljes kóddal is biztosítottak.
Követelmények
- A folyamat elindításához rendelkeznie kell fürtlétrehozási engedéllyel vagy hozzáféréssel egy DLT-fürtöt meghatározó fürtszabályzathoz. A DLT futtatási környezete létrehoz egy fürtöt a csővezeték futtatása előtt, és sikertelenül lefut, ha nem rendelkezik a megfelelő engedéllyel.
- Alapértelmezés szerint minden felhasználó elindíthat frissítéseket kiszolgáló nélküli folyamatokkal. A kiszolgáló nélküli kiszolgálót a fiók szintjén kell engedélyezni, és előfordulhat, hogy nem érhető el a munkaterület régiójában. Lásd a Kiszolgáló nélküli számítás engedélyezését.
Az ebben az oktatóanyagban szereplő példák a Unity katalógusthasználják. A Databricks azt javasolja, hogy hozzon létre egy új sémát az oktatóanyag futtatásához, mivel több adatbázis-objektum jön létre a célsémában.
- Új séma katalógusban való létrehozásához
ALL PRIVILEGES
vagyUSE CATALOG
ésCREATE SCHEMA
jogosultsággal kell rendelkeznie. - Ha nem tud új sémát létrehozni, futtassa ezt az oktatóanyagot egy meglévő sémán. A következő jogosultságokkal kell rendelkeznie:
-
USE CATALOG
a főkatalógushoz. -
ALL PRIVILEGES
vagyUSE SCHEMA
,CREATE MATERIALIZED VIEW
ésCREATE TABLE
jogosultságokat a célséma számára.
-
- Ez az oktatóanyag kötetet használ a mintaadatok tárolására. A Databricks azt javasolja, hogy hozzon létre egy új kötetet ehhez az oktatóanyaghoz. Ha új sémát hoz létre ehhez az oktatóanyaghoz, létrehozhat egy új kötetet ebben a sémában.
- Ha új kötetet szeretne létrehozni egy meglévő sémában, a következő jogosultságokkal kell rendelkeznie:
-
USE CATALOG
a szülőkatalógus számára. -
ALL PRIVILEGES
vagyUSE SCHEMA
ésCREATE VOLUME
jogosultságok a célsémán.
-
- Használhat meglévő kötetet is. A következő jogosultságokkal kell rendelkeznie:
-
USE CATALOG
a szülőkatalógushoz. -
USE SCHEMA
a szülőséma számára. -
ALL PRIVILEGES
vagyREAD VOLUME
ésWRITE VOLUME
a célköteten.
-
- Ha új kötetet szeretne létrehozni egy meglévő sémában, a következő jogosultságokkal kell rendelkeznie:
Az engedélyek beállításához forduljon a Databricks rendszergazdájához. A Unity Catalog jogosultságairól további információt a Unity Catalog jogosultságai és védhető objektumokcímű résznél talál.
- Új séma katalógusban való létrehozásához
0. lépés: Adatok letöltése
Ez a példa adatokat tölt be egy Unity Catalog-kötetből. Az alábbi kód letölt egy CSV-fájlt, és a megadott köteten tárolja. Nyisson meg egy új jegyzetfüzetet, és futtassa a következő kódot az adatok a megadott kötetre való letöltéséhez:
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
Cserélje le a <catalog-name>
, <schema-name>
és <volume-name>
címkéket a Unity Katalógus kötetének katalógus-, séma- és kötetneveire. A megadott kód megkísérli létrehozni a megadott sémát és kötetet, ha ezek az objektumok nem léteznek. Megfelelő jogosultságokkal kell rendelkeznie ahhoz, hogy objektumokat hozzon létre és írjon a Unity Catalogban. Lásd követelmények.
Jegyzet
Mielőtt folytatná az oktatóanyagot, győződjön meg arról, hogy a jegyzetfüzet sikeresen lefutott. Ne konfigurálja ezt a jegyzetfüzetet az adatfolyam részeként.
1. lépés: Folyamat létrehozása
A DLT a jegyzetfüzetekben vagy fájlokban definiált függőségek (forráskód) DLT szintaxist használva történő feloldásával hozza létre az adatfolyamokat. Minden forráskódfájl csak egy nyelvet tartalmazhat, de több nyelvspecifikus jegyzetfüzetet vagy fájlt is hozzáadhat a folyamathoz.
Fontos
Ne konfiguráljon semmilyen objektumot a Forráskód mezőben. Ha ezt a mezőt fekete színben hagyja, létrehoz és konfigurál egy jegyzetfüzetet a forráskódok létrehozásához.
Az oktatóanyag utasításai kiszolgáló nélküli számítást és Unity-katalógust használnak. Az utasításokban nem megadott összes konfigurációs beállítás alapértelmezett beállításait használja.
Jegyzet
Ha a kiszolgáló nélküli szolgáltatás nincs engedélyezve vagy támogatott a munkaterületen, az oktatóanyagot az alapértelmezett számítási beállítások használatával írott módon végezheti el. A Folyamat létrehozása felhasználói felület Cél szakaszában a Tárolási beállítások alatt manuálisan kell kiválasztania a Unity Catalog elemet.
Új folyamat konfigurálásához tegye a következőket:
- Az oldalsávon kattintson DLT.
- Kattintson a Folyamat létrehozásaelemre.
- A Folyamat nevemezőbe írjon be egy egyedi folyamatnevet.
- Jelölje be a kiszolgáló nélküli jelölőnégyzetet.
- A(z) Célkonfigurálásához, ahol a táblák a Unity-katalógus helyén vannak közzétéve, válasszon ki egy katalógust és egy sémát.
- A Haladólehetőségre kattintva válassza a Konfiguráció hozzáadása opciót, majd adja meg a katalógus, séma és kötet folyamatparamétereit, amelyekbe adatokat töltött le, a következő paraméternevek használatával:
my_catalog
my_schema
my_volume
- Kattintson létrehozása gombra.
Az új csővezeték felhasználói felülete megjelenik. A rendszer automatikusan létrehoz és konfigurál egy forráskódfüzetet a folyamathoz.
A jegyzettömb egy új mappában jön létre a felhasználói könyvtárban. Az új könyvtár és fájl neve megegyezik a folyamat nevével. Például /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
A jegyzetfüzet elérésére szolgáló hivatkozás a Folyamat részletei panel Forráskód mezőjében található. Kattintson a hivatkozásra a jegyzetfüzet megnyitásához, mielőtt továbblép a következő lépésre.
2. lépés: Materializált nézetek és streamelési táblák deklarálása egy jegyzetfüzetben Python vagy SQL használatával
A Datbricks-jegyzetfüzetekkel interaktívan fejlesztheti és ellenőrizheti a DLT-folyamatok forráskódját. A funkció használatához csatolnia kell a jegyzetfüzetet a pipeline-hoz. Az újonnan létrehozott jegyzetfüzet csatolása az imént létrehozott folyamathoz:
- Kattintson a Csatlakozás elemre a jobb felső sarokban a számítási konfigurációs menü megnyitásához.
- Vigye a kurzort az első lépésben létrehozott folyamat nevére.
- Kattintson a Csatlakozásgombra.
A felhasználói felület a jobb felső sarokban lévő Érvényesítési és Start gombokat is tartalmazza. A folyamatkódok fejlesztésének jegyzetfüzet-támogatásáról további információt a DLT-folyamatok fejlesztése és hibakeresése jegyzetfüzetekbencímű témakörben talál.
Fontos
- A DLT-folyamatok a tervezés során kiértékelik a jegyzetfüzet összes celláját. Az általános célú számításokkal futtatott vagy feladatként ütemezett jegyzetfüzetekkel ellentétben a pipeline-ok nem garantálják, hogy a cellák a megadott sorrendben fussanak.
- A jegyzetfüzetek csak egyetlen programozási nyelvet tartalmazhatnak. Ne keverje a Python- és SQL-kódot a folyamat forráskód-jegyzetfüzeteibe.
A Kód Pythonnal vagy SQL-lel való fejlesztéséről további információt Folyamatkód fejlesztése Python- vagy Folyamatkód fejlesztése AZ SQLhasználatával című témakörben talál.
Példa folyamatkódra
Az oktatóanyagban szereplő példa implementálásához másolja és illessze be a következő kódot a folyamat forráskódjaként konfigurált jegyzetfüzet egyik cellájába.
A megadott kód a következőket teszi:
- Importálja a szükséges modulokat (csak Python esetén).
- A folyamatkonfiguráció során definiált paraméterekre hivatkozik.
- Definiál egy
baby_names_raw
nevű streamelési táblát, amely adatokat gyűjt be egy kötetből. - Egy
baby_names_prepared
nevű materializált nézetet határoz meg, amely ellenőrzi a betöltött adatokat. - Egy
top_baby_names_2021
nevű materializált nézetet definiál, amely az adatok rendkívül kifinomult nézetével rendelkezik.
Piton
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
3. lépés: Folyamatfrissítés indítása
Folyamatfrissítés indításához kattintson a jegyzetfüzet felhasználói felületének jobb felső részén található Start gombra.
példajegyzetfüzetek
A következő jegyzetfüzetek ugyanazokat a kód példákat tartalmazzák, amelyek ebben a cikkben szerepelnek. Ezek a jegyzetfüzetek ugyanazokat a követelményeket támasztják, mint a jelen cikkben ismertetett lépések. Lásd a követelményeket.
Jegyzetfüzet importálásához hajtsa végre a következő lépéseket:
- Nyissa meg a jegyzetfüzet felhasználói felületét.
- Kattintson a + Új>Jegyzetfüzetelemre.
- Megnyílik egy üres jegyzetfüzet.
- Kattintson Fájl>Importálás...elemre. Megjelenik a Importálás párbeszédpanel.
- Válassza a URL lehetőséget a importálás céljából a-ból.
- Adja meg a jegyzetfüzet URL-címét.
- Kattintson Importálásgombra.
Ehhez az oktatóanyaghoz egy adatbeállítási jegyzetfüzetet kell futtatnia a DLT-folyamat konfigurálása és futtatása előtt. Importálja a következő jegyzetfüzetet, csatolja a jegyzetfüzetet egy számítási erőforráshoz, töltse ki a szükséges változót my_catalog
, my_schema
és my_volume
, majd kattintson az Az összesfuttatása parancsra.
Adatletöltés pipelines oktatóanyaghoz
Az alábbi jegyzetfüzetek példákat mutatnak be a Pythonban vagy az SQL-ben. Amikor importál egy jegyzetfüzetet, az a felhasználói kezdőlapra lesz mentve.
Az alábbi jegyzetfüzetek importálása után végezze el a folyamatlánc létrehozásának lépéseit, de a letöltött jegyzetfüzet kiválasztásához használja a Source code fájlválasztót. Miután létrehozott egy csővezetéket egy forráskódként konfigurált jegyzetfüzettel, kattintson a Indítás a csővezeték felhasználói felületén a frissítés aktiválásához.