Sviluppare codice della pipeline con SQL
DLT introduce diverse nuove parole chiave e funzioni SQL per la definizione di viste materializzate e tabelle di streaming nelle pipeline. Il supporto SQL per lo sviluppo di pipeline si basa sulle nozioni di base di Spark SQL e aggiunge il supporto per la funzionalità Structured Streaming.
Gli utenti che hanno familiarità con i dataframe PySpark potrebbero preferire lo sviluppo di codice della pipeline con Python. Python supporta test e operazioni più estese che sono difficili da implementare con SQL, ad esempio le operazioni di metaprogrammazione. Vedere Sviluppare codice della pipeline con Python.
Per un riferimento completo alla sintassi SQL DLT, consultare la documentazione del linguaggio SQL DLT .
Nozioni di base di SQL per lo sviluppo di pipeline
Il codice SQL che crea set di dati DLT usa la sintassi CREATE OR REFRESH
per definire viste materializzate e tabelle di streaming sui risultati delle query.
La parola chiave STREAM
indica se l'origine dati a cui si fa riferimento in una clausola SELECT
deve essere letta con la semantica di streaming.
Legge e scrive il valore predefinito nel catalogo e nello schema specificato durante la configurazione della pipeline. Vedere Impostare il catalogo di destinazione e lo schema.
Il codice sorgente DLT differisce in modo critico dagli script SQL: DLT valuta tutte le definizioni di set di dati in tutti i file di codice sorgente configurati in una pipeline e compila un grafico del flusso di dati prima dell'esecuzione di qualsiasi query. L'ordine delle query visualizzate in un notebook o in uno script definisce l'ordine di valutazione del codice, ma non l'ordine di esecuzione della query.
Creare una vista materializzata con SQL
L'esempio di codice seguente illustra la sintassi di base per la creazione di una vista materializzata con SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Creare una tabella di streaming con SQL
L'esempio di codice seguente illustra la sintassi di base per la creazione di una tabella di streaming con SQL:
Nota
Non tutte le origini dati supportano le letture di streaming e alcune origini dati devono essere sempre elaborate con la semantica di streaming.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Caricare dati dall'archivio oggetti
DLT supporta il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.
Nota
Questi esempi usano i dati disponibili nella /databricks-datasets
montati automaticamente nell'area di lavoro. Databricks consiglia di usare i percorsi del volume o gli URI cloud per fare riferimento ai dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cosa sono i volumi di Unity Catalog?.
Databricks consiglia di usare il caricatore automatico e le tabelle di streaming quando si configurano carichi di lavoro di inserimento incrementali sui dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cos'è il caricatore automatico?.
SQL usa la funzione read_files
per richiamare la funzionalità del caricatore automatico. È inoltre necessario usare la parola chiave STREAM
per configurare una lettura di streaming con read_files
.
L'esempio seguente crea una tabella di streaming da file JSON usando il caricatore automatico:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
La funzione read_files
supporta anche la semantica batch per creare viste materializzate. L'esempio seguente usa la semantica batch per leggere una directory JSON e creare una vista materializzata:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Convalidare i dati con le aspettative
È possibile usare le aspettative per impostare e applicare vincoli di qualità dei dati. Consulta Gestire la qualità dei dati con le aspettative della pipeline.
Il codice seguente definisce un'aspettativa denominata valid_data
che elimina i record null durante l'inserimento dati:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Query su viste materializzate e tabelle di streaming definite nella tua pipeline
L'esempio seguente definisce quattro set di dati:
- Tabella di streaming denominata
orders
che carica i dati JSON. - Vista materializzata denominata
customers
che carica i dati CSV. - Una vista materializzata denominata
customer_orders
che unisce i record dai set di datiorders
ecustomers
, converte il timestamp dell'ordine in una data e seleziona i campicustomer_id
,order_number
,state
eorder_date
. - Vista materializzata denominata
daily_orders_by_state
che aggrega il conteggio giornaliero degli ordini per ogni stato.
Nota
Quando si eseguono query su viste o tabelle nella pipeline, è possibile specificare direttamente il catalogo e lo schema oppure usare le impostazioni predefinite configurate nella pipeline. In questo esempio, le tabelle orders
, customers
e customer_orders
vengono scritte e lette dal catalogo e dallo schema predefiniti configurati per la pipeline.
La modalità di pubblicazione legacy usa lo schema LIVE
per eseguire query su altre viste materializzate e tabelle di streaming definite nella pipeline. Nelle nuove pipeline la sintassi dello schema LIVE
viene ignorata automaticamente. Visualizza lo schema LIVE (legacy).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;