Condividi tramite


Informazioni di riferimento sul linguaggio SQL DLT

Questo articolo contiene informazioni dettagliate sull'interfaccia di programmazione SQL DLT.

È possibile usare funzioni definite dall'utente Python nelle query SQL, ma è necessario definire queste funzioni definite dall'utente nei file Python prima di chiamarle nei file di origine SQL. Vedere Funzioni scalari definite dall'utente - Python.

Limitazioni

La clausola PIVOT non è supportata. L'operazione pivot in Spark richiede il caricamento anticipato dei dati di input per calcolare lo schema di output. Questa funzionalità non è supportata in DLT.

Creare una vista materializzata DLT o una tabella di streaming

Nota

La sintassi CREATE OR REFRESH LIVE TABLE per creare una vista materializzata è deprecata. Usare invece CREATE OR REFRESH MATERIALIZED VIEW.

Per dichiarare una tabella di streaming o una vista materializzata, usare la stessa sintassi SQL di base.

Dichiarare una vista materializzata di DLT con SQL

Di seguito viene descritta la sintassi per dichiarare una vista materializzata in DLT con SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  CLUSTER BY clause
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Dichiarare una tabella di streaming DLT con SQL

È possibile dichiarare tabelle di streaming solo usando query che leggono da un'origine di streaming. Databricks consiglia di usare il caricatore automatico per l'inserimento in streaming di file dall'archiviazione di oggetti cloud. Consulta la sintassi SQL del caricatore automatico .

Quando si specificano altre tabelle o viste nella pipeline come origini di streaming, è necessario includere la funzione STREAM() intorno a un nome di set di dati.

Di seguito viene descritta la sintassi per dichiarare una tabella di streaming in DLT con SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [CLUSTER BY clause]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Creare una vista DLT

Di seguito viene descritta la sintassi per dichiarare le viste con SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

sintassi SQL del caricatore automatico

Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value>",
      "<option-key>", "<option_value>",
      ...
    )
  )

È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la funzione map(), è possibile passare le opzioni al metodo read_files(). Le opzioni sono coppie chiave-valore, in cui le chiavi e i valori sono stringhe. Per informazioni dettagliate sui formati e le opzioni di supporto, vedere opzioni di formato di file .

Esempio: Definire tabelle

È possibile creare un set di dati leggendo da un'origine dati esterna o da set di dati definiti in una pipeline. Per leggere da un set di dati interno, specificare il nome della tabella che userà le impostazioni predefinite della pipeline configurate per il catalogo e lo schema. L'esempio seguente definisce due set di dati diversi: una tabella denominata taxi_raw che accetta un file JSON come origine di input e una tabella denominata filtered_data che accetta la tabella taxi_raw come input:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

Esempio: Leggere da un'origine di streaming

Per leggere i dati da un'origine di streaming, ad esempio Auto Loader o un set di dati interno, definire una tabella STREAMING:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.

Eliminare definitivamente i record da una vista materializzata o da una tabella di streaming

Per eliminare definitivamente i record da una vista materializzata o da una tabella di streaming con vettori di eliminazione abilitati, ad esempio per la conformità al GDPR, è necessario eseguire operazioni aggiuntive sulle tabelle Delta sottostanti dell'oggetto. Per garantire l'eliminazione dei record da una vista materializzata, vedere Eliminare definitivamente i record da una vista materializzata con vettori di eliminazione abilitati. Per garantire l'eliminazione dei record da una tabella di streaming, vedere Eliminare definitivamente i record da una tabella di streaming.

Controllare la modalità di materializzazione delle tabelle

Le tabelle offrono anche un controllo aggiuntivo della loro materializzazione.

  • Specifica come raggruppare le tabelle usando CLUSTER BY. È possibile usare il clustering liquido per velocizzare le query. Vedi Usa il clustering liquido per le tabelle Delta.
  • Specificare il modo in cui le tabelle vengono partizionate usando PARTITIONED BY.
  • È possibile impostare le proprietà della tabella usando TBLPROPERTIES. Consulta le proprietà della tabella DLT .
  • Impostare una destinazione di archiviazione usando l'impostazione LOCATION. Per impostazione predefinita, i dati della tabella vengono archiviati nel percorso di archiviazione della pipeline se LOCATION non è impostato.
  • È possibile usare le colonne generate nella definizione dello schema. Consulta Esempio: Specificare uno schema e le colonne del cluster.

Nota

Per le tabelle di dimensioni inferiori a 1 TB, Databricks consiglia di lasciare che DLT controlli l'organizzazione dei dati. A meno che la tabella non cresca oltre un terabyte, Databricks consiglia di non specificare le colonne di partizione.

: Esempio: specificare uno schema e colonne di cluster

Facoltativamente, è possibile specificare uno schema quando si definisce una tabella. L'esempio seguente specifica lo schema per la tabella di destinazione, incluse le colonne generate di Delta Lake e definisce le colonne di clustering per la tabella:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Per impostazione predefinita, DLT deduce lo schema dalla definizione di table se non si specifica uno schema.

Esempio di : Specificare le colonne di partizione

Facoltativamente, è possibile specificare colonne di partizione per la tabella:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Il clustering liquido offre una soluzione flessibile e ottimizzata per il clustering. È consigliabile usare CLUSTER BY anziché PARTITIONED BY per DLT.

Esempio di : Definire vincoli di tabella

Nota

Il supporto DLT per i vincoli di tabella si trova in anteprima pubblica. Per definire i vincoli di tabella, la pipeline deve essere una pipeline abilitata per Unity Catalog e configurata per l'uso del canale preview.

Quando si specifica uno schema, è possibile definire chiavi primarie ed esterne. I vincoli sono informativi e non vengono applicati. Consulta la clausola CONSTRAINT nella guida di riferimento del linguaggio SQL.

Nell'esempio seguente viene definita una tabella con un vincolo di chiave primaria ed esterna:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametrizzare i valori usati durante la dichiarazione di tabelle o viste con SQL

Usare SET per specificare un valore di configurazione in una query che dichiara una tabella o una vista, incluse le configurazioni di Spark. Qualsiasi tabella o vista definita in un notebook dopo che l'istruzione SET ha accesso al valore definito. Tutte le configurazioni spark specificate usando l'istruzione SET vengono usate durante l'esecuzione della query Spark per qualsiasi tabella o vista dopo l'istruzione SET. Per leggere un valore di configurazione in una query, usare la sintassi dell'interpolazione di stringhe ${}. L'esempio seguente imposta un valore di configurazione Spark denominato startDate e usa tale valore in una query:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Per specificare più valori di configurazione, usare un'istruzione SET separata per ogni valore.

esempio : Definire un filtro di riga e una maschera di colonna

Importante

I filtri di riga e le maschere di colonna sono in anteprima pubblica .

Per creare una vista materializzata o una tabella di streaming con un filtro di riga e una maschera di colonna, utilizzare la clausola ROW FILTER e la clausola MASK. Nell'esempio seguente viene illustrato come definire una vista materializzata e una tabella di streaming con un filtro di riga e una maschera di colonna:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

Per altre informazioni sui filtri di riga e sulle maschere di colonna, vedere Pubblicare tabelle con filtri di riga e maschere di colonna.

Proprietà SQL

CREATE TABLE o VISUALIZZA
TEMPORARY
Creare una tabella ma non pubblicare i metadati per la tabella. La clausola TEMPORARY indica a DLT di creare una tabella disponibile per la pipeline, ma non deve essere accessibile all'esterno della pipeline. Per ridurre il tempo di elaborazione, una tabella temporanea viene mantenuta per la durata della pipeline che lo crea e non solo per un singolo aggiornamento.
STREAMING
Creare una tabella che legge un set di dati di input come flusso. Il set di dati di input deve essere una sorgente dati in streaming, ad esempio Auto Loader o una tabella STREAMING.
CLUSTER BY
Abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering.
Vedi Utilizzare il clustering liquido per le tabelle Delta.
PARTITIONED BY
Elenco facoltativo di una o più colonne da utilizzare per il partizionamento della tabella.
LOCATION
Posizione di archiviazione facoltativa per i dati della tabella. Se non impostato, il sistema userà per impostazione predefinita il percorso di archiviazione della pipeline.
COMMENT
Descrizione facoltativa per la tabella.
column_constraint
Una chiave primaria informativa facoltativa o un vincolo di chiave esterna nella colonna.
MASK clause (anteprima pubblica)
Aggiunge una funzione maschera di colonna per rendere anonimi i dati sensibili. Le query future per tale colonna restituiscono il risultato della funzione valutata anziché il valore originale della colonna. Ciò è utile per il controllo di accesso con granularità fine, perché la funzione può controllare l'identità dell'utente e le appartenenze ai gruppi per decidere se redigire il valore.
Vedere clausola della colonnamask.
table_constraint
Una chiave primaria informativa facoltativa o un vincolo di chiave esterna nella tabella.
TBLPROPERTIES
Elenco facoltativo di proprietà della tabella.
WITH ROW FILTER clause (anteprima pubblica)
Aggiunge una funzione di filtro di riga alla tabella. Le future query per tale tabella ricevono un sottoinsieme delle righe per cui la funzione restituisce TRUE. Ciò è utile per il controllo di accesso con granularità fine, perché consente alla funzione di controllare l'identità e le appartenenze ai gruppi dell'utente che richiama per decidere se filtrare determinate righe.
Vedere ROW FILTER clausola.
select_statement
Query DLT che definisce il set di dati per la tabella.
clausola CONSTRAINT
EXPECT expectation_name
Definire il vincolo di qualità dei dati expectation_name. Se il vincolo ON VIOLATION non è definito, aggiungere righe che violano il vincolo al set di dati di destinazione.
ON VIOLATION
Azione facoltativa da eseguire per le righe non elaborate correttamente:
  • FAIL UPDATE: arrestare immediatamente l'esecuzione della pipeline.
  • DROP ROW: eliminare il record e continuare l'elaborazione.

La cattura di dati modificati con SQL in DLT

Usare l'istruzione APPLY CHANGES INTO per usare la funzionalità DLT CDC, come descritto di seguito:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

È possibile definire vincoli di qualità dei dati per una destinazione APPLY CHANGES usando la stessa clausola CONSTRAINT delle query nonAPPLY CHANGES. Fare riferimento a Gestire la qualità dei dati con le aspettative della pipeline.

Nota

Il comportamento predefinito per gli eventi INSERT e UPDATE consiste nello upsert degli eventi CDC dall'origine: aggiornare qualunque riga nella tabella di destinazione che corrisponda alle chiavi specificate o inserire una nuova riga quando non esiste un record corrispondente nella tabella di destinazione. La gestione degli eventi di DELETE può essere specificata con la condizione APPLY AS DELETE WHEN.

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione APPLY CHANGES, è necessario includere anche le colonne __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.

Vedere le API APPLY CHANGES semplificano la cattura dei dati modificati con DLT.

Clausole
KEYS
Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione.
Per definire una combinazione di colonne, usare un elenco delimitato da virgole di colonne.
Questa clausola è obbligatoria.
IGNORE NULL UPDATES
Consentire l'inserimento di aggiornamenti contenenti un sottoinsieme delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e viene specificato IGNORE NULL UPDATES, le colonne con un null manterranno i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con un valore di null.
Questa clausola è facoltativa.
L'impostazione predefinita consiste nel sovrascrivere le colonne esistenti con valori null.
APPLY AS DELETE WHEN
Specifica quando un evento CDC deve essere considerato come DELETE anziché come upsert. Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come tombstone nella tabella Delta sottostante e viene creata una vista nel metastore che filtra questi tombstone. L'intervallo di conservazione può essere configurato con
pipelines.cdc.tombstoneGCThresholdInSeconds proprietà della tabella.
Questa clausola è facoltativa.
APPLY AS TRUNCATE WHEN
Specifica quando un evento CDC deve essere considerato come una tabella completa TRUNCATE. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.
La clausola APPLY AS TRUNCATE WHEN è supportata solo per scD di tipo 1. Il tipo SCD 2 non supporta l'operazione di troncamento.
Questa clausola è facoltativa.
SEQUENCE BY
Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. DLT usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.
La colonna specificata deve essere un tipo di dati ordinabile.
Questa clausola è obbligatoria.
COLUMNS
Specifica un subset di colonne da includere nella tabella di destinazione. È possibile:
  • Specificare l'elenco completo di colonne da includere: COLUMNS (userId, name, city).
  • Specificare un elenco di colonne da escludere: COLUMNS * EXCEPT (operation, sequenceNum)

Questa clausola è facoltativa.
L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando la clausola COLUMNS non è specificata.
STORED AS
Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2.
Questa clausola è facoltativa.
Il valore predefinito è SCD di tipo 1.
TRACK HISTORY ON
Specifica un subset di colonne di output per generare record di cronologia quando sono presenti modifiche a tali colonne specificate. È possibile:
  • Specificare l'elenco completo delle colonne da tenere traccia: COLUMNS (userId, name, city).
  • Specificare un elenco di colonne da escludere dal rilevamento: COLUMNS * EXCEPT (operation, sequenceNum)

Questa clausola è facoltativa. L'impostazione predefinita consiste nel tenere traccia della cronologia per tutte le colonne di output quando sono presenti modifiche, equivalenti a TRACK HISTORY ON *.