Udostępnij za pośrednictwem


Opracowywanie kodu potoku przy użyciu języka SQL

DLT wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel strumieniowych w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.

Użytkownicy zaznajomieni z DataFrame PySpark mogą preferować tworzenie kodu potoku w Pythonie. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Rozwijaj kod potoku za pomocą Pythona.

Aby uzyskać pełne odwołanie do składni języka DLT SQL, zobacz pełne odwołanie do składni języka DLT SQL.

Podstawy opracowywania potoków w języku SQL

Kod SQL tworzący zestawy danych DLT używa składni CREATE OR REFRESH do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego dla wyników zapytania.

Słowo kluczowe STREAM wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT, powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.

Odczytuje i zapisuje domyślnie katalog i schemat określony podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.

Kod źródłowy DLT krytycznie różni się od skryptów SQL: biblioteka DLT ocenia wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w notesie lub skryscie definiuje kolejność oceny kodu, ale nie kolejność wykonywania zapytań.

Tworzenie zmaterializowanego widoku przy użyciu języka SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia tabeli strumieniowej w SQL.

Notatka

Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Ładowanie danych z magazynu obiektów

Biblioteka DLT obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Notatka

W tych przykładach używane są dane dostępne w /databricks-datasets, które są automatycznie zamontowane w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy Unity Catalog?.

Databricks zaleca używanie funkcji Auto Loader i tabel strumieniowych przy konfigurowaniu zadań pozyskiwania przyrostowego z danych przechowywanych w chmurowym magazynie obiektów. Zobacz Co to jest moduł automatycznego ładowania?.

SQL używa funkcji read_files do uruchamiania funkcji Auto Loader. Musisz również użyć słowa kluczowego STREAM, aby skonfigurować odczyt strumieniowy za pomocą read_files.

W poniższym przykładzie tworzona jest tabela strumieniowa z plików JSON przy użyciu Auto Loader.

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funkcja read_files obsługuje również semantyki wsadowe w celu tworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Walidacja danych względem oczekiwań

Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących potoków.

Poniższy kod definiuje oczekiwania o nazwie valid_data, które odrzucają rekordy o wartości null podczas pozyskiwania danych:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku

W poniższym przykładzie zdefiniowano cztery zestawy danych:

  • Tabela strumieniowa o nazwie orders, która ładuje dane JSON.
  • Zmaterializowany widok o nazwie customers, który ładuje dane CSV.
  • Zmaterializowany widok o nazwie customer_orders, łączący rekordy z zestawów danych orders i customers, przekształca znacznik czasu zamówienia na datę oraz wybiera pola customer_id, order_number, statei order_date.
  • Zmaterializowany widok o nazwie daily_orders_by_state, który agreguje dzienną liczbę zamówień dla każdego stanu.

Notatka

Podczas zapytań dotyczących widoków lub tabel w potoku danych, można określić katalog i schemat bezpośrednio lub użyć wartości domyślnych skonfigurowanych w potoku danych. W tym przykładzie tabele orders, customersi customer_orders są zapisywane i odczytywane z domyślnego wykazu i schematu skonfigurowanego dla potoku.

Starszy tryb publikowania używa schematu LIVE do wykonywania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku. W nowych potokach składnia schematu LIVE jest ignorowana w trybie dyskretnym. Zobacz live schema (starsza wersja).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;