Поделиться через


Как построить простые, эффективные и с низкой задержкой конвейеры данных

Сегодняшние предприятия, основанные на данных, постоянно создают данные, что требует конвейеров обработки данных, которые постоянно поглощают и преобразуют эти данные. Эти конвейеры должны иметь возможность обрабатывать и доставлять данные ровно один раз, создавать результаты с задержками менее 200 миллисекунд и всегда пытаться свести к минимуму затраты.

В этой статье описываются подходы к пакетной и инкрементальной обработке потоков для создания конвейеров инженерных данных, почему инкрементальная обработка потоков является лучшим вариантом, а также следующие шаги для начала работы с предложениями инкрементальной потоковой обработки от Databricks, Потоковая передача в Azure Databricks и Что такое DLT?. Эти функции позволяют быстро записывать и запускать конвейеры, гарантирующие семантику доставки, задержку, затраты и многое другое.

Ловушки повторяющихся пакетных заданий

При настройке конвейера данных можно сначала записывать повторяющиеся пакетные задания для приема данных. Например, каждый час можно запускать задание Spark, которое считывает данные из источника и записывает их в приемник, например, Delta Lake. Задача с этим подходом заключается в постепенной обработке вашего источника, потому что задание Spark, которое выполняется каждый час, должно начинаться там, где закончилось последнее. Вы можете записать самую последнюю метку времени обработанных данных, а затем выбрать все строки с метками времени более поздними, чем эта, но есть подводные камни:

Чтобы запустить конвейер непрерывных данных, можно попытаться запланировать почасовое пакетное задание, которое последовательно считывает данные из вашего источника, выполняет преобразования и записывает результат в место назначения, например, Delta Lake. Такой подход может иметь недостатки:

  • Задание Spark, которое запрашивает все новые данные после метки времени, будет пропускать поздние данные.
  • Задание Spark, которое завершается ошибкой, может привести к нарушению гарантий единственного выполнения, если обработка не будет выполнена тщательно.
  • Задание Spark, которое перечисляет содержимое облачных хранилищ, чтобы найти новые файлы, станет дорогостоящим.

Затем необходимо повторно преобразовать эти данные. Вы можете создавать повторяющиеся пакетные задания, которые затем агрегируют данные или применяют другие операции, что усложняет и снижает эффективность конвейера.

Пример партии

Чтобы полностью понять недостатки пакетной загрузки и преобразования для конвейера обработки данных, рассмотрим следующие примеры.

Пропущенные данные

Учитывая топик Kafka с данными об использовании, которые определяют, сколько взимается с клиентов, и конвейер производит прием в пакетах, последовательность событий может выглядеть следующим образом:

  1. Первый пакет содержит две записи в 8 утра и 8:30 утра.
  2. Вы обновляете последнюю метку времени на 8:30 утра.
  3. Вы получите еще одну запись в 8:15 утра.
  4. Второй пакетный запрос для всего после 8:30 утра, так что вы пропустите запись в 8:15 утра.

Кроме того, вы не хотите перевысить стоимость или недовзять плату с пользователей, поэтому необходимо убедиться, что вы обрабатываете каждую запись ровно один раз.

избыточная обработка

Затем предположим, что данные содержат строки покупок пользователей, и вы хотите агрегировать продажи в час, чтобы вы знали самое популярное время в магазине. Если покупки в течение одного часа прибывают в разных пакетах, у вас будет несколько пакетов, которые создают выходные данные в течение одного часа:

Пример пакетной загрузки

Является ли интервал времени с 8 утра до 9 утра содержащим два элемента (выходные данные пакета 1), один элемент (выходные данные пакета 2) или три (выходные данные ни от одного из пакетов)? Данные, необходимые для создания заданного окна времени, распределены по нескольким пакетам обработки. Чтобы устранить эту проблему, можно секционировать данные по дням и повторно обработать всю секцию, когда необходимо вычислить результат. Затем можно перезаписать результаты в приемнике:

Пример пакетной загрузки

Однако это происходит за счет задержки и затрат, так как второй пакет должен выполнять ненужные действия по обработке данных, которые, возможно, уже обработаны.

Отсутствие подводных камней при инкрементальной обработке потока

Добавочная обработка потоков позволяет избежать всех ошибок повторяющихся пакетных заданий для приема и преобразования данных. Databricks и структура потоковой передачи, а также DLT управляют сложностями реализации потоковой передачи, позволяя вам сосредоточиться на бизнес-логике. Необходимо указать только источник подключения, какие преобразования следует выполнить с данными и где записать результат.

Инкрементальное поглощение

Пошаговая загрузка данных в Databricks осуществляется с помощью структурированной потоковой обработки данных Apache Spark, которая может пошагово получать источник данных и записывать его в хранилище. Механизм структурированной потоковой передачи может потреблять данные в точности один раз, и механизм может обрабатывать неупорядоченные данные. Движок можно запускать в блокнотах или с использованием потоковых таблиц в DLT.

Подсистема структурированной потоковой передачи в Databricks предоставляет собственные источники потоковой передачи, такие как автозагрузчик, который может постепенно обрабатывать облачные файлы экономически эффективным способом. Databricks также предоставляет коннекторы для других популярных шин сообщений, таких как Apache Kafka, Amazon Kinesis, Apache Pulsarи Google Pub/Sub.

Пошаговое преобразование

Инкрементальное преобразование в Databricks со структурированной потоковой передачей позволяет задавать преобразования для DataFrame с тем же API, что и для пакетных запросов, при этом автоматически отслеживая данные между пакетами и агрегированные значения со временем, чтобы пользователю не приходилось делать это вручную. Он никогда не должен повторно обрабатывать данные, поэтому он быстрее и эффективнее, чем повторяющиеся пакетные задания. Структурированная потоковая передача создает поток данных, который можно добавлять к приемнику, например, Delta Lake, Kafka или любой другой поддерживаемый коннектор.

Материализованные представления в DLT поддерживаются движком Enzyme. Фермент по-прежнему постепенно обрабатывает источник, но вместо производства потока он создает материализованное представление, которое является предварительно вычисленной таблицей, которая хранит результаты запроса, предоставляемого вами. Фермент может эффективно определить, как новые данные влияют на результаты запроса, и он сохраняет предварительно вычисляемую таблицу up-to-date.

Материализованные представления создают представление на основе вашего агрегата, которое всегда эффективно обновляется, так что, например, в описанном выше сценарии вы всегда будете знать, что окно времени с 8:00 до 9:00 содержит три элемента.

Структурированная потоковая передача или DLT?

Существенное различие между структурированной потоковой передачей и DLT — это способ, в котором выполняется выполнение запросов потоковой передачи. В структурированной потоковой передаче вы вручную указываете множество конфигураций, и вам нужно вручную объединять запросы. Необходимо явно запускать запросы, дожидаясь их завершения, отменять их при сбоях и выполнять другие действия. В DLT вы декларативно передаете вашей конвейеры для выполнения, и система поддерживает их выполнение.

DLT также имеет такие возможности, как материализованные представления, которые эффективно и поэтапно предварительно вычисляют трансформации данных.

Дополнительную информацию об этих функциях см. в стриминге на Azure Databricks и Что такое DLT?.

Дальнейшие действия