Использование жидкой кластеризации для таблиц Delta
Жидкостная кластеризация Delta Lake заменяет секционирование таблиц и ZORDER
упрощает принятие решений относительно структуры данных и оптимизирует производительность запросов. При "жидкой" кластеризации можно гибко переопределять ключи кластеризации, не переписывая существующие данные. Это дает возможность развивать макет данных со временем в соответствии с задачами аналитики. Кластеризация Liquid применяется как к таблицам потоковой передачи, так и к материализованным представлениям.
Внимание
Databricks рекомендует использовать Databricks Runtime 15.2 и выше для всех таблиц с включенным кластеризациям жидкости. Поддержка общедоступной предварительной версии с ограничениями доступна в Databricks Runtime 13.3 LTS и выше.
Примечание.
Таблицы с поддержкой жидкой кластеризации поддерживают одновременную обработку на уровне строк в Databricks Runtime 13.3 LTS и выше. Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше для всех таблиц с включенными векторами удаления. См. информацию об уровнях изоляции и конфликтах записи в Azure Databricks.
Для чего используется кластеризация жидкости?
Databricks рекомендует использовать кластеризацию типа "liquid" для всех новых таблиц Delta, включая как потоковые таблицы (ST), так и материализованные представления (MV). Ниже приведены примеры сценариев, в которых удобно использовать кластеризацию:
- Таблицы, которые часто фильтруются по столбцам высокой кратности.
- Таблицы со значительным неравномерным распределением данных.
- Таблицы, размер которых быстро увеличивается и которые нуждаются в настройке и обслуживании.
- Таблицы, в которых требуются параллельные записи.
- Таблицы с меняющимися шаблонами доступа.
- Таблицы, в которых использование типичного ключа раздела может привести к слишком большому или слишком малому числу разделов.
Включение кластеризации жидкости
Вы можете включить кластеризацию жидкости в существующей таблице или во время создания таблицы. Кластеризация несовместима с секционированием или ZORDER
, и требует использования Azure Databricks для управления всеми операциями макета и оптимизации данных в таблице. После включения кластеризации данных, запускайте задания OPTIMIZE
как обычно для постепенной кластеризации данных. Узнайте , как активировать кластеризацию.
Чтобы включить кластеризацию жидкости, добавьте фразу CLUSTER BY
в инструкцию создания таблицы, как показано в приведенных ниже примерах:
Примечание.
В Databricks Runtime 14.2 и более поздних версиях можно использовать API кадра данных и API DeltaTable в Python или Scala для включения кластеризации жидкости.
SQL
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Python
# Create an empty table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Scala
// Create an empty table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
В Databricks Runtime 16.0 и более поздних версиях вы можете создавать таблицы с использованием технологии liquid clustering, применяя записи из структурированных потоков, например:
Python
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
Scala
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Предупреждение
Таблицы, созданные с поддержкой жидкой кластеризации, включают множество функций Delta-таблицы при создании и используют Delta Writer версии 7 и Reader версии 3. Вы можете переопределить включение некоторых из этих функций. См. Переопределение включения функций по умолчанию (необязательно).
Версии протокола таблиц не могут быть понижены, а таблицы с включенной кластеризацией недоступны для чтения клиентами Delta Lake, которые не поддерживают все включенные функции протокола чтения Delta. См. статью Как Azure Databricks управляет совместимостью функций Delta Lake?.
Вы можете включить кластеризацию жидкости в существующей непартиментной таблице Delta с помощью следующего синтаксиса:
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
Внимание
Поведение по умолчанию не применяет кластеризацию к ранее записанным данным. Чтобы принудительно перекластеризовать все записи, необходимо использовать OPTIMIZE FULL
. См. раздел Принудительная перекластеризация для всех записей.
Чтобы удалить ключи кластеризации, используйте следующий синтаксис:
ALTER TABLE table_name CLUSTER BY NONE;
автоматическая кластеризация жидкости
Внимание
Автоматическая кластеризация жидкости находится в общедоступной предварительной версии.
В Databricks Runtime 15.4 LTS и более поздних версиях можно включить автоматическое жидкое кластерирование для таблиц, управляемых каталогом Unity. При включенной автоматической кластеризации данных Azure Databricks интеллектуально определяет ключи кластеризации для оптимизации производительности запросов. Вы включаете автоматическую кластеризацию жидкости при помощи предложения CLUSTER BY AUTO
.
При включении автоматические операции выбора ключей и кластеризации выполняются асинхронно в качестве операции обслуживания и требуют включения прогнозной оптимизации для таблицы. См прогнозную оптимизацию для управляемых таблиц каталога Unity.
Чтобы определить ключи кластеризации, Azure Databricks анализирует историческую рабочую нагрузку запроса для таблицы и определяет лучшие столбцы кандидатов. Ключи кластеризации изменяются, когда прогнозируемая экономия затрат от пропуска данных перевешивает затраты на кластеризацию данных.
Если способ запроса ваших данных изменяется со временем или производительность запроса указывает на изменения в распределении данных, автоматическая кластеризация жидкости выбирает новые ключи для оптимизации производительности.
Примечание.
Таблицы можно читать или записывать с поддержкой автоматического кластеризации из всех версий среды выполнения Databricks, поддерживающих кластеризацию жидкости, но интеллектуальный выбор ключей зависит от метаданных, представленных в Databricks Runtime 15.4 LTS. Используйте Databricks Runtime 15.4 LTS или более поздней версии, чтобы гарантировать, что автоматически выбранные ключи используют все ваши рабочие нагрузки и что эти рабочие нагрузки учитываются при выборе новых ключей.
Включение или отключение автоматической кластеризации
Чтобы создать новую таблицу с включенной автоматической кластеризации жидкости, используйте следующий синтаксис:
CREATE OR REPLACE TABLE table_name CLUSTER BY AUTO;
Вы также можете включить автоматическую динамическую кластеризацию в существующей таблице, включая таблицы, для которых ранее вручную задавались ключи, как показано в следующем примере:
ALTER TABLE table_name CLUSTER BY AUTO;
Вы также можете изменять таблицы с автоматическим жидкостным кластеризованием, чтобы использовать ключи, указанные вручную.
Примечание.
Свойство clusterByAuto
имеет значение true
при включении автоматической кластеризации жидкости.
Свойство clusteringColumns
отображает текущие столбцы кластеризации, выбранные автоматическим выбором ключа. Запустите DESCRIBE EXTENDED table_name
, чтобы просмотреть полный список свойств таблицы.
Переопределить включение функции по умолчанию (необязательно)
Вы можете переопределить поведение по умолчанию, которое включает функции разностной таблицы во время включения кластеризации жидкости. Это предотвращает обновление протоколов чтения и записи, связанных с этими функциями таблицы. Чтобы выполнить следующие действия, необходимо создать существующую таблицу:
Используется
ALTER TABLE
для задания свойства таблицы, которое отключает одну или несколько функций. Например, чтобы отключить векторы удаления, выполните следующие действия:ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
Включите кластеризацию жидкости в таблице, выполнив следующие действия:
ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
В следующей таблице приведены сведения о функциях Delta, которые можно переопределить и как включение влияет на совместимость с версиями среды выполнения Databricks.
Функция Delta | Совместимость среды выполнения | Свойство для переопределения активации | Влияние отключения на кластеризацию жидкости |
---|---|---|---|
Векторы удаления | Для чтения и записи требуется Databricks Runtime 12.2 LTS и более поздних версий. | 'delta.enableDeletionVectors' = false |
Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. статью "Конфликты записи с параллелизмом на уровне строк".DELETE , MERGE и UPDATE команды могут выполняться медленнее. |
Отслеживание строк данных | Записи требуют Databricks Runtime 13.3 LTS и более поздних версий. Можно считывать из любой версии Databricks Runtime. | 'delta.enableRowTracking' = false |
Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. "Конфликты записи при параллельности на уровне строк". |
Контрольные точки версии 2 | Для чтения и записи требуется Databricks Runtime 13.3 LTS и более поздних версий. | 'delta.checkpointPolicy' = 'classic' |
Не влияет на поведение кластеризации жидкости. |
Выбор ключей кластеризации
Databricks рекомендует автоматическую кластеризацию жидкости для поддерживаемых таблиц. См. автоматическая кластеризация жидкости.
Databricks рекомендует выбирать ключи кластеризации на основе столбцов, наиболее часто используемых в фильтрах запросов. Ключи кластеризации можно определить в любом порядке. Если два столбца очень коррелируются, необходимо включить только один из них в качестве ключа кластеризации.
Можно указать до четырех ключей кластеризации. Для небольших таблиц (менее 10 ТБ) использование дополнительных ключей кластеризации (например, четыре) может снизить производительность при фильтрации по одному столбцу по сравнению с использованием меньшего количества ключей кластеризации (например, двух). Однако по мере увеличения размера таблицы разница в производительности с использованием дополнительных ключей кластеризации для запросов с одним столбцом становится незначительной.
Можно указать только столбцы, которые собирают статистику в качестве ключей кластеризации. По умолчанию в таблице Delta сбор статистики производится для первых 32 столбцов. См. раздел "Указание столбцов статистики Delta".
Кластеризация поддерживает следующие типы данных для кластеризации ключей.
- Дата
- Метка времени
- TimestampNTZ (требуется Databricks Runtime 14.3 LTS или более поздняя версия)
- Строка
- Целое число
- длинный
- Коротко
- Плавающий
- Двойной
- Десятичное число
- Байт
Если вы преобразуете существующую таблицу, рассмотрите следующие рекомендации:
Текущий метод оптимизации данных | Рекомендация по кластеризации ключей |
---|---|
Секционирование в стиле Hive | Используйте столбцы секций в качестве ключей кластеризации. |
Индексирование Z-порядка |
ZORDER BY Используйте столбцы в качестве ключей кластеризации. |
Секционирование в стиле Hive и порядок Z | Используйте как столбцы секционирования, так и ZORDER BY столбцы в качестве ключей кластеризации. |
Созданные столбцы для уменьшения кратности (например, дата для метки времени) | Используйте исходный столбец в качестве ключа кластеризации и не создавайте созданный столбец. |
Запись данных в кластеризованную таблицу
Необходимо использовать клиент записи Delta, поддерживающий все функции таблицы протоколов разностной записи, используемые при кластеризации жидкости. В Azure Databricks необходимо использовать Databricks Runtime 13.3 LTS и выше.
Операции, которые концентрируются при записи, включают следующие:
-
INSERT INTO
операции - Выражения
CTAS
иRTAS
-
COPY INTO
из формата Parquet spark.write.mode("append")
Структурированная потоковая обработка данных никогда не активирует кластеризацию данных при записи. Применяются дополнительные ограничения. См. Ограничения.
Кластеризация при записи активируется только в том случае, если данные в транзакции соответствуют пороговой величине. Эти пороговые значения зависят от количества столбцов кластеризации и ниже для управляемых таблиц каталога Unity, чем другие таблицы Delta.
Количество столбцов кластеризации | Пороговое значение для управляемых таблиц каталога Unity | Пороговый размер для других таблиц Delta |
---|---|---|
1 | 64 МБ | 256 МБ |
2 | 256 МБ | 1 ГБ |
3 | 512 МБ | 2 ГБ |
4 | 1 ГБ | 4 ГБ |
Поскольку не все операции применяют динамическую кластеризацию, Databricks рекомендует часто выполнять OPTIMIZE
, чтобы обеспечить эффективную кластеризацию всех данных.
Как запустить кластеризацию
Прогнозная оптимизация автоматически выполняет OPTIMIZE
команды для включенных таблиц. См. статью Прогнозная оптимизация для управляемых таблиц в Unity Catalog.
Чтобы активировать кластеризацию, необходимо использовать Databricks Runtime 13.3 LTS или более поздней версии.
OPTIMIZE
Используйте команду в таблице, как показано в следующем примере:
OPTIMIZE table_name;
Жидкая кластеризация является инкрементной, что означает, что данные перезаписываются только при необходимости для учёта данных, которые должны быть кластеризованы. Файлы данных с ключами кластеризации, которые не соответствуют данным, которые должны быть кластеризованы, не перезаписываются.
Для повышения производительности Databricks рекомендует планировать регулярные OPTIMIZE
задания для кластеризации данных. Для таблиц с большим количеством обновлений или вставок Databricks рекомендует планировать OPTIMIZE
задание каждые один или два часа. Так как кластеризация жидкости увеличивается, большинство OPTIMIZE
заданий для кластеризованных таблиц выполняются быстро.
Принудительная перекластеризация для всех записей
В Databricks Runtime 16.0 и более поздних версиях можно принудительно выполнить повторение всех записей в таблице со следующим синтаксисом:
OPTIMIZE table_name FULL;
Внимание
При необходимости выполнение OPTIMIZE FULL
перекластеризует все существующие данные. Для больших таблиц, которые ранее не были кластеризованы по указанным ключам, эта операция может занять несколько часов.
Запустите OPTIMIZE FULL
при первом включении кластеризации или изменении ключей кластеризации. Если вы ранее выполнили OPTIMIZE FULL
и не было изменений в ключах кластеризации, OPTIMIZE FULL
выполняется так же, как и OPTIMIZE
. Всегда используйте OPTIMIZE FULL
, чтобы убедиться, что макет данных отражает текущие ключи кластеризации.
Чтение данных из кластеризованной таблицы
Данные в кластеризованной таблице можно считывать с помощью любого клиента Delta Lake, поддерживающего чтение векторов удаления. Для получения наилучших результатов запроса включите ключи кластеризации в фильтры запросов, как показано в следующем примере:
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
Изменение ключей кластеризации
Ключи кластеризации для таблицы можно изменить в любое время, выполнив ALTER TABLE
команду, как показано в следующем примере:
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
При изменении ключей кластеризации последующие OPTIMIZE
операции и операции записи используют новый подход кластеризации, но существующие данные не перезаписываются.
Кроме того, можно отключить кластеризацию, задав ключи NONE
следующим образом:
ALTER TABLE table_name CLUSTER BY NONE;
Настройка ключей NONE
кластера не перезаписывает данные, которые уже кластеризованы, но не позволяет будущим OPTIMIZE
операциям использовать ключи кластеризации.
Узнайте, как кластеризована таблица
Вы можете использовать команды DESCRIBE
для просмотра ключей кластеризации таблицы, как в следующих примерах:
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
Совместимость таблиц с гибкой кластеризацией
Таблицы, созданные с помощью технологической кластеризации в Databricks Runtime 14.1 и выше, по умолчанию используют контрольные точки версии 2. Таблицы можно считывать и записывать с помощью контрольных точек версии 2 в Databricks Runtime 13.3 LTS и выше.
Вы можете отключить контрольные точки версии 2 и понизить протоколы таблиц для чтения таблиц с жидкой кластеризацией в Databricks Runtime 12.2 LTS и более поздних версиях. См. сведения о функциях таблицы Drop Delta.
Ограничения
Применяются следующие ограничения:
- В Databricks Runtime 15.1 и ниже кластеризация при записи данных не поддерживает запросы источника данных, включающие фильтры, объединения или агрегации.
- Нагрузки структурированного потокового режима не поддерживают кластеризацию данных при записи.
- В Databricks Runtime 15.4 LTS и ниже нельзя создать таблицу с поддержкой кластеризации жидкости с помощью записи структурированной потоковой передачи. Структурированная потоковая передача можно использовать для записи данных в существующую таблицу с включенным кластеризированием жидкости.