Потоковая обработка данных и инкрементальная загрузка
Azure Databricks использует Apache Spark Structured Streaming для поддержки многочисленных продуктов, связанных с нагрузками на приём данных, в том числе:
- Автозагрузчик
COPY INTO
- Конвейеры DLT
- Материализованные представления и таблицы потоковой передачи в Databricks SQL
В этой статье рассматриваются некоторые различия между семантикой потоковой и добавочной пакетной обработки и предоставляется общий обзор настройки рабочих нагрузок приема для требуемой семантики в Databricks.
В чем разница между потоковой передачей и добавочным приемом пакетов?
Возможные конфигурации рабочих процессов поступления варьируются от обработки в режиме, близком к реальному времени, до нерегулярной инкрементальной пакетной обработки. Оба шаблона используют структурированную потоковую передачу Apache Spark для добавочной обработки, но имеют другую семантику. Для простоты в этой статье упоминается почти реального времени загрузка как потоковая загрузка и менее частая инкрементная обработка как инкрементная пакетная загрузка.
Прием потоковой передачи
Потоковая передача в контексте извлечения данных и обновления таблиц относится к обработке данных практически в режиме реального времени, когда Azure Databricks извлекает записи из источника и передает их в приемник в микропакетах с использованием всегда доступной инфраструктуры. Рабочая нагрузка потоковой передачи непрерывно получает обновления из настроенных источников данных, если не происходит сбой, который останавливает получение.
Инкрементная загрузка пакетов
Инкрементальная пакетная загрузка — это шаблон, в котором все новые записи, приходящие из источника данных, обрабатываются в кратковременной задаче. Пакетная инкрементная обработка часто выполняется по расписанию, но она также может запускаться вручную или на основе поступления файла.
Инкрементный пакетный прием данных отличается от пакетного приема тем, что он автоматически обнаруживает новые записи в источнике данных и игнорирует записи, которые уже были приняты.
Импорт данных с заданиями
Databricks Jobs позволяет вам оркестрировать рабочие процессы и планировать задачи, включающие записные книжки, библиотеки, конвейеры DLT и SQL-запросы Databricks.
Примечание.
Вы можете использовать все вычислительные типы и типы задач Azure Databricks для настройки добавочного приема пакетов. Поддержка потоковой передачи осуществляется только в производственной среде для классических вычислительных заданий и DLT.
Задания имеют два основных режима работы:
- Непрерывные задания автоматически повторяются при возникновении сбоя. Этот режим предназначен для приема потоковой передачи.
-
Запускаемые задания выполняют задачи при активации. Триггеры включают:
- Триггеры на основе времени, выполняющие задания по указанному расписанию.
- Триггеры на основе файлов, выполняющие задания, когда файлы оказываются в указанном месте.
- Другие триггеры, такие как вызовы REST API, выполнение команд интерфейса командной строки Azure Databricks или нажатие кнопки "Запустить сейчас " в пользовательском интерфейсе рабочей области.
Для добавочных пакетных рабочих нагрузок настройте задания с помощью AvailableNow
режима триггера следующим образом:
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
Для потоковых рабочих нагрузок интервал триггера по умолчанию равен processingTime ="500ms"
. В следующем примере показано, как обрабатывать микропакет каждые 5 секунд:
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
Внимание
Бессерверные задания не поддерживают Scala, непрерывный режим или временные интервалы триггеров для структурированной потоковой передачи. Используйте классические задания, если вам нужна семантика приема почти в режиме реального времени.
Прием с помощью DLT
Как и задания, конвейеры DLT могут выполняться в активированном или непрерывном режиме. Для обеспечения семантики потоковой передачи почти в реальном времени с потоковыми таблицами используйте непрерывный режим.
Используйте потоковые таблицы для настройки потоковой или добавочной пакетной загрузки данных из облачного хранилища объектов, Apache Kafka, Amazon Kinesis, Google Pub/Sub или Apache Pulsar.
Lakeflow Connect использует DLT для конфигурации потоков данных из подключенных систем. См. Lakeflow Connect.
Материализованные представления гарантируют семантику операций, эквивалентную пакетным рабочим нагрузкам, но могут оптимизировать множество операций, чтобы вычислить результаты постепенно. См. инкрементальное обновление материализованных представлений.
Загрузка с помощью Databricks SQL
Потоковые таблицы можно использовать для настройки инкрементальной пакетной обработки данных из облачного хранилища объектов, Apache Kafka, Amazon Kinesis, Google Pub/Sub или Apache Pulsar.
Материализованные представления можно использовать для настройки добавочной пакетной обработки из источников Delta. См. инкрементальное обновление для материализованных представлений.
COPY INTO
предоставляет знакомый синтаксис SQL для добавочной пакетной обработки файлов данных в облачном хранилище объектов. Поведение COPY INTO
похоже на шаблоны, поддерживаемые потоковыми таблицами для облачного хранилища объектов, но не все параметры по умолчанию эквивалентны для всех поддерживаемых форматов файлов.