Поделиться через


Загрузка данных с помощью таблиц потоковой передачи в Databricks SQL

Databricks рекомендует использовать потоковые таблицы для приема данных с помощью Databricks SQL. Потоковая таблица — это таблица , зарегистрированная в каталоге Unity с дополнительной поддержкой потоковой или добавочной обработки данных. Конвейер DLT автоматически создается для каждой потоковой таблицы. Таблицы потоковой передачи можно использовать для добавочной загрузки данных из Kafka и облачного хранилища объектов.

В этой статье показано использование потоковых таблиц для загрузки данных из облачного хранилища объектов, настроенного в качестве тома каталога Unity (рекомендуется) или внешнего местоположения.

Примечание.

Чтобы узнать, как использовать таблицы Delta Lake в качестве источников и приемников потоков, см. раздел Чтение и запись потоков в таблицы Delta.

Внимание

Потоковые таблицы, созданные в Databricks SQL, поддерживаются бессерверным конвейером DLT. Рабочая область должна поддерживать бессерверные конвейеры для использования этой функции.

Перед началом работы

Перед началом работы необходимо выполнить следующие требования.

Требования к рабочей области:

Требования к вычислениям:

Необходимо использовать один из следующих вариантов:

  • Хранилище SQL, использующее Current канал.
  • Вычисление со стандартным режимом доступа (ранее общим режимом доступа) в Databricks Runtime 13.3 LTS или более поздней версии.
  • Вычисление с выделенным режимом доступа (ранее режимом доступа с одним пользователем) в Databricks Runtime 15.4 LTS или более поздней версии.

    В Databricks Runtime 15.3 и ниже нельзя использовать выделенные вычислительные ресурсы для запроса таблиц потоковой передачи, принадлежащих другим пользователям. Вы можете использовать выделенные вычисления в среде выполнения Databricks версии 15.3 и ниже, только если вы владеете потоковой таблицей. Создатель таблицы - владелец.

    Databricks Runtime 15.4 LTS и более поздние версии поддерживают запросы к таблицам, созданным с помощью DLT, на выделенных вычислительных ресурсах, независимо от владения таблицами. Чтобы воспользоваться преимуществами фильтрации данных, предоставляемых в Databricks Runtime 15.4 LTS и более поздних версиях, необходимо убедиться, что рабочей области включен для бессерверных вычислительных, так как функции фильтрации данных, поддерживающие созданные DLT таблицы, выполняются на бессерверных вычислениях. Вы можете взимать плату за бессерверные вычислительные ресурсы при использовании выделенных вычислений для выполнения операций фильтрации данных. См. детализированное управление доступом для выделенных ресурсов (ранее однопользовательских вычислительных ресурсов).

Требования к разрешениям:

Другие требования:

  • Путь к исходным данным.

    Пример пути тома: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Пример пути к внешнему расположению: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Примечание.

    В этой статье предполагается, что данные, которые вы хотите загрузить, хранятся в облачном хранилище, связанном с томом каталога Unity или внешним расположением, к которому у вас есть доступ.

Обнаружение и предварительный просмотр исходных данных

  1. На боковой панели рабочей области щелкните "Запросы" и нажмите кнопку "Создать запрос".

  2. В редакторе запросов выберите хранилище SQL, использующее Current канал из раскрывающегося списка.

  3. Вставьте приведенные ниже значения в редактор, заменив значения в угловых скобках (<>) для сведений, определяющих исходные данные, и нажмите кнопку "Выполнить".

    Примечание.

    При запуске табличной функции read_files могут возникнуть ошибки определения схемы, если значения по умолчанию для функции не могут обработать ваши данные. Например, может потребоваться настроить многострочный режим для файлов CSV или JSON. Список параметров синтаксического анализа см. в табличной функции read_files.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Загрузка данных в потоковую таблицу

Чтобы создать потоковую таблицу из данных в облачном хранилище объектов, вставьте следующую команду в редактор запросов и нажмите кнопку "Выполнить".

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Настройка канала среды выполнения

Потоковые таблицы, созданные с помощью хранилищ SQL, автоматически обновляются с помощью конвейера DLT. Конвейеры DLT используют среду выполнения в канале current по умолчанию. Ознакомьтесь с заметками о выпуске DLT и процессом обновления, чтобы узнать о ходе выпуска.

Databricks рекомендует использовать канал current для производственных рабочих нагрузок. Новые функции сначала выпускаются в preview канале. Конвейер можно задать для канала DLT предварительной версии, чтобы протестировать новые функции, указав preview в качестве свойства таблицы. Это свойство можно указать при создании таблицы или после создания таблицы с помощью инструкции ALTER.

В следующем примере кода показано, как настроить канал для предварительного просмотра в инструкции CREATE:

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Обновите потоковую таблицу с помощью конвейера DLT

В этом разделе описываются шаблоны обновления таблицы потоковой передачи с последними данными, доступными из источников, определенных в запросе.

Процесс обновления потоковой таблицы при CREATE или REFRESH осуществляется с помощью бессерверного конвейера DLT. Каждая определяемая таблица потоковой передачи имеет связанный конвейер DLT.

Когда вы выполняете команду REFRESH, появляется ссылка на конвейер DLT. Чтобы проверить состояние обновления, можно использовать ссылку конвейера DLT.

Примечание.

Только владелец таблицы может обновить потоковую таблицу, чтобы получить последние данные. Пользователь, создающий таблицу, является владельцем, и владелец не может быть изменен. Вам может потребоваться обновить таблицу потоковой передачи перед использованием запросов перемещения по времени.

См. Что такое DLT?.

Загружайте только новые данные

По умолчанию функция read_files считывает все существующие данные в исходном каталоге во время создания таблицы, а затем обрабатывает вновь поступающие записи с каждым обновлением.

Чтобы избежать приема данных, которые уже существуют в исходном каталоге во время создания таблицы, задайте параметр includeExistingFiles для false. Это означает, что только данные, поступающие в каталог после создания таблицы, обрабатываются. Например:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Полное обновление таблицы потоковой передачи

Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например, Kafka, так как полное обновление усекает существующие данные. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Например:

REFRESH STREAMING TABLE my_bronze_table FULL

Настройка расписания потоковой таблицы для автоматического обновления

Чтобы настроить потоковую таблицу для автоматического обновления на основе определенного расписания, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Например, запросы расписания обновления см. в ALTER STREAMING TABLE.

Отслеживание состояния обновления

Вы можете увидеть состояние обновления потоковой таблицы, просмотрев поток, который управляет потоковой таблицей в интерфейсе DLT, или просмотрев информацию обновления, возвращаемую командой DESCRIBE EXTENDED для потоковой таблицы.

DESCRIBE EXTENDED <table-name>

Потоковая обработка данных из Kafka

Пример приема потоковой передачи из Kafka см. в read_kafka.

Предоставление пользователям доступа к потоковой таблице

Чтобы предоставить пользователям привилегии SELECT в таблице потоковой передачи, чтобы они могли запросить ее, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Дополнительные сведения о предоставлении привилегий для защищаемых объектов каталога Unity см. в разделе "Права каталога Unity" и защищаемые объекты.

окончательное удаление записей из потоковой таблицы

Внимание

Поддержка инструкции REORG с потоковыми таблицами доступна в общедоступной предварительной версии.

Примечание.

  • Использование инструкции REORG с потоковой таблицей требует Databricks Runtime 15.4 и более поздних версий.
  • Хотя инструкцию REORG можно использовать с любой таблицей потоковой передачи, это необходимо только при удалении записей из таблицы потоковой передачи с включенными векторами удаления . Команда не действует при использовании со стриминговой таблицей, если векторы удаления не включены.

Чтобы физически удалить записи из базового хранилища для таблицы потоковой передачи с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные действия, чтобы обеспечить выполнение операции VACUUM на данных потоковой таблицы.

Ниже описаны более подробные инструкции.

  1. Обновите записи или удалите записи из таблицы потоковой передачи.
  2. Выполните выражение REORG над потоковой таблицей, указав параметр APPLY (PURGE). Например, REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Дождитесь, пока истечет срок хранения данных в стриминговой таблице. Срок хранения данных по умолчанию составляет семь дней, но его можно настроить с помощью свойства таблицы delta.deletedFileRetentionDuration. См. Настройка сохранения данных для запросов по временному перемещению.
  4. REFRESH потоковая таблица. См. , как обновить потоковую таблицу с помощью конвейера DLT. В течение 24 часов после операции REFRESH задачи обслуживания DLT, включая операцию VACUUM, необходимую для окончательного удаления записей, выполняются автоматически. См. задачи обслуживания , выполняемые DLT.

Мониторинг запусков с помощью журнала запросов

Вы можете использовать страницу журнала запросов для доступа к сведениям о запросах и профилям запросов, которые помогут определить плохое выполнение запросов и узких мест в конвейере DLT, используемом для запуска обновлений потоковой таблицы. Общие сведения о типе информации, доступной в журналах запросов и профилях запросов, см. в разделе "Журнал запросов" и "Профиль запросов".

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии. Администраторы рабочей области могут включить эту функцию на странице "Предварительные версии". См. статью "Управление предварительными версиями Azure Databricks".

Все инструкции, связанные с таблицами потоковой передачи, отображаются в журнале запросов. Вы можете использовать фильтр раскрывающегося списка Statement для выбора команды и инспекции связанных запросов. За всеми операторами CREATE следует оператор REFRESH, который выполняется асинхронно в конвейере DLT. Инструкции REFRESH обычно включают подробные планы запросов, которые предоставляют аналитические сведения о оптимизации производительности.

Чтобы получить доступ к REFRESH операторам в интерфейсе истории запросов, выполните следующие действия.

  1. Щелкните значок истории в левой боковой панели, чтобы открыть пользовательский интерфейс истории запросов.
  2. Выберите флажок REFRESH, используя фильтр раскрывающегося списка заявления.
  3. Щелкните имя инструкции запроса, чтобы просмотреть сводные сведения, такие как длительность запроса и агрегированные метрики.
  4. Щелкните "Просмотреть профиль запроса", чтобы открыть профиль запроса. Для получения информации о навигации в профиле запроса см. Профиль запроса.
  5. При необходимости можно использовать ссылки в разделе "Источник запросов", чтобы открыть связанный запрос или конвейер.

Вы также можете получить доступ к сведениям о запросах с помощью ссылок в редакторе SQL или из записной книжки, подключенной к хранилищу SQL.

Дополнительные ресурсы