Esercitazione: Esegui la tua prima pipeline DLT
Questa esercitazione illustra i passaggi per configurare la prima pipeline DLT, scrivere codice ETL di base ed eseguire un aggiornamento della pipeline.
Tutti i passaggi di questo tutorial sono progettati per le aree di lavoro in cui il Catalogo Unity è abilitato. È anche possibile configurare le pipeline DLT per l'uso con il metastore Hive legacy. Consultare usare pipeline DLT con metastore Hive legacy.
Nota
Questa esercitazione contiene istruzioni per lo sviluppo e la convalida di un nuovo codice della pipeline usando i notebook di Databricks. È anche possibile configurare le pipeline usando il codice sorgente in file Python o SQL.
È possibile configurare una pipeline per eseguire il codice se il codice sorgente è già stato scritto usando la sintassi DLT. Consulta Configurare una pipeline DLT.
È possibile usare la sintassi SQL completamente dichiarativa in Databricks SQL per registrare e impostare pianificazioni di aggiornamento per viste materializzate e tabelle di streaming come oggetti gestiti dal catalogo unity. Vedere Usare viste materializzate in Databricks SQL e Caricare dati usando tabelle di streaming in Databricks SQL.
Esempio: Inserire ed elaborare i dati dei nomi dei bambini di New York
L'esempio in questo articolo usa un set di dati disponibile pubblicamente che contiene record di i nomi dei bambini dello Stato di New York. Questo esempio illustra l'uso di una pipeline DLT per:
- Leggere dati CSV non elaborati da un volume in una tabella.
- Leggere i record dalla tabella di inserimento e usare DLT aspettative per creare una nuova tabella contenente dati puliti.
- Usare i record puliti come input per le query DLT che creano set di dati derivati.
Questo codice illustra un esempio semplificato dell'architettura Medallion. Consulta Che cos'è l'architettura Lakehouse medallion?.
Le implementazioni di questo esempio vengono fornite per Python e SQL. Seguire la procedura per creare una nuova pipeline e un nuovo notebook, quindi copiare e incollare il codice fornito.
Vengono forniti anche notebook di esempio con codice completo.
Requisiti
- Per avviare una pipeline, è necessario disporre dell'autorizzazione per la creazione di cluster o accesso a un criterio di configurazione del cluster che definisce un cluster DLT. Il runtime DLT crea un cluster prima di eseguire la pipeline e ha esito negativo se non si dispone dell'autorizzazione corretta.
- Per impostazione predefinita, tutti gli utenti possono attivare gli aggiornamenti usando pipeline serverless. Il serverless deve essere abilitato a livello di account e potrebbe non essere disponibile nella regione dell'area di lavoro. Consultare Enable serverless compute.
Gli esempi in questo tutorial usano Catalogo Unity. Databricks consiglia di creare un nuovo schema per eseguire questa esercitazione, perché nello schema di destinazione vengono creati più oggetti di database.
- Per creare un nuovo schema in un catalogo, è necessario disporre di privilegi di
ALL PRIVILEGES
o diUSE CATALOG
e diCREATE SCHEMA
. - Se non è possibile creare un nuovo schema, eseguire questa esercitazione su uno schema esistente. È necessario disporre dei privilegi seguenti:
-
USE CATALOG
per il catalogo principale. -
ALL PRIVILEGES
oUSE SCHEMA
,CREATE MATERIALIZED VIEW
e privilegiCREATE TABLE
nello schema di destinazione.
-
- Questa esercitazione usa un volume per archiviare i dati di esempio. Databricks consiglia di creare un nuovo volume per questa esercitazione. Se si crea un nuovo schema per questa esercitazione, è possibile creare un nuovo volume in tale schema.
- Per creare un nuovo volume in uno schema esistente, è necessario disporre dei privilegi seguenti:
-
USE CATALOG
per il catalogo padre. -
ALL PRIVILEGES
oUSE SCHEMA
e privilegi diCREATE VOLUME
nello schema di destinazione.
-
- Facoltativamente, è possibile usare un volume esistente. È necessario disporre dei privilegi seguenti:
-
USE CATALOG
per il catalogo padre. -
USE SCHEMA
per lo schema padre. -
ALL PRIVILEGES
oREAD VOLUME
eWRITE VOLUME
nel volume di destinazione.
-
- Per creare un nuovo volume in uno schema esistente, è necessario disporre dei privilegi seguenti:
Per impostare queste autorizzazioni, contattare l'amministratore di Databricks. Per ulteriori informazioni sui privilegi di Unity Catalog, consultare i privilegi di Unity Catalog e gli oggetti securabili.
- Per creare un nuovo schema in un catalogo, è necessario disporre di privilegi di
Passaggio 0: Scaricare i dati
Questo esempio carica i dati da un volume di Unity Catalog. Il codice seguente scarica un file CSV e lo archivia nel volume specificato. Aprire un nuovo notebook ed eseguire il codice seguente per scaricare questi dati nel volume specificato:
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
Sostituire <catalog-name>
, <schema-name>
e <volume-name>
con i nomi di catalogo, schema e volume per un volume di Unity Catalog. Il codice fornito tenta di creare lo schema e il volume specificati se questi oggetti non esistono. È necessario disporre dei privilegi appropriati per creare e scrivere in oggetti in Unity Catalog. Vedere Requisiti.
Nota
Assicurati che il notebook sia stato eseguito correttamente prima di continuare con il tutorial. Non configurare questo notebook come parte della pipeline.
Passaggio 1: Creare una pipeline
DLT crea delle pipeline risolvendo le dipendenze definite nei notebook o nei file (denominati codice sorgente) usando la sintassi DLT. Ogni file di codice sorgente può contenere una sola lingua, ma è possibile aggiungere più notebook o file specifici della lingua nella pipeline.
Importante
Non configurare alcun asset nel campo codice sorgente. Lasciando questo campo nero viene creato e configurato un notebook per la creazione del codice sorgente.
Le istruzioni in questa esercitazione utilizzano il calcolo serverless e il Catalogo Unity. Usare le impostazioni predefinite per tutte le opzioni di configurazione non specificate in queste istruzioni.
Nota
Se serverless non è abilitato o supportato nell'area di lavoro, è possibile completare l'esercitazione come scritto usando le impostazioni di calcolo predefinite. È necessario selezionare manualmente catalogo Unity in opzioni di archiviazione nella sezione Destinazione dell'interfaccia utente di Crea pipeline.
Per configurare una nuova pipeline, eseguire le operazioni seguenti:
- Nella barra laterale fare clic su DLT.
- Fare clic su Crea pipeline.
- In nome pipeline, digitare un nome univoco per la pipeline.
- Selezionare la casella di controllo serverless.
- In Destinazione, per configurare un percorso del catalogo Unity in cui vengono pubblicate le tabelle, selezionare un catalogo e uno schema.
- In Advanced, fai clic su Aggiungi configurazione e poi definisci i parametri della pipeline per il catalogo, lo schema e il volume nel quale hai scaricato i dati usando i seguenti nomi di parametro:
my_catalog
my_schema
my_volume
- Fare clic su Crea.
Viene visualizzata l'interfaccia utente delle pipeline per la nuova pipeline. Un notebook del codice sorgente viene creato e configurato automaticamente per la pipeline.
Il notebook viene creato in una nuova directory nella directory utente. Il nome della nuova directory e quello del file corrispondono al nome della pipeline. Ad esempio, /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Un collegamento per accedere a questo notebook si trova nel campo codice sorgente nel pannello dettagli del pipeline. Fare clic sul collegamento per aprire il notebook prima di procedere al passaggio successivo.
Passaggio 2: Dichiarare viste materializzate e tabelle di streaming in un notebook con Python o SQL
È possibile usare i notebook datbricks per sviluppare e convalidare in modo interattivo il codice sorgente per le pipeline DLT. Per usare questa funzionalità, è necessario collegare il notebook alla pipeline. Per collegare il notebook appena creato alla pipeline appena creata:
- Fare clic su Connect in alto a destra per aprire il menu di configurazione del calcolo.
- Passare il puntatore del mouse sul nome della pipeline creata nel passaggio 1.
- Fare clic su Connetti.
Le modifiche apportate all'interfaccia utente includono i pulsanti Convalida e Avvia in alto a destra. Per altre informazioni sul supporto dei notebook per lo sviluppo di codice della pipeline, vedere Sviluppare ed eseguire il debug di pipeline DLT nei notebook.
Importante
- Le pipeline DLT valutano tutte le celle di un notebook durante la progettazione. A differenza dei notebook eseguiti su calcolo generico o pianificati come processi, le pipeline non garantiscono che le celle vengano eseguite nell'ordine stabilito.
- I notebook possono contenere solo un singolo linguaggio di programmazione. Non combinare codice Python e SQL nei notebook del codice sorgente della pipeline.
Per informazioni dettagliate sullo sviluppo di codice con Python o SQL, vedere Sviluppare codice della pipeline con Python o Sviluppare codice della pipeline con SQL.
Codice della pipeline di esempio
Per implementare l'esempio in questa esercitazione, copiare e incollare il codice seguente in una cella del notebook configurato come codice sorgente per la pipeline.
Il codice fornito esegue le operazioni seguenti:
- Importa i moduli necessari (solo Python).
- Parametri di riferimento definiti durante la configurazione della pipeline.
- Definisce una tabella di streaming denominata
baby_names_raw
che acquisisce dati da un volume. - Definisce una vista materializzata denominata
baby_names_prepared
che convalida i dati inseriti. - Definisce una vista materializzata denominata
top_baby_names_2021
con una vista altamente raffinata dei dati.
Pitone
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Passaggio 3: Avviare un aggiornamento della pipeline
Per avviare un aggiornamento della pipeline, fare clic sul pulsante Start in alto a destra dell'interfaccia utente del notebook.
Notebook di esempio
I notebook seguenti contengono gli stessi esempi di codice forniti in questo articolo. Questi notebook hanno gli stessi requisiti dei passaggi descritti in questo articolo. Vedere Requisiti.
Per importare un notebook, completare la procedura seguente:
- Aprire l'interfaccia utente del notebook.
- Fare clic su + Nuovo notebook>.
- Verrà aperto un notebook vuoto.
- Fare clic su File>Importa.... Viene visualizzata la finestra di dialogo Import.
- Selezionare l'opzione URL per Importazione da .
- Incollare l'URL del notebook.
- Fare clic su Importa.
Questa esercitazione richiede l'esecuzione di un notebook di impostazione dei dati prima di configurare ed eseguire la pipeline DLT. Importare il notebook seguente, collegare il notebook a una risorsa di calcolo, riempire la variabile richiesta per my_catalog
, my_schema
e my_volume
e fare clic su Esegui tutto.
Esercitazione sul download dei dati per le pipeline
Ottieni notebook
I notebook seguenti forniscono esempi in Python o SQL. Quando si importa un notebook, questo viene salvato nella home directory dell'utente.
Dopo aver importato uno dei notebook seguenti, completare i passaggi per creare una pipeline, ma usare il codice sorgente selettore di file per selezionare il notebook scaricato. Dopo aver creato la pipeline con un notebook configurato come codice sorgente, fare clic su Avvia nell'interfaccia utente della pipeline per attivare un aggiornamento.
Introduzione al notebook Python DLT
Inizia con il notebook SQL DLT
Prendi il notebook