Справочник по языку DLT SQL
В этой статье содержатся сведения о интерфейсе программирования DLT SQL.
- Дополнительные сведения об API Python см. в справочникепо языку Python DLT.
- Дополнительные сведения о командах SQL см. в справочнике по языку SQL.
В запросах SQL можно использовать определяемые пользователем функции Python, но перед их вызовом в исходных файлах SQL необходимо определить эти определяемые пользователем функции Python. См. определяемые пользователем скалярные функции — Python.
Ограничения
Предложение PIVOT
не поддерживается. Операция pivot
в Spark требует активной загрузки входных данных для вычисления выходной схемы. Эта возможность не поддерживается в DLT.
создание материализованного представления или потоковой таблицы DLT
Заметка
Синтаксис CREATE OR REFRESH LIVE TABLE
для создания материализованного представления не рекомендуется. Вместо этого используйте CREATE OR REFRESH MATERIALIZED VIEW
.
При объявлении таблицы потоковой передачи или материализованного представления используется тот же базовый синтаксис SQL.
Объявление материализованного представления DLT с помощью SQL
Ниже описан синтаксис объявления материализованного представления в DLT с помощью SQL:
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
CLUSTER BY clause
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Объявление таблицы потоковой передачи DLT с помощью SQL
Таблицы потоковой передачи можно объявлять только с помощью запросов, которые обращаются к источнику потоковой передачи. Databricks рекомендует использовать Auto Loader для потоковой загрузки файлов из облачного хранилища объектов. См. синтаксис автозагрузчика SQL.
При указании других таблиц или представлений в конвейере в качестве источников потоковой передачи необходимо включить функцию STREAM()
вокруг имени набора данных.
Ниже описан синтаксис объявления таблицы потоковой передачи в DLT с помощью SQL:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
создание представления DLT
Ниже описан синтаксис для объявления представлений с помощью SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
синтаксис автозагрузчика SQL
Ниже описан синтаксис для работы с автозагрузчиком в SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value>",
"<option-key>", "<option_value>",
...
)
)
Поддерживаемые параметры форматирования можно использовать с автозагрузчиком. С помощью функции map()
можно передать параметры в метод read_files()
. Параметры — это пары "ключ-значение", в которых ключи и значения являются строками. Для получения дополнительных сведений о поддерживаемых форматах и параметрах см. параметры формата файлов.
Пример. Определение таблиц
Вы можете создать набор данных, считывая из внешнего источника данных или из наборов данных, определенных в конвейере. Чтобы прочитать из внутреннего набора данных, укажите имя таблицы, которая будет использовать настройки по умолчанию для конвейера, каталога и схемы. В следующем примере определяются два разных набора данных: таблица с именем taxi_raw
, которая принимает JSON-файл в качестве источника входных данных и таблицу с именем filtered_data
, которая принимает таблицу taxi_raw
в качестве входных данных:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM taxi_raw
Пример. Чтение из источника потоковой передачи
Чтобы считывать данные из источника потоковой передачи, например автозагрузчик или внутренний набор данных, определите таблицу STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью конвейеров.
Окончательное удаление записей из материализованного представления или потоковой таблицы
Чтобы окончательно удалить записи из материализованного представления или потоковой таблицы с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы обеспечить удаление записей из материализованного представления, см. безвозвратное удаление записей из материализованного представления с включением векторов удаления. Чтобы гарантировать удаление записей из потоковой таблицы, см. как окончательно удалить записи из потоковой таблицы.
Управление материализацией таблиц
Таблицы также предлагают дополнительный контроль над их материализацией:
- Укажите, как с помощью
CLUSTER BY
кластеризовать таблицы . Для ускорения запросов можно использовать кластеризацию жидкости. См. Использование кластеризации жидкости для таблиц Delta. - Укажите, как таблицы секционированы с помощью
PARTITIONED BY
. - Свойства таблицы можно задать с помощью
TBLPROPERTIES
. См. свойства таблицы DLT. - Задайте расположение хранилища с помощью параметра
LOCATION
. По умолчанию данные таблицы хранятся в месте хранения конвейера, еслиLOCATION
не задан. - В определении схемы можно использовать созданные столбц ы. См. Пример: Укажите схему и столбцы кластера.
Заметка
Для таблиц меньше 1 ТБ по размеру Databricks рекомендует доверить управление организацией данных DLT. Если вы не ожидаете, что таблица будет расти за пределами терабайта, Databricks рекомендует не указывать столбцы секций.
Пример: Указать схему и столбцы кластера
При выборе таблицы можно указать схему. В следующем примере указывается схема целевой таблицы, включая использование Delta Lake сгенерированных столбцов, а также определяются столбцы кластеризации для таблицы.
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
По умолчанию DLT определяет схему из определения table
, если не указать схему.
Пример : Укажите столбцы секции
При необходимости можно указать столбцы разбиения для таблицы.
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Кластеризация Liquid предоставляет гибкое оптимизированное решение для кластеризации. Рекомендуется использовать CLUSTER BY
вместо PARTITIONED BY
для DLT.
пример . Определение ограничений таблицы
Заметка
Поддержка DLT для ограничений таблиц находится в общедоступной предварительной версии . Чтобы определить ограничения таблицы, конвейер должен быть конвейером с поддержкой каталога Unity и настроен для использования канала preview
.
При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. пункт CONSTRAINT в справочнике языка SQL.
В следующем примере определяется таблица с ограничением первичного и внешнего ключа:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Параметризация значений, используемых при объявлении таблиц или представлений с помощью SQL
Используйте SET
для указания значения конфигурации в запросе, объявляющего таблицу или представление, включая конфигурации Spark. Любая таблица или представление, определенное в записной книжке после инструкции SET
, имеет доступ к заданному значению. Все конфигурации Spark, указанные с помощью инструкции SET
, используются при выполнении запроса Spark для любой таблицы или представления после инструкции SET. Чтобы считывать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}
. Следующий пример задает значение конфигурации Spark с именем startDate
и использует это значение в запросе:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Чтобы указать несколько значений конфигурации, используйте отдельную инструкцию SET
для каждого значения.
пример. Определение фильтра строк и маски столбцов
Чтобы создать материализованное представление или таблицу потоковой передачи с фильтром строк и маской столбцов, используйте предложение ROW FILTER и предложение MASK. В следующем примере показано, как определить материализованное представление и потоковую таблицу с фильтрацией строк и маскировкой столбцов:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze
Дополнительные сведения о фильтрах строк и масках столбцов смотрите в разделе публикации таблиц с фильтрами строк и масками столбцов.
Свойства SQL
CREATE TABLE или VIEW |
---|
TEMPORARY Создайте таблицу, но не публикуйте метаданные для таблицы. Положение TEMPORARY инструктирует DLT создать таблицу, которая будет доступна для конвейера, но не должна быть доступна за его пределами. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления. |
STREAMING Создайте таблицу, которая считывает входной набор данных в виде потока. Входной набор данных должен быть источником потоковых данных, например автозагрузчиком или таблицей STREAMING . |
CLUSTER BY Включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. Использование кластеризации жидкости для таблиц Delta. |
PARTITIONED BY Необязательный список одного или нескольких столбцов, используемых для секционирования таблицы. |
LOCATION Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию выберет местоположение для хранения конвейера. |
COMMENT Необязательное описание таблицы. |
column_constraint Необязательный информационный первичный ключ или ограничение внешнего ключа для столбца. |
MASK clause (общедоступная предварительная версия)Добавляет функцию маски столбца для анонимизации конфиденциальных данных. Будущие запросы для этого столбца возвращают результат вычисляемой функции вместо исходного значения столбца. Это полезно для точного управления доступом, так как функция может проверить удостоверение пользователя и его членство в группах, чтобы решить, следует ли редактировать значение. См. столбец mask пункт. |
table_constraint Необязательный информационный первичный ключ или ограничение внешнего ключа в таблице. |
TBLPROPERTIES Необязательный список свойств для таблицы. |
WITH ROW FILTER clause (общедоступная предварительная версия)Добавляет функцию фильтра строк в таблицу. Будущие запросы для этой таблицы получают подмножество строк, для которых функция оценивается как TRUE. Это полезно для точного управления доступом, поскольку это позволяет функции проверять удостоверение и принадлежность к группам пользователя, выполняющего вызов, чтобы решить, следует ли фильтровать определенные строки. См. пункт ROW FILTER . |
select_statement Запрос DLT, определяющий набор данных для таблицы. |
пункт CONSTRAINT |
---|
EXPECT expectation_name Определение ограничения качества данных expectation_name . Если ограничение ON VIOLATION не определено, добавьте строки, которые нарушают ограничение целевого набора данных. |
ON VIOLATION Необязательное действие в случае неудачных строк:
|
отслеживание изменений данных с помощью SQL в DLT
Используйте инструкцию APPLY CHANGES INTO
для использования функции DLT CDC, как описано в следующем разделе:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Вы определяете ограничения качества данных для целевого объекта APPLY CHANGES
с помощью того же предложения CONSTRAINT
, что и запросы, отличные отAPPLY CHANGES
. См. Управление качеством данных с использованием ожиданий для обработки данных.
Заметка
Поведение по умолчанию для событий INSERT
и UPDATE
заключается в том, чтобы выполнялись вставки или обновления событий CDC из источника: обновляются все строки в целевой таблице, соответствующие указанным ключам, или добавляется новая строка, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE
можно указать с условием APPLY AS DELETE WHEN
.
Важный
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы APPLY CHANGES
необходимо также включить столбцы __START_AT
и __END_AT
с тем же типом данных, что и поле sequence_by
.
См. API APPLY CHANGES: упрощение захвата изменений данных с помощью DLT.
Клаузулы |
---|
KEYS Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Чтобы определить сочетание столбцов, используйте разделенный запятыми список столбцов. Этот пункт является обязательным. |
IGNORE NULL UPDATES Разрешить прием обновлений, содержащих подмножество целевых столбцов. При совпадении события CDC с существующей строкой и параметром IGNORE NULL UPDATES столбцы с null будут хранить существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null .Это предложение является необязательным. Значение по умолчанию — перезаписать существующие столбцы со значениями null . |
APPLY AS DELETE WHEN Указывает, когда событие CDC следует рассматривать как DELETE , а не как upsert. Чтобы обрабатывать данные, поступающие вне очереди, строка, отмеченная как удалённая, временно сохраняется в виде маркера в базовой таблице Delta, а представление создаётся в хранилище метаданных, которое фильтрует эти маркеры. Интервал хранения можно настроить с помощьюpipelines.cdc.tombstoneGCThresholdInSeconds
свойства таблицы.Это предложение является необязательным. |
APPLY AS TRUNCATE WHEN Указывает, когда событие CDC следует рассматривать как полную таблицу TRUNCATE . Так как это предложение активирует полную очистку целевой таблицы, его следует использовать только для конкретных случаев, когда необходимо воспользоваться этой функцией.Клаузула APPLY AS TRUNCATE WHEN поддерживается только для типа SCD 1. SCD типа 2 не поддерживает операцию усечения.Это предложение является необязательным. |
SEQUENCE BY Имя столбца, указывающее логический порядок событий CDC в исходных данных. DLT использует эту последовательность для обработки событий изменений, поступающих в неправильном порядке. Указанный столбец должен быть сортируемым типом данных. Этот пункт является обязательным. |
COLUMNS Указывает подмножество столбцов для включения в целевую таблицу. Вы можете выполнить следующие действия:
Это предложение является необязательным. По умолчанию необходимо включить все столбцы в целевую таблицу, если предложение COLUMNS не указано. |
STORED AS Следует ли хранить записи как SCD типа 1 или SCD типа 2. Это предложение является необязательным. Значение по умолчанию — SCD тип 1. |
TRACK HISTORY ON Задает подмножество выходных столбцов для создания записей журнала при наличии изменений в указанных столбцах. Вы можете выполнить следующие действия:
Это предложение является необязательным. Значение по умолчанию — отслеживать историю для всех выходных столбцов при наличии изменений, что эквивалентно TRACK HISTORY ON * . |