Folyamatkód fejlesztése SQL-lel
A DLT számos új SQL-kulcsszót és függvényt vezet be a folyamat materializált nézeteinek és streamtábláinak meghatározásához. A folyamatok fejlesztésének SQL-támogatása a Spark SQL alapjaira épül, és támogatja a strukturált streamelési funkciókat.
A PySpark DataFrame-eket ismerő felhasználók szívesebben fejlesztenek folyamatkódot a Pythonnal. A Python szélesebb körű tesztelést és műveleteket támogat, amelyek az SQL-vel való implementálásuk során kihívást jelentenek, például metaprogramozási műveletek. Lásd: Folyamatkód fejlesztése Python-használatával.
A DLT SQL szintaxisának teljes referenciáját a DLT SQL nyelvi referenciacímű témakörben talál.
Az SQL alapjai folyamatfejlesztéshez
A DLT-adatkészleteket létrehozó SQL-kód a CREATE OR REFRESH
szintaxissal definiálja a materializált nézeteket és a streamelési táblákat a lekérdezési eredmények alapján.
A STREAM
kulcsszó azt jelzi, hogy a SELECT
záradékban hivatkozott adatforrást streamelési szemantikával kell-e olvasni.
A folyamatkonfiguráció során megadott katalógusba és sémába beolvassa és beírja az alapértelmezett értéket. Lásd: Célkatalógus és sémabeállítása.
A DLT-forráskód kritikusan különbözik az SQL-szkriptektől: A DLT kiértékeli a folyamaton konfigurált összes forráskódfájl összes adathalmazdefinícióját, és létrehoz egy adatfolyam-gráfot a lekérdezések futtatása előtt. A jegyzetfüzetben vagy szkriptben megjelenő lekérdezések sorrendje határozza meg a kód kiértékelésének sorrendjét, a lekérdezések végrehajtásának sorrendjét azonban nem.
Materializált nézet létrehozása AZ SQL használatával
Az alábbi példakód bemutatja a materializált nézet SQL-sel való létrehozásának alapszintaxisát:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Streamelési tábla létrehozása SQL-lel
Az alábbi példakód bemutatja a streamelési tábla SQL-sel való létrehozásának alapszintaxisát:
Jegyzet
Nem minden adatforrás támogatja a streamelési olvasásokat, és egyes adatforrásokat mindig streamelési szemantikával kell feldolgozni.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Adatok betöltése objektumtárolóból
A DLT az Azure Databricks által támogatott összes formátumból támogatja az adatok betöltését. Lásd: Adatformátum beállításai.
Jegyzet
Ezek a példák az automatikusan a munkaterülethez csatlakoztatott /databricks-datasets
-n elérhető adatokat használják. A Databricks kötetútvonalak vagy felhőalapú URI-k használatát javasolja a felhőobjektum-tárolóban tárolt adatokra való hivatkozáshoz. Lásd Mik azok a Unity Catalog-kötetek?.
A Databricks az automatikus betöltő és a streamelő táblák használatát javasolja, amikor növekményes betöltési számítási feladatokat konfigurál a felhőobjektum-tárolóban tárolt adatokhoz. Lásd Mi az automatikus betöltő?.
Az SQL a read_files
függvénnyel hívja meg az Automatikus betöltő funkciót. A streamelési olvasást az STREAM
és read_files
kulcsszavakkal is konfigurálnia kell.
Az alábbi példa létrehoz egy streamelési táblát JSON-fájlokból az Automatikus betöltő használatával:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
A read_files
függvény a kötegelt szemantikát is támogatja a materializált nézetek létrehozásához. Az alábbi példa kötegfeldolgozási szemantikával olvas be egy JSON-könyvtárat, és létrehoz egy materializált nézetet.
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Adatok érvényesítése elvárásokkal
Az elvárásokkal adatminőségi korlátozásokat állíthat be és kényszeríthet ki. Lásd: Az adatminőség kezelése a folyamatokra vonatkozó elvárásokkal.
Az alábbi kód egy valid_data
nevű elvárást határoz meg, amely az adatbetöltés során null értékű rekordokat ad le:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
A folyamat során definiált materializált nézetek és streamelési táblák lekérdezése
Az alábbi példa négy adatkészletet határoz meg:
- Egy JSON-adatokat betöltő
orders
nevű streamelési tábla. - CSV-adatokat betöltő materializált nézet, amelynek neve
customers
. - A
orders
éscustomers
adathalmazok rekordjait összekapcsoló,customer_orders
nevű materializált nézet dátumra veti a rendelési időbélyeget, és kiválasztja acustomer_id
,order_number
,state
ésorder_date
mezőket. - Egy
daily_orders_by_state
nevű materializált nézet, amely összesíti az egyes államok rendeléseinek napi számát.
Jegyzet
A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a orders
, customers
és customer_orders
táblák a folyamathoz konfigurált alapértelmezett katalógusból és sémából vannak megírva és olvasva.
Az örökölt közzétételi mód a LIVE
sémával kérdezi le a folyamatban definiált egyéb materializált nézeteket és streamtáblákat. Az új folyamatokban a LIVE
sémaszintaxisa csendben figyelmen kívül lesz hagyva. Lásd: LIVE séma (örökölt).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;