Поделиться через


Источник файла хранилища BLOB-объектов Azure с хранилищем очередей Azure (устаревшая версия)

Внимание

Поддержка этой документации прекращена, она может больше не обновляться. Продукты, службы или технологии, упомянутые в этом контенте, больше не поддерживаются. См. статью об автозагрузчике.

Соединитель ABS-AQS предоставляет оптимизированный источник файлов, который использует Хранилище очередей Azure (AQS) для поиска новых файлов, записанных в контейнер Хранилища BLOB-объектов Azure (ABS), без многократного вывода всех файлов. Это дает два преимущества:

  • Меньшая задержка: нет необходимости list вложенных структур каталогов на ABS, что медленно и требует много ресурсов.
  • Более низкие затраты: больше не выполняются дорогостоящие запросы API LIST в ABS.

Примечание.

Источник ABS-AQS удаляет сообщения из очереди AQS по мере использования событий. Если вы хотите, чтобы другие конвейеры потребляли сообщения из этой очереди, set отдельной очереди AQS для оптимизированного средства чтения. Вы можете set несколько подписок сетки событий для публикации в разных очередях.

Использование источника файлов ABS-AQS

Чтобы использовать источник файлов ABS-AQS, необходимо выполнить следующие действия:

  • Set уведомления о событиях ABS, используя подписки сетки событий Azure и перенаправив их в AQS. См. Реагирование на события хранилища BLOB-объектов.

  • Укажите параметры fileFormat, queueUrl и schema. Например:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

Аутентификация с помощью Хранилища очередей Azure и хранилища Blob-объектов

Для аутентификации с помощью Хранилища очередей Azure и хранилища Blob-объектов используйте маркеры подписанного URL-адреса или ключи учетной записи хранения. Необходимо указать строку подключения для учетной записи хранения, where очередь развертывается, которая содержит маркер SAS или ключи доступа к учетной записи хранения. Дополнительные сведения см. в статье Настройка строк подключения службы хранилища Azure.

Кроме того, вам потребуется предоставить доступ к контейнерам хранилища BLOB-объектов Azure. Сведения о настройке доступа к контейнеру хранилища BLOB-объектов Azure см. в статье "Подключение к Azure Data Lake Storage 2-го поколения и хранилищу BLOB-объектов".

Примечание.

Databricks настоятельно рекомендует использовать секреты управления для предоставления строка подключения.

Настройка

Вариант Тип По умолчанию. Description
allowOverwrites Логический true Следует ли повторно обрабатывать BLOB-объект, который перезаписывается.
connectionString Строка Нет (обязательный параметр) Строк подключения для доступа к очереди.
fetchParallelism Целое 1 Число потоков, используемых для получения сообщений из службы очередей.
fileFormat Строка Нет (обязательный параметр) Формат файлов, такой как parquet, json, csv, text и т. д.
ignoreFileDeletion Логический false Если у вас есть конфигурации жизненного цикла или вы удаляете исходные файлы вручную, необходимо set этот параметр, чтобы обеспечить true.
maxFileAge Целое 604800 Определяет, сколько времени (в секундах) уведомления о файлах хранятся в состоянии, чтобы предотвратить повторную обработку.
pathRewrites Строка JSON. "{}" При использовании точек подключения можно переписать префикс пути container@storageAccount/key с точкой подключения. Перезаписывать можно только префиксы. Например, для конфигурации {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"} путь wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json переписывается в
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval Строка длительности, например, 2m в течение 2 минут. "5s" Время ожидания между выборками, если очередь пуста. Расходы Azure за запрос API к AQS. Таким образом, если данные не поступают часто, это значение может быть установлено на set для длительного времени. Пока очередь не пуста, мы будем получать данные непрерывно. Если новые файлы создаются каждые 5 минут, возможно, стоит set увеличить queueFetchInterval, чтобы сократить затраты на AQS.
queueName Строка Нет (обязательный параметр) Имя очереди AQS.

Если вы наблюдаете много сообщений в журналах драйверов, которые выглядят как Fetched 0 new events and 3 old events., where, как правило, вы наблюдаете гораздо больше старых событий, чем новые, следует уменьшить интервал триггера потока.

Если вы используете файлы из расположения в хранилище BLOB-объектов where вы ожидаете, что некоторые файлы могут быть удалены до их обработки, вы можете set следующую конфигурацию, чтобы игнорировать ошибку и продолжить обработку:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Вопросы и ответы

Если для параметра ignoreFileDeletion установлено значение False (ложь) (по умолчанию), и объект был удален, будет ли он завершаться сбоем всего конвейера?

Да, если мы получаем событие, сообщающее, что файл был удален, он завершится ошибкой всего конвейера.

Как setmaxFileAgeсделать?

Хранилище очередей Azure обеспечивает по крайней мере семантику доставки сообщений, поэтому необходимо оставаться в состоянии удаления дубликатов. Значение по умолчанию для параметра maxFileAge равно 7 дням, что соответствует максимальному значению TTL сообщения в очереди.