Использование потока изменений данных Delta Lake в Azure Databricks
Поток изменений данных позволяет Azure Databricks отслеживать изменения на уровне строк между версиями таблицы Delta. Когда эта функция включена в таблице Delta, среда выполнения регистрирует события изменений для всех данных, записываемых в таблицу. Сюда входят данные строк вместе с метаданными, указывающими, была ли соответствующая строка вставлена, удалена или обновлена.
Внимание
Поток данных изменений работает в тандеме с историей таблиц для предоставления сведений об изменениях. Так как клонирование таблицы Delta создает отдельную историю, поток данных изменений в клонированных таблицах не совпадает с потоком данных изменений исходной таблицы.
Инкрементная обработка данных об изменениях
Databricks рекомендует использовать поток изменений в сочетании со структурированной потоковой передачей для постепенной обработки изменений из таблиц Delta. Вы должны использовать структурированное потоковое вещание в Azure Databricks для автоматического отслеживания версий в канале изменений данных вашей таблицы.
Примечание.
DLT предоставляет функциональные возможности для простого распространения измененных данных и хранения результатов в виде SCD (медленно меняющегося измерения) типа 1 или типа 2 таблиц. См. API "Применение изменений": упрощение захвата изменений данных с использованием DLT.
Чтобы прочитать поток данных об изменениях из таблицы, необходимо включить поток данных об изменениях для этой таблицы. См. Включение потока изменений данных.
Установите параметр readChangeFeed
в true
при настройке потока для чтения канала данных изменений таблицы, как показано в следующем примере синтаксиса.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
По умолчанию поток возвращает последний моментальный снимок таблицы при первом запуске потока в качестве INSERT
и будущих изменений в виде измененных данных.
Изменения данных фиксируются в рамках транзакции Delta Lake и становятся доступными одновременно с новыми данными, фиксирующимися в таблице.
При необходимости можно указать начальную версию. См. статью " Следует ли указать начальную версию?".
Канал изменений данных также поддерживает пакетное выполнение, которое требует указания начальной версии. Ознакомьтесь с изменениями в пакетных запросах.
При считывании изменений также поддерживаются такие параметры, как ограничения скорости (maxFilesPerTrigger
, maxBytesPerTrigger
) и excludeRegex
.
Ограничение скорости может быть атомарной операцией для версий, отличных от начальной версии снимка. Таким образом, ко всему коммиту будет применено ограничение скорости, или будет возвращён весь коммит.
Следует ли указать начальную версию?
При необходимости можно указать начальную версию, если вы хотите игнорировать изменения, которые произошли до определенной версии. Можно указать версию с помощью метки времени или номера идентификатора версии, записанного в журнале транзакций Delta.
Примечание.
Начальная версия требуется для пакетных операций чтения, и многие пакетные шаблоны могут извлечь выгоду из настройки необязательной конечной версии.
При настройке рабочих нагрузок структурированной потоковой передачи с участием потока данных изменений важно понимать, как указание начальной версии влияет на обработку.
Многие рабочие нагрузки потоковой передачи, особенно новые конвейеры обработки данных, пользуются поведением по умолчанию. При поведении по умолчанию первый пакет обрабатывается, когда поток записывает сначала все существующие записи в таблице как INSERT
операции в поток изменений данных.
Если целевая таблица уже содержит все записи с соответствующими изменениями до определенной точки, укажите начальную версию, чтобы избежать обработки состояния исходной таблицы в виде INSERT
событий.
Следующий пример показывает синтаксис для восстановления после сбоя потоковой передачи, при котором была повреждена контрольная точка. В этом примере предполагается следующее:
- Канал изменений данных включен в исходной таблице при создании таблицы.
- Целевая конечная таблица обработала все изменения включительно до версии 75.
- Журнал версий исходной таблицы доступен для версий 70 и выше.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
В этом примере также необходимо указать новое расположение контрольной точки.
Внимание
Если указать начальную версию, поток не может начинаться с новой контрольной точки, если начальная версия больше не присутствует в журнале таблиц. Delta Lake автоматически очищает исторические версии, что означает, что все указанные начальные версии в конечном итоге удаляются.
См. Можно ли использовать канал данных об изменениях для воспроизведения всей истории таблицы?.
Чтение изменений в пакетных запросах
С помощью синтаксиса пакетного запроса можно считывать все изменения, начиная с конкретной версии или считывать изменения в заданном диапазоне версий.
Версия указывается в виде целого числа, а отметка времени — в виде строки в формате yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Начальные и конечные версии включены в запросы. Чтобы прочитать изменения из конкретной начальной версии до последней версии таблицы, укажите только начальную версию.
Если вы указываете версию или метку времени, которые старше, чем те, где записаны события изменений, то есть когда был включен канал изменений, возникает ошибка, указывающая, что канал изменений не был включен.
В следующих примерах синтаксиса показано использование параметров начальной и конечной версии с пакетными считываниями:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Примечание.
По умолчанию, если пользователь передает версию или метку времени, превышающую последнюю фиксацию в таблице, возникает ошибка timestampGreaterThanLatestCommit
. В Databricks Runtime 11.3 LTS и более поздних версиях поток изменений данных может обрабатывать случай вне диапазона, если пользователь задаёт следующую конфигурацию: true
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Если в таблице указана начальная версия, превышающая версию последней фиксации, или метка времени, которая более новая, чем последняя фиксация в таблице, то при включении предыдущей конфигурации возвращается пустой результат чтения.
Если вы предоставите конечную версию, превышающую последнюю фиксацию в таблице или конечную метку времени, которая позже, чем последняя фиксация в таблице, то при включении предыдущей конфигурации в режиме пакетного чтения возвращаются все изменения между начальной версией и последней фиксацией.
Какова схема потока изменений данных?
При чтении схемы из канала измененных данных для таблицы используется схема последней версии таблицы.
Примечание.
Большинство операций изменения схемы и эволюции полностью поддерживаются. Таблица с включенным сопоставлением столбцов не поддерживает все варианты использования и демонстрирует другое поведение. См. ограничения канала изменений для таблиц, где включено сопоставление столбцов.
Помимо столбцов данных из схемы таблицы Delta, канал изменений содержит столбцы метаданных, определяющие тип события изменения:
Имя столбца | Тип | Значения |
---|---|---|
_change_type |
Строка |
insert , update_preimage , update_postimage , delete (1) |
_commit_version |
Длинный | Журнал Delta или версия таблицы, содержащая изменение. |
_commit_timestamp |
Метка времени | Метка времени, связанная с созданием коммита. |
(1)preimage
— это значение перед обновлением, postimage
это значение после обновления.
Примечание.
Невозможно включить поток изменения данных в таблице, если схема содержит столбцы с такими же именами, как у добавляемых столбцов. Переименуйте столбцы в таблице, чтобы устранить этот конфликт, прежде чем пытаться включить поток изменения данных.
Включение канала изменений данных
Вы можете читать только поток данных об изменениях для включенных таблиц. Чтобы сделать доступной опцию потока данных об изменениях, её необходимо явно включить одним из следующих методов.
Новая таблица: задайте свойство таблицы
delta.enableChangeDataFeed = true
в командеCREATE TABLE
.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Существующая таблица: задайте свойство таблицы
delta.enableChangeDataFeed = true
в командеALTER TABLE
.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Все новые таблицы:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Внимание
Записываются только изменения, внесенные после включения канала данных об изменениях. Прошлые изменения таблицы не записываются.
Изменить хранилище данных
Включение канала данных изменений приводит к небольшому увеличению затрат на хранение таблицы. Записи измененных данных создаются при выполнении запроса и, как правило, гораздо меньше общего размера перезаписанных файлов.
Azure Databricks фиксирует данные об изменениях для операций UPDATE
, DELETE
и MERGE
в папке _change_data
в каталоге таблицы. Некоторые операции, такие как операции только вставки и полное удаление разделов, не создают данных в каталоге _change_data
, так как Azure Databricks может эффективно вычислять канал изменения данных непосредственно из журнала транзакций.
Все операции чтения для файлов данных в папке _change_data
должны проходить через поддерживаемые API Delta Lake.
На файлы в папке _change_data
распространяется действие политики хранения таблицы. Поток изменений данных удаляется при запуске команды VACUUM
.
Можно ли использовать поток изменений данных для воспроизведения всей истории таблицы?
Поток данных изменений не предназначен для постоянного архива всех изменений в таблице. Поток данных об изменениях записывает только те изменения, которые происходят после его активации.
Поток данных изменений и Delta Lake позволяют всегда восстанавливать полный снимок исходной таблицы, что означает, что вы можете начать новый поток чтения из таблицы с включенным потоком данных изменений и зафиксировать текущую версию этой таблицы и все изменения, происходящие после.
Записи в канале данных изменения необходимо рассматривать как временные и доступные только в течение указанного периода хранения. Журнал транзакций Delta регулярно удаляет версии таблиц и соответствующие версии ленты данных изменений. Когда версия удаляется из журнала транзакций, вы больше не сможете прочитать поток данных изменений для этой версии.
Если вашему варианту использования требуется сохранить постоянную историю всех изменений в таблице, следует использовать добавочную логику для записи записей из канала изменений в новую таблицу. В следующем примере кода показано использование trigger.AvailableNow
, которое использует добавочную обработку структурированной потоковой передачи, но обрабатывает доступные данные как пакетную рабочую нагрузку. Эту рабочую нагрузку можно запланировать с выполнением асинхронно с вашими основными конвейерами обработки, чтобы создать резервную копию потока изменения данных для аудита или возможности повторного воспроизведения.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Изменение ограничений канала данных для таблиц с включенным сопоставлением столбцов
С включенным сопоставлением столбцов в таблице Delta можно удалить или переименовать столбцы в таблице без перезаписи файлов данных для существующих данных. При включенном сопоставлении столбцов поток изменений данных имеет ограничения после выполнения изменений схемы, не связанных с аддитивными изменениями, таких как переименование или удаление столбца, изменение типа данных или изменения, связанные с null-допустимостью.
Внимание
- Вы не можете считывать канал данных о изменениях для транзакции или диапазона, в которых происходит изменение схемы, не являющееся аддитивным, с помощью семантики пакетной обработки.
- В Databricks Runtime 12.2 LTS и ниже таблицы с включенным сопоставлением столбцов, которые испытали неаддитивные изменения схемы, не поддерживают потоковую обработку данных из канала изменений. См. Трансляция с сопоставлением столбцов и изменениями схемы.
- В Databricks Runtime 11.3 LTS и ниже невозможно считывать поток изменений данных для таблиц с включенным сопоставлением столбцов, которые подвергались переименованию или удалению столбцов.
В Databricks Runtime 12.2 LTS и более поздних версиях вы можете выполнять пакетное чтение канала данных изменений для таблиц с включенным сопоставлением столбцов, в которых произошли недобавочные изменения схемы. Вместо использования схемы последней версии таблицы операции чтения используют схему конечной версии таблицы, указанной в запросе. Запросы по-прежнему завершаются ошибкой, если указанный диапазон версий охватывает неаддитивное изменение схемы.