Запустите первую структурированную потоковую задачу
В этой статье приведены примеры кода и объяснение основных понятий, необходимых для выполнения первых структурированных запросов потоковой передачи в Azure Databricks. Структурированная потоковая передача можно использовать для практически в реальном времени и добавочных рабочих нагрузок обработки.
Структурированная потоковая обработка — это одна из нескольких технологий, которые обеспечивают работу потоковых таблиц в DLT. Databricks рекомендует использовать DLT для всех новых рабочих нагрузок ETL, приема и структурированной потоковой передачи. См. Что такое DLT?.
Примечание.
Хотя DLT предоставляет немного измененный синтаксис для объявления таблиц потоковой передачи, общий синтаксис для настройки потоковых операций чтения и преобразований применяется ко всем вариантам использования потоковой передачи в Azure Databricks. DLT также упрощает потоковую передачу, управляя сведениями о состоянии, метаданными и многочисленными конфигурациями.
Использование автозагрузчика для чтения потоковых данных из хранилища объектов
В следующем примере демонстрируется загрузка данных JSON с помощью автозагрузчика, который используется cloudFiles
для обозначения формата и параметров. Этот schemaLocation
параметр включает вывод схемы и эволюцию. Вставьте следующий код в ячейку записной книжки Databricks и запустите ячейку для создания потокового кадра данных с именем raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Как и другие операции чтения в Azure Databricks, настройка потокового чтения фактически не загружает данные. Перед началом потока необходимо активировать действие в данных.
Примечание.
Вызов display()
на потоковом DataFrame запускает задание потоковой передачи. В большинстве случаев использования структурированной потоковой передачи действие, активирующее поток, должно записывать данные в хранилище. См. Замечания по производству для структурированной потоковой передачи.
Выполнение потокового преобразования
Структурированная потоковая передача поддерживает большинство преобразований, доступных в Azure Databricks и Spark SQL. Вы можете даже загружать модели MLflow как UDF и выполнять потоковые прогнозы в процессе преобразования.
В следующем примере кода выполняется простое преобразование для обогащения загруженных данных JSON дополнительной информацией с использованием функций Spark SQL.
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
В итоге transformed_df
содержатся инструкции по загрузке и преобразованию каждой записи по мере их поступления в источник данных.
Примечание.
Структурированная потоковая передача обрабатывает источники данных как несвязанные или бесконечные наборы данных. Таким образом, некоторые преобразования не поддерживаются в структурированных рабочих нагрузках потоковой передачи, так как для их сортировки требуется бесконечное количество элементов.
Большинство агрегатов и многих соединений требуют управления сведениями о состоянии с подложками, окнами и режимом вывода. См. "Применение водяных знаков для управления порогами обработки данных".
Выполните инкрементальную пакетную запись в Delta Lake
В следующем примере выполняется запись в Delta Lake, используя указанный путь к файлу и контрольную точку.
Внимание
Обязательно укажите уникальное расположение контрольной точки для каждого настраиваемого модуля потоковой передачи. Контрольная точка предоставляет уникальный идентификатор для вашего потока, отслеживая все обработанные записи и информацию о состоянии, связанную с вашим потоковым запросом.
Параметр availableNow
триггера указывает структурированной потоковой передаче обработать все ранее пропущенные записи из исходного набора данных и затем завершить работу, чтобы можно было безопасно выполнить следующий код, не опасаясь, что поток останется запущенным.
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
В этом примере новые записи не поступают в наш источник данных, поэтому повторное выполнение этого кода не включает новые записи.
Предупреждение
Структурированное выполнение потоковой передачи может предотвратить автоматическое завершение работы вычислительных ресурсов. Чтобы избежать непредвиденных затрат, обязательно завершите потоковые запросы.
Чтение данных из Delta Lake, преобразование и запись в Delta Lake
Delta Lake имеет обширную поддержку работы со структурированной потоковой передачей как источником, так и приемником. См. потоковые чтения и записи таблиц Delta.
В следующем примере показан синтаксис для добавочной загрузки всех новых записей из таблицы Delta, объединения их с моментальным снимком другой таблицы Delta и их записи в таблицу Delta.
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
У вас должны быть соответствующие разрешения, которые позволяют читать исходные таблицы, записывать в целевые таблицы и работать с указанным расположением контрольной точки. Заполните все параметры, обозначаемые угловыми скобками (<>
) с помощью соответствующих значений для источников данных и приемников.
Примечание.
DLT предоставляет полностью декларативный синтаксис для создания конвейеров Delta Lake и управляет свойствами, такими как триггеры и контрольные точки автоматически. См. Что такое DLT?.
Чтение данных из Kafka, преобразование и запись в Kafka
Apache Kafka и другие системы обмена сообщениями предоставляют одну из самых низких задержек, доступных для больших наборов данных. Azure Databricks можно использовать для применения преобразований к данным, полученным из Kafka, а затем записывать данные обратно в Kafka.
Примечание.
Запись данных в облачное хранилище объектов добавляет дополнительные затраты на задержку. Если вы хотите хранить данные из шины обмена сообщениями в Delta Lake, но требуется наименьшая задержка для потоковых рабочих нагрузок, Databricks рекомендует настроить отдельные задания потоковой передачи для приема данных в lakehouse и применить почти в режиме реального времени преобразования для приемников нисходящей шины обмена сообщениями.
В следующем примере кода показан простой шаблон для обогащения данных из Kafka путем объединения их с данными в таблице Delta, а затем записи обратно в Kafka.
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
У вас должны быть соответствующие разрешения, настроенные для доступа к службе Kafka. Заполните все параметры, обозначаемые угловыми скобками (<>
) с помощью соответствующих значений для источников данных и приемников. См. сведения о потоковой обработке с помощью Apache Kafka и Azure Databricks.