Condividi tramite


Sviluppare codice della pipeline con SQL

DLT introduce diverse nuove parole chiave e funzioni SQL per la definizione di viste materializzate e tabelle di streaming nelle pipeline. Il supporto SQL per lo sviluppo di pipeline si basa sulle nozioni di base di Spark SQL e aggiunge il supporto per la funzionalità Structured Streaming.

Gli utenti che hanno familiarità con i dataframe PySpark potrebbero preferire lo sviluppo di codice della pipeline con Python. Python supporta test e operazioni più estese che sono difficili da implementare con SQL, ad esempio le operazioni di metaprogrammazione. Vedere Sviluppare codice della pipeline con Python.

Per un riferimento completo alla sintassi SQL DLT, consultare la documentazione del linguaggio SQL DLT .

Nozioni di base di SQL per lo sviluppo di pipeline

Il codice SQL che crea set di dati DLT usa la sintassi CREATE OR REFRESH per definire viste materializzate e tabelle di streaming sui risultati delle query.

La parola chiave STREAM indica se l'origine dati a cui si fa riferimento in una clausola SELECT deve essere letta con la semantica di streaming.

Legge e scrive il valore predefinito nel catalogo e nello schema specificato durante la configurazione della pipeline. Vedere Impostare il catalogo di destinazione e lo schema.

Il codice sorgente DLT differisce in modo critico dagli script SQL: DLT valuta tutte le definizioni di set di dati in tutti i file di codice sorgente configurati in una pipeline e compila un grafico del flusso di dati prima dell'esecuzione di qualsiasi query. L'ordine delle query visualizzate in un notebook o in uno script definisce l'ordine di valutazione del codice, ma non l'ordine di esecuzione della query.

Creare una vista materializzata con SQL

L'esempio di codice seguente illustra la sintassi di base per la creazione di una vista materializzata con SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Creare una tabella di streaming con SQL

L'esempio di codice seguente illustra la sintassi di base per la creazione di una tabella di streaming con SQL:

Nota

Non tutte le origini dati supportano le letture di streaming e alcune origini dati devono essere sempre elaborate con la semantica di streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Caricare dati dall'archivio oggetti

DLT supporta il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.

Nota

Questi esempi usano i dati disponibili nella /databricks-datasets montati automaticamente nell'area di lavoro. Databricks consiglia di usare i percorsi del volume o gli URI cloud per fare riferimento ai dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cosa sono i volumi di Unity Catalog?.

Databricks consiglia di usare il caricatore automatico e le tabelle di streaming quando si configurano carichi di lavoro di inserimento incrementali sui dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cos'è il caricatore automatico?.

SQL usa la funzione read_files per richiamare la funzionalità del caricatore automatico. È inoltre necessario usare la parola chiave STREAM per configurare una lettura di streaming con read_files.

L'esempio seguente crea una tabella di streaming da file JSON usando il caricatore automatico:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

La funzione read_files supporta anche la semantica batch per creare viste materializzate. L'esempio seguente usa la semantica batch per leggere una directory JSON e creare una vista materializzata:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Convalidare i dati con le aspettative

È possibile usare le aspettative per impostare e applicare vincoli di qualità dei dati. Consulta Gestire la qualità dei dati con le aspettative della pipeline.

Il codice seguente definisce un'aspettativa denominata valid_data che elimina i record null durante l'inserimento dati:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Query su viste materializzate e tabelle di streaming definite nella tua pipeline

L'esempio seguente definisce quattro set di dati:

  • Tabella di streaming denominata orders che carica i dati JSON.
  • Vista materializzata denominata customers che carica i dati CSV.
  • Una vista materializzata denominata customer_orders che unisce i record dai set di dati orders e customers, converte il timestamp dell'ordine in una data e seleziona i campi customer_id, order_number, statee order_date.
  • Vista materializzata denominata daily_orders_by_state che aggrega il conteggio giornaliero degli ordini per ogni stato.

Nota

Quando si eseguono query su viste o tabelle nella pipeline, è possibile specificare direttamente il catalogo e lo schema oppure usare le impostazioni predefinite configurate nella pipeline. In questo esempio, le tabelle orders, customerse customer_orders vengono scritte e lette dal catalogo e dallo schema predefiniti configurati per la pipeline.

La modalità di pubblicazione legacy usa lo schema LIVE per eseguire query su altre viste materializzate e tabelle di streaming definite nella pipeline. Nelle nuove pipeline la sintassi dello schema LIVE viene ignorata automaticamente. Visualizza lo schema LIVE (legacy).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;