Megosztás a következőn keresztül:


Folyamatkód fejlesztése SQL-lel

A DLT számos új SQL-kulcsszót és függvényt vezet be a folyamat materializált nézeteinek és streamtábláinak meghatározásához. A folyamatok fejlesztésének SQL-támogatása a Spark SQL alapjaira épül, és támogatja a strukturált streamelési funkciókat.

A PySpark DataFrame-eket ismerő felhasználók szívesebben fejlesztenek folyamatkódot a Pythonnal. A Python szélesebb körű tesztelést és műveleteket támogat, amelyek az SQL-vel való implementálásuk során kihívást jelentenek, például metaprogramozási műveletek. Lásd: Folyamatkód fejlesztése Python-használatával.

A DLT SQL szintaxisának teljes referenciáját a DLT SQL nyelvi referenciacímű témakörben talál.

Az SQL alapjai folyamatfejlesztéshez

A DLT-adatkészleteket létrehozó SQL-kód a CREATE OR REFRESH szintaxissal definiálja a materializált nézeteket és a streamelési táblákat a lekérdezési eredmények alapján.

A STREAM kulcsszó azt jelzi, hogy a SELECT záradékban hivatkozott adatforrást streamelési szemantikával kell-e olvasni.

A folyamatkonfiguráció során megadott katalógusba és sémába beolvassa és beírja az alapértelmezett értéket. Lásd: Célkatalógus és sémabeállítása.

A DLT-forráskód kritikusan különbözik az SQL-szkriptektől: A DLT kiértékeli a folyamaton konfigurált összes forráskódfájl összes adathalmazdefinícióját, és létrehoz egy adatfolyam-gráfot a lekérdezések futtatása előtt. A jegyzetfüzetben vagy szkriptben megjelenő lekérdezések sorrendje határozza meg a kód kiértékelésének sorrendjét, a lekérdezések végrehajtásának sorrendjét azonban nem.

Materializált nézet létrehozása AZ SQL használatával

Az alábbi példakód bemutatja a materializált nézet SQL-sel való létrehozásának alapszintaxisát:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Streamelési tábla létrehozása SQL-lel

Az alábbi példakód bemutatja a streamelési tábla SQL-sel való létrehozásának alapszintaxisát:

Jegyzet

Nem minden adatforrás támogatja a streamelési olvasásokat, és egyes adatforrásokat mindig streamelési szemantikával kell feldolgozni.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Adatok betöltése objektumtárolóból

A DLT az Azure Databricks által támogatott összes formátumból támogatja az adatok betöltését. Lásd: Adatformátum beállításai.

Jegyzet

Ezek a példák az automatikusan a munkaterülethez csatlakoztatott /databricks-datasets-n elérhető adatokat használják. A Databricks kötetútvonalak vagy felhőalapú URI-k használatát javasolja a felhőobjektum-tárolóban tárolt adatokra való hivatkozáshoz. Lásd Mik azok a Unity Catalog-kötetek?.

A Databricks az automatikus betöltő és a streamelő táblák használatát javasolja, amikor növekményes betöltési számítási feladatokat konfigurál a felhőobjektum-tárolóban tárolt adatokhoz. Lásd Mi az automatikus betöltő?.

Az SQL a read_files függvénnyel hívja meg az Automatikus betöltő funkciót. A streamelési olvasást az STREAM és read_fileskulcsszavakkal is konfigurálnia kell.

Az alábbi példa létrehoz egy streamelési táblát JSON-fájlokból az Automatikus betöltő használatával:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A read_files függvény a kötegelt szemantikát is támogatja a materializált nézetek létrehozásához. Az alábbi példa kötegfeldolgozási szemantikával olvas be egy JSON-könyvtárat, és létrehoz egy materializált nézetet.

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Adatok érvényesítése elvárásokkal

Az elvárásokkal adatminőségi korlátozásokat állíthat be és kényszeríthet ki. Lásd: Az adatminőség kezelése a folyamatokra vonatkozó elvárásokkal.

Az alábbi kód egy valid_data nevű elvárást határoz meg, amely az adatbetöltés során null értékű rekordokat ad le:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A folyamat során definiált materializált nézetek és streamelési táblák lekérdezése

Az alábbi példa négy adatkészletet határoz meg:

  • Egy JSON-adatokat betöltő orders nevű streamelési tábla.
  • CSV-adatokat betöltő materializált nézet, amelynek neve customers.
  • A orders és customers adathalmazok rekordjait összekapcsoló, customer_orders nevű materializált nézet dátumra veti a rendelési időbélyeget, és kiválasztja a customer_id, order_number, stateés order_date mezőket.
  • Egy daily_orders_by_state nevű materializált nézet, amely összesíti az egyes államok rendeléseinek napi számát.

Jegyzet

A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a orders, customersés customer_orders táblák a folyamathoz konfigurált alapértelmezett katalógusból és sémából vannak megírva és olvasva.

Az örökölt közzétételi mód a LIVE sémával kérdezi le a folyamatban definiált egyéb materializált nézeteket és streamtáblákat. Az új folyamatokban a LIVE sémaszintaxisa csendben figyelmen kívül lesz hagyva. Lásd: LIVE séma (örökölt).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;