Opracowywanie kodu potoku przy użyciu języka SQL
DLT wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel strumieniowych w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.
Użytkownicy zaznajomieni z DataFrame PySpark mogą preferować tworzenie kodu potoku w Pythonie. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Rozwijaj kod potoku za pomocą Pythona.
Aby uzyskać pełne odwołanie do składni języka DLT SQL, zobacz pełne odwołanie do składni języka DLT SQL.
Podstawy opracowywania potoków w języku SQL
Kod SQL tworzący zestawy danych DLT używa składni CREATE OR REFRESH
do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego dla wyników zapytania.
Słowo kluczowe STREAM
wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT
, powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.
Odczytuje i zapisuje domyślnie katalog i schemat określony podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.
Kod źródłowy DLT krytycznie różni się od skryptów SQL: biblioteka DLT ocenia wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w notesie lub skryscie definiuje kolejność oceny kodu, ale nie kolejność wykonywania zapytań.
Tworzenie zmaterializowanego widoku przy użyciu języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia tabeli strumieniowej w SQL.
Notatka
Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Ładowanie danych z magazynu obiektów
Biblioteka DLT obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.
Notatka
W tych przykładach używane są dane dostępne w /databricks-datasets
, które są automatycznie zamontowane w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy Unity Catalog?.
Databricks zaleca używanie funkcji Auto Loader i tabel strumieniowych przy konfigurowaniu zadań pozyskiwania przyrostowego z danych przechowywanych w chmurowym magazynie obiektów. Zobacz Co to jest moduł automatycznego ładowania?.
SQL używa funkcji read_files
do uruchamiania funkcji Auto Loader. Musisz również użyć słowa kluczowego STREAM
, aby skonfigurować odczyt strumieniowy za pomocą read_files
.
W poniższym przykładzie tworzona jest tabela strumieniowa z plików JSON przy użyciu Auto Loader.
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Funkcja read_files
obsługuje również semantyki wsadowe w celu tworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Walidacja danych względem oczekiwań
Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących potoków.
Poniższy kod definiuje oczekiwania o nazwie valid_data
, które odrzucają rekordy o wartości null podczas pozyskiwania danych:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku
W poniższym przykładzie zdefiniowano cztery zestawy danych:
- Tabela strumieniowa o nazwie
orders
, która ładuje dane JSON. - Zmaterializowany widok o nazwie
customers
, który ładuje dane CSV. - Zmaterializowany widok o nazwie
customer_orders
, łączący rekordy z zestawów danychorders
icustomers
, przekształca znacznik czasu zamówienia na datę oraz wybiera polacustomer_id
,order_number
,state
iorder_date
. - Zmaterializowany widok o nazwie
daily_orders_by_state
, który agreguje dzienną liczbę zamówień dla każdego stanu.
Notatka
Podczas zapytań dotyczących widoków lub tabel w potoku danych, można określić katalog i schemat bezpośrednio lub użyć wartości domyślnych skonfigurowanych w potoku danych. W tym przykładzie tabele orders
, customers
i customer_orders
są zapisywane i odczytywane z domyślnego wykazu i schematu skonfigurowanego dla potoku.
Starszy tryb publikowania używa schematu LIVE
do wykonywania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku. W nowych potokach składnia schematu LIVE
jest ignorowana w trybie dyskretnym. Zobacz live schema (starsza wersja).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;