Megosztás a következőn keresztül:


DLT SQL nyelvi referencia

Ez a cikk a DLT SQL programozási felületének részleteit tartalmazza.

A Python felhasználó által definiált függvényeket (UDF-eket) használhatja az SQL-lekérdezésekben, de ezeket az UDF-eket Python-fájlokban kell definiálnia, mielőtt SQL-forrásfájlokban hívhatja meg őket. Lásd: felhasználó által definiált skaláris függvények – Python.

Korlátozások

A PIVOT záradék nem támogatott. A Spark pivot művelete megköveteli a bemeneti adatok lelkes betöltését a kimeneti séma kiszámításához. A DLT nem támogatja ezt a képességet.

DLT materializált nézet vagy streamelési tábla létrehozása

Jegyzet

A CREATE OR REFRESH LIVE TABLE szintaxis a materializált nézet létrehozásához elavult. Ehelyett használja a CREATE OR REFRESH MATERIALIZED VIEW.

Ugyanazt az alapszintű SQL-szintaxist használja streamelési tábla vagy materializált nézet deklarálásakor.

DLT materializált nézet deklarálása SQL-lel

Az alábbiakban a materializált nézetnek az SQL-sel rendelkező DLT-ben való deklarálásának szintaxisát ismerteti:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  CLUSTER BY clause
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

DLT-streamelési tábla deklarálása SQL-lel

Adatfolyam táblákat csak olyan lekérdezésekkel hozhat létre, amelyek egy adatfolyamforrásból olvasnak. A Databricks azt javasolja, hogy az Automatikus betöltő használatával streamelje a felhőobjektum-tárolóból származó fájlok betöltését. Lásd: Autobetöltő SQL-szintaxis.

Ha a csővezetékben más táblákat vagy nézeteket ad meg streamforrásként, az adathalmaz neve köré kell alkalmaznia a STREAM() függvényt.

Az alábbiakban egy streamelési tábla DLT-ben sql-lel való deklarálásának szintaxisát ismerteti:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [CLUSTER BY clause]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

DLT-nézet létrehozása

Az alábbiakban a nézetek SQL-sel való deklarálásának szintaxisát ismerteti:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Automatikus Betöltő SQL-szintaxisa

Az alábbiak az automatikus betöltő SQL-ben való használatának szintaxisát ismertetik:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value>",
      "<option-key>", "<option_value>",
      ...
    )
  )

Az Automatikus betöltővel támogatott formátumbeállításokat használhat. A map() függvénnyel a beállításokat átadhatja a read_files() metódusnak. A beállítások kulcs-érték párok, ahol a kulcsok és az értékek sztringek. A támogatási formátumokkal és beállításokkal kapcsolatos részletekért lásd Fájlformátum beállításai.

Példa: Táblák definiálása

Adathalmazt úgy hozhat létre, hogy egy külső adatforrásból vagy egy folyamatban definiált adathalmazból olvas be. Belső adatkészletből való olvasáshoz adja meg azt a táblanevet, amely a konfigurált folyamat alapértelmezett beállításait fogja használni a katalógushoz és a sémához. A következő példa két különböző adatkészletet határoz meg: egy taxi_raw nevű táblát, amely egy JSON-fájlt fogad bemeneti forrásként, és egy filtered_data nevű táblát, amely bemenetként veszi fel a taxi_raw táblát:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

Példa: Olvasás streamelési forrásból

Ha streamelési forrásból, például automatikus betöltőből vagy belső adatkészletből szeretne adatokat olvasni, definiáljon egy STREAMING táblát:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

További információért az adatfolyamokról, tekintse meg a Folyamatokkal történő adatátalakítást.

Rekordok végleges törlése materializált nézetből vagy streamelési táblából

Ha véglegesen törölni szeretné a rekordokat egy materializált nézetből vagy streamelőtáblából, és engedélyezve van a törlési vektorok használata, például a GDPR-megfelelőség érdekében, további műveleteket kell végrehajtani az objektum alapjául szolgáló Delta-táblákon. A rekordok materializált nézetből való törlésének biztosításához lásd: Rekordok végleges törlése materializált nézetben, engedélyezett törlési vektorokkal. A rekordok streamelési táblából való törlésének biztosításához lásd: Rekordok végleges törlése streamelési táblából.

A táblák materializálásának irányítása

A táblák a materializációjuk további szabályozását is lehetővé teszi:

Jegyzet

Az 1 TB-nál kisebb méretű táblák esetében a Databricks azt javasolja, hogy a DLT vezérelje az adatszervezést. Hacsak nem számít arra, hogy a tábla terabájton túl nő, a Databricks azt javasolja, hogy ne adjon meg partícióoszlopokat.

Példa: Séma és fürtoszlopok megadása

Tábla definiálásakor igény szerint megadhatja a sémát. Az alábbi példa a céltábla sémáját határozza meg, beleértve a Delta Lake generált oszlopokat, és definiálja a tábla fürtözési oszlopait.

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Alapértelmezés szerint a DLT a sémát a table definíciójából következteti, ha nem ad meg sémát.

Példa: Partícióoszlopok megadása

A tábla partícióoszlopait igény szerint megadhatja:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

A folyékony klaszterezés rugalmas, optimalizált megoldást biztosít a csoportosításhoz. Fontolja meg a CLUSTER BY használatát a DLT-PARTITIONED BY helyett.

Példa: Táblakorlátozások definiálása

Jegyzet

A táblakorlátozásokhoz tartozó DLT-támogatás nyilvános előzetes verzióban. A táblakorlátozások meghatározásához a folyamatnak unitykatalógus-kompatibilis folyamatnak kell lennie, és konfigurálva kell lennie a preview csatorna használatára.

Séma megadásakor megadhatja az elsődleges és az idegen kulcsokat. A korlátozások tájékoztató jellegűek, és nincsenek kényszerítve. Tekintse meg a CONSTRAINT záradékot az SQL nyelvi hivatkozásában.

Az alábbi példa egy elsődleges és idegenkulcs-korlátozással rendelkező táblát határoz meg:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Táblák vagy nézetek SQL-sel történő deklarálásakor használt értékek paraméterezése

A SET használatával megadhat egy konfigurációs értéket egy táblát vagy nézetet deklaráló lekérdezésben, beleértve a Spark-konfigurációkat is. A jegyzetfüzetben definiált bármely tábla vagy nézet, miután a SET utasítás hozzáfér a megadott értékhez. A SET utasítással megadott Spark-konfigurációk a Spark-lekérdezés végrehajtásakor használhatók a SET utasítást követő bármely táblához vagy nézethez. A konfigurációs érték lekérdezésben való olvasásához használja a sztring interpolációs szintaxisát ${}. Az alábbi példa egy startDate nevű Spark-konfigurációs értéket állít be, és ezt az értéket használja egy lekérdezésben:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Több konfigurációs érték megadásához minden értékhez használjon külön SET utasítást.

Példa: Sorszűrő és oszlopmaszk definiálása

Fontos

A sorszűrők és az oszlopmaszkok nyilvános előzetes verziójú.

Ha materializált nézetet vagy streamelési táblát szeretne létrehozni sorszűrővel és oszlopmaszkkal, használja a ROW FILTER záradékot és a MASZK záradékot. Az alábbi példa bemutatja, hogyan definiálhat materializált nézetet és streamelési táblát sorszűrővel és oszlopmaszkkal is:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

További információ a sorszűrőkről és az oszlopmaszkokról: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.

SQL-tulajdonságok

CREATE TABLE vagy nézet
TEMPORARY
Hozzon létre egy táblát, de ne tegye közzé a táblához tartozó metaadatokat. A TEMPORARY záradék arra utasítja a DLT-t, hogy hozzon létre egy táblát, amely elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A feldolgozási idő csökkentése érdekében egy ideiglenes tábla megmarad az azt létrehozó folyamat teljes élettartama alatt, és nem csak egyetlen frissítéssel.
STREAMING
Hozzon létre egy táblázatot, amely streamként olvas be egy bemeneti adathalmazt. A bemeneti adatkészletnek streamelési adatforrásnak kell lennie, például automatikus betöltőnek vagy STREAMING táblának.
CLUSTER BY
Engedélyezze a "liquid clustering" funkciót a táblában, és határozza meg a fürtözési kulcsként használni kívánt oszlopokat.
Lásd A Delta-táblákfolyékony fürtözésének használata című témakört.
PARTITIONED BY
A tábla particionálásához használandó egy vagy több oszlop választható listája.
LOCATION
A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárolási helyére kerül.
COMMENT
A táblázat opcionális leírása.
column_constraint
Az oszlopon opcionális elsődleges kulcs vagy idegen kulcs megszorítás lehet.
MASK clause (nyilvános előzetes verzió)
Olyan oszlopmaszk funkciót ad hozzá, amely lehetővé teszi a bizalmas adatok anonimizálását. Az oszlop jövőbeli lekérdezései a kiértékelt függvény eredményét adják vissza az oszlop eredeti értéke helyett. Ez hasznos lehet a részletes hozzáférés-vezérléshez, mert a függvény ellenőrizheti a felhasználó identitását és csoporttagságait annak eldöntésére, hogy kitakarja-e az értéket.
Lásd a oszlop mask záradék.
table_constraint
A táblához tartozó opcionális elsődleges kulcs vagy idegen kulcs információs korlátozás.
TBLPROPERTIES
A tábla táblatulajdonságainak választható listája.
WITH ROW FILTER clause (nyilvános előzetes verzió)
Sorszűrő függvényt ad hozzá a táblához. A tábla jövőbeli lekérdezései azoknak a soroknak a részhalmazát kapják meg, amelyekre a függvény IGAZ értéket ad. Ez a részletes hozzáférés-vezérléshez hasznos, mert lehetővé teszi, hogy a függvény megvizsgálja a behívó felhasználó identitását és csoporttagságát, hogy eldöntse, szűr-e bizonyos sorokat.
Lásd ROW FILTER záradék.
select_statement
A tábla adatkészletét meghatározó DLT-lekérdezés.
CONSTRAINT záradék
EXPECT expectation_name
Adatminőségi korlátozás definiálása expectation_name. Ha a ON VIOLATION kényszer nincs definiálva, adjon hozzá olyan sorokat, amelyek megsértik a korlátozást a céladatkészletben.
ON VIOLATION
A sikertelen sorok esetében opcionális művelet végrehajtása.
  • FAIL UPDATE: A folyamat végrehajtásának azonnali leállítása.
  • DROP ROW: Dobd el a rekordot és folytasd a feldolgozást.

Adatrögzítés módosítása az SQL használatával a DLT-ben

Használja a APPLY CHANGES INTO utasítást a DLT CDC funkcióinak használatához, az alábbiak szerint:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Egy APPLY CHANGES-cél adatminőségi korlátozásait ugyanazzal a CONSTRAINT záradékkal határozhatja meg, mint a nemAPPLY CHANGES lekérdezéseket. Lásd: Az adatminőség kezelése az adatcsatornákkal kapcsolatos elvárásokkal.

Jegyzet

Az INSERT és UPDATE események alapértelmezett viselkedése az, hogy a forrás CDC eseményeit frissíti vagy beszúrja: frissíti a céltábla minden olyan sorát, amely megfelel a megadott kulcs(ok)nak, vagy új sort szúr be, ha nem található egyező rekord a céltáblában. A DELETE események kezelése a APPLY AS DELETE WHEN feltétellel adható meg.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A APPLY CHANGES céltábla sémájának megadásakor a __START_AT és __END_AT oszlopokat is meg kell adnia, és az adattípusa megegyezik a sequence_by mezővel.

Lásd: A MÓDOSÍTÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése A DLThasználatával.

Záradékok
KEYS
Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.
Az oszlopok kombinációjának meghatározásához használjon vesszővel tagolt oszloplistát.
Ez a záradék kötelező.
IGNORE NULL UPDATES
A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény megfelel egy meglévő sornak, és be van állítva a NULL FRISSÍTÉSEK FIGYELMEN KÍVÜL HAGYÁSA, akkor az ilyen null-ával rendelkező oszlopok megtartják meglévő értékeiket a célban. Ez a nullértékkel rendelkező beágyazott oszlopokra is vonatkozik.
Ez a záradék nem kötelező.
Az alapértelmezett érték a meglévő oszlopok felülírása null értékekkel.
APPLY AS DELETE WHEN
Azt határozza meg, hogy a CDC-eseményeket mikor kell DELETE ként kezelni, nem pedig upsertként. A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz konfigurálható a következővel:
pipelines.cdc.tombstoneGCThresholdInSeconds táblatulajdonság.
Ez a záradék nem kötelező.
APPLY AS TRUNCATE WHEN
Megadja, hogy a CDC-eseményeket mikor kell teljes táblaként kezelni TRUNCATE. Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.
A APPLY AS TRUNCATE WHEN záradék csak az 1. SCD-típus esetében támogatott. A 2. SCD-típus nem támogatja a csonkolási műveletet.
Ez a záradék nem kötelező.
SEQUENCE BY
Az oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A DLT a szekvenálást arra használja, hogy kezelje a nem sorrendben érkező módosítási eseményeket.
A megadott oszlopnak rendezhető adattípusnak kell lennie.
Ez a záradék kötelező.
COLUMNS
A céltáblában szerepeltetni kívánt oszlopok egy részhalmazát adja meg. A következőkre van lehetőség:
  • Adja meg a belefoglalandó oszlopok teljes listáját: COLUMNS (userId, name, city).
  • Adja meg a kizárandó oszlopok listáját: COLUMNS * EXCEPT (operation, sequenceNum)

Ez a záradék nem kötelező.
Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha a COLUMNS záradék nincs megadva.
STORED AS
A rekordok tárolása 1. vagy 2. SCD-típusként.
Ez a záradék nem kötelező.
Az alapértelmezett scd típus 1.
TRACK HISTORY ON
A kimeneti oszlopok egy részhalmazát adja meg, amely előzményrekordokat hoz létre a megadott oszlopok módosításakor. A következőkre van lehetőség:
  • Adja meg a nyomon követni kívánt oszlopok teljes listáját: COLUMNS (userId, name, city).
  • Adja meg a nyomon követésből kizárandó oszlopok listáját: COLUMNS * EXCEPT (operation, sequenceNum)

Ez a záradék nem kötelező. Az alapértelmezett beállítás az összes kimeneti oszlop előzményeinek nyomon követése, ha bármilyen változás történt, ami a TRACK HISTORY ON *felel meg.