Поделиться через


Рекомендации по структурированной потоковой передаче в рабочей среде

В этой статье содержатся рекомендации по планированию структурированных рабочих нагрузок потоковой передачи с помощью заданий в Azure Databricks.

Databricks рекомендует всегда делать следующее:

  • Удалите ненужный код из записных книжек, который возвращает результаты, такие как display и count.
  • Не запускайте структурированные рабочие нагрузки потоковой передачи на универсальных вычислительных ресурсах. Всегда планировать потоки в качестве заданий с помощью вычислений заданий.
  • Планирование заданий с помощью Continuous режима.
  • Не включите автоматическое масштабирование для вычислений для структурированных заданий потоковой передачи.

Некоторые рабочие нагрузки пользуются следующими преимуществами:

Azure Databricks представила DLT для уменьшения сложности управления рабочей инфраструктурой для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать DLT для новых структурированных конвейеров потоковой передачи. См. Что такое DLT?.

Примечание.

Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать DLT с расширенным автомасштабированием для потоковых рабочих нагрузок. См. раздел Оптимизация использования кластеров потоков DLT с улучшенным автомасштабированием.

Проектирование нагрузок стриминга с расчетом на сбои

Databricks рекомендует всегда настраивать задания потоковой передачи для автоматического перезапуска при сбое. Некоторые функции, включая эволюцию схемы, предполагают, что структурированные рабочие нагрузки потоковой передачи настроены на автоматический повтор попыток. См. настройку структурированных потоковых заданий для перезапуска потоковых запросов при сбое.

Некоторые операции, такие как foreachBatch, предоставляют гарантию выполнения по крайней мере один раз, а не точно один раз. Для этих операций следует обеспечить, чтобы конвейер обработки был идемпотентным. См. раздел Использование foreachBatch для записи в произвольные приемники данных.

Примечание.

При перезапуске запроса обрабатывается микропакет, запланированный во время предыдущего выполнения. Если задание завершилось сбоем из-за ошибки нехватки памяти или вы вручную отменили задание из-за слишком большого микро-пакета, может потребоваться увеличить масштаб вычислений, чтобы успешно обработать микро-пакет.

При изменении конфигураций между запусками эти конфигурации применяются к первому запланированному пакету. См. раздел «Восстановление после изменений в запросе структурированного потокового вещания».

Когда происходит повторная попытка задания?

Вы можете запланировать несколько задач в рамках задания Azure Databricks. При настройке задания с помощью непрерывного триггера нельзя задать зависимости между задачами.

Можно запланировать несколько потоков в одном задании, используя один из следующих подходов:

  • Несколько задач: Определите задание с несколькими задачами, выполняющими рабочие нагрузки потоковой передачи с помощью непрерывного триггера.
  • Несколько запросов: определение нескольких потоковых запросов в исходном коде для одной задачи.

Вы также можете объединить эти стратегии. В следующей таблице сравниваются эти подходы.

Множественные задачи Несколько запросов
Как используется общий доступ к вычислительным ресурсам? Databricks рекомендует развертывать вычислительные ресурсы, соответствующие каждой задаче потоковой обработки. Вы также можете совместно использовать вычислительные ресурсы между задачами. Все запросы используют одинаковые вычислительные ресурсы. Можно по желанию назначать запросы пулам планировщика.
Как обрабатываются повторные попытки? Все задачи должны завершиться сбоем перед повторными попытками выполнения задания. Задача повторяется, если любой запрос завершается ошибкой.

Настройка заданий структурированной потоковой передачи для перезапуска запросов потоковой передачи при сбое

Databricks рекомендует настраивать все потоковые рабочие нагрузки с помощью непрерывного триггера. См . статью "Непрерывное выполнение заданий".

Непрерывный триггер обеспечивает следующее поведение по умолчанию:

  • Запрещает несколько одновременных запусков задания.
  • Запускает новый запуск при сбое предыдущего запуска.
  • Использует экспоненциальную задержку для повторных попыток.

Databricks рекомендует всегда использовать вычислительные ресурсы для заданий вместо универсальных вычислений при планировании рабочих процессов. При сбое задания и повторных попытках развертываются новые вычислительные ресурсы.

Примечание.

Вам не нужно использовать streamingQuery.awaitTermination() или spark.streams.awaitAnyTermination(). Задания автоматически не допускают завершения выполнения, пока активен потоковый запрос.

Использование пулов планировщика для нескольких потоковых запросов

Пулы расписаний можно настроить для назначения вычислительной емкости запросам при выполнении нескольких потоковых запросов из одного исходного кода.

По умолчанию все запросы, запущенные в ноутбуке, выполняются в том же пуле планирования. Задания Apache Spark, созданные триггерами из всех потоковых запросов в записной книжке, выполняются последовательно в порядке FIFO. Это может привести к ненужным задержкам в запросах, так как они не обеспечивают эффективное совместное использование кластерных ресурсов.

Пулы планировщиков позволяют объявлять, какие запросы структурированной потоковой передачи совместно используют вычислительные ресурсы.

В следующем примере query1 назначается выделенному пулу, в то время как query2 и query3 используют совместный пул планировщика.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Примечание.

Конфигурация локального свойства должна находиться в той же ячейке записной книжки, где запускается запрос потоковой передачи.

Дополнительные сведения см. в документации по планировщику Fair Scheduler Apache.