Мониторинг конвейеров DLT
В этой статье описывается использование встроенных функций мониторинга и наблюдаемости для конвейеров DLT. Такие функции поддерживают такие задачи, как:
- Наблюдение за ходом и состоянием обновлений в конвейере. См. Какие сведения о конвейере доступны в пользовательском интерфейсе?.
- Оповещение о конвейерных событиях, таких как успех или сбой обновлений конвейера. См. Добавьте уведомления по электронной почте для событий работы конвейера.
- Просмотр метрик для источников потоковой передачи, таких как Apache Kafka и автозагрузчик (общедоступная предварительная версия). См. просмотр метрик потоковой передачи.
- Извлечение подробной информации об обновлениях в конвейерах, таких как происхождение данных, метрики качества данных и использование ресурсов. См. Что такое журнал событий DLT?.
- Определение пользовательских действий, выполняемых при возникновении определенных событий. См. раздел Определение пользовательского мониторинга конвейеров DLT с помощью перехватчиков событий.
Чтобы проверить и диагностировать производительность запросов, см. историю запросов DLT-пайплайнов. Эта функция доступна в общедоступной предварительной версии.
Добавление уведомлений по электронной почте для событий конвейера
Вы можете настроить один или несколько адресов электронной почты для получения уведомлений, когда происходит следующее:
- Обновление конвейера успешно завершено.
- Обновление конвейера завершается сбоем либо с ошибкой, допускающей повторные попытки, либо с ошибкой, которую нельзя повторить. Выберите этот параметр, чтобы получить уведомление обо всех сбоях конвейера.
- Обновление конвейера завершается неустранимой (фатальной) ошибкой. Выберите этот параметр, чтобы получить уведомление только в том случае, если возникает ошибка без повторных попыток.
- Сбой одного потока данных.
Чтобы настроить уведомления по электронной почте при создании или изменении конвейера:
- Щелкните Добавить уведомление.
- Введите один или несколько адресов электронной почты для получения уведомлений.
- Установите флажок для каждого типа уведомления, чтобы отправить на настроенные адреса электронной почты.
- Щелкните Добавить уведомление.
Какие сведения о конвейере доступны в пользовательском интерфейсе?
График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.
Сведения включают идентификатор конвейера, исходный код, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.
Чтобы просмотреть табличное представление наборов данных, перейдите на вкладку List. Представление List позволяет просматривать все наборы данных в вашем конвейере, представленные в виде строки в таблице, и полезно, если DAG конвейера слишком велик для визуализации в представлении Graph. Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.
Пользователь , запускающийся от имени, является владельцем конвейера, а обновления конвейера выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as
, сначала щелкните Разрешения, а затем измените владельца конвейера.
Как просмотреть сведения о наборе данных?
Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.
История обновлений
Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.
Выберите обновление в раскрывающемся меню, чтобы увидеть его график, подробности и события. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.
Просмотр метрик потокового вещания
Важный
Наблюдаемость потоковой передачи для DLT доступна в общедоступной предварительной версии.
Метрики потоковой обработки данных можно просматривать из источников данных, поддерживаемых структурированной потоковой обработкой данных в Spark, таких как Apache Kafka, Amazon Kinesis, Auto Loader и Delta Tables, для каждого потока в вашем конвейере DLT. Метрики отображаются в виде диаграмм в правой области пользовательского интерфейса DLT и включают секунды невыполненной работы, байты невыполненной работы, записи невыполненной работы и файлы невыполненной работы. На диаграммах отображается максимальное значение, агрегированное по минуте, а подсказка отображает максимальные значения при наведении указателя мыши на диаграмму. Данные ограничены последними 48 часами с текущего времени.
Таблицы в вашем конвейере, для которых доступны стриминговые метрики, отображают значок при просмотре структуры DAG конвейера в графическом режиме в пользовательском интерфейсе. Чтобы просмотреть метрики потоковой передачи, щелкните значок
, чтобы отобразить диаграмму метрик потоковой передачи на вкладке потоков на правой панели. Вы также можете применить фильтр для просмотра только таблиц с метриками потоковой передачи, щелкнув List, а затем щелкнув Имеет метрики потоковой передачи.
Каждый источник потоковой передачи поддерживает только определенные метрики. Метрики, не поддерживаемые источником потоковой передачи, недоступны для просмотра в пользовательском интерфейсе. В следующей таблице показаны метрики, доступные для поддерживаемых источников потоковой передачи:
источник | невыполненные байты | записи невыполненной работы | секунды невыполненной работы | файлы невыполненной работы |
---|---|---|---|---|
Кафка | ✓ | ✓ | ||
кинетика | ✓ | ✓ | ||
Дельта | ✓ | ✓ | ||
Автозагрузчик | ✓ | ✓ | ||
Google Pub/Sub | ✓ | ✓ |
Что такое журнал событий DLT?
Журнал событий DLT содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход выполнения конвейера и происхождения данных. Журнал событий можно использовать для отслеживания, понимания и мониторинга состояния конвейеров данных.
Записи журнала событий можно просматривать в пользовательском интерфейсе DLT, API DLT или напрямую запрашивать журнал событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.
Можно также определить пользовательские действия для выполнения при регистрации событий, например, отправки оповещений, используя перехватчики событий.
Важный
Не удаляйте журнал событий или родительский каталог или схему, в которой публикуется журнал событий. Удаление журнала событий может привести к сбою обновления конвейера во время будущих запусков.
схема журнала событий
В следующей таблице описывается схема журнала событий. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как поле details
. Azure Databricks поддерживает оператор :
для анализа полей JSON. См. :
(знак двоеточия) оператор.
Поле | Описание |
---|---|
id |
Уникальный идентификатор записи журнала событий. |
sequence |
Документ JSON, содержащий метаданные для идентификации и упорядочивания событий. |
origin |
Документ JSON, содержащий метаданные для источника события, например поставщик облачных служб, регион поставщика облачных служб, user_id , pipeline_id или pipeline_type , чтобы показать, где был создан конвейер, либо DBSQL или WORKSPACE . |
timestamp |
Время записи события. |
message |
Сообщение, доступное для чтения человеком, описывающее событие. |
level |
Тип события, например INFO , WARN , ERROR или METRICS . |
maturity_level |
Стабильность схемы событий. Возможные значения:
|
error |
Если произошла ошибка, будут предоставлены её подробности. |
details |
Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий. |
event_type |
Тип события. |
запрос журнала событий
Заметка
В этом разделе описывается поведение и синтаксис по умолчанию для работы с журналами событий для конвейеров, настроенных с каталогом Unity и режимом публикации по умолчанию.
- Сведения о поведении конвейеров каталога Unity, использующих устаревший режим публикации, см. в статье Работа с журналом событий для конвейеров устаревшей публикации каталога Unity.
- Сведения о поведении и синтаксисе конвейеров хранилища метаданных Hive см. в статье Работа с журналом событий для конвейеров хранилища метаданных Hive.
По умолчанию DLT записывает журнал событий в скрытую таблицу Delta в каталоге по умолчанию и схеме, настроенной для конвейера. Несмотря на скрытие, таблица по-прежнему может запрашиваться всеми достаточно привилегированными пользователями. По умолчанию только владелец конвейера может запрашивать таблицу журнала событий.
По умолчанию имя скрытого журнала событий отформатировано как event_log_{pipeline_id}
, где идентификатор конвейера — это UUID, назначенный системой, в котором дефисы заменены на подчеркивания.
Вы можете взаимодействовать с конфигурацией JSON для публикации журнала событий. При публикации журнала событий укажите имя журнала событий и при необходимости укажите каталог и схему, как показано в следующем примере:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Расположение журнала событий также служит местоположением схемы для любых запросов Auto Loader в потоке данных. Databricks рекомендует создать представление по таблице журнала событий перед изменением привилегий, так как некоторые параметры вычислений могут позволить пользователям получить доступ к метаданным схемы, если таблица журнала событий предоставляется напрямую. В следующем примере синтаксиса создается представление таблицы журнала событий и используется в примерах запросов журнала событий, включенных в эту статью.
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
Каждый экземпляр запуска конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор для самого последнего обновления и сохранить его во временном представлении под именем latest_update
. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
В каталоге Unity представления поддерживают потоковые запросы. В следующем примере используется структурированная потоковая передача для запроса представления, определенного в верхней части таблицы журнала событий:
df = spark.readStream.table("event_log_raw")
Владелец конвейера может опубликовать журнал событий в виде общедоступной таблицы Delta, переключив параметр Publish event log to metastore
в разделе Advanced конфигурации конвейера. При необходимости можно указать новое имя таблицы, каталог и схему для журнала событий.
сведения о происхождении запросов из журнала событий
События, содержащие сведения о происхождении, имеют тип события flow_definition
. Объект details:flow_definition
содержит output_dataset
и input_datasets
, определяющие каждую связь в графе.
Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
Запросите качество данных из журнала событий
Если вы определяете ожидания для наборов данных в конвейере, метрики качества данных хранятся в объекте details:flow_progress.data_quality.expectations
. События, содержащие сведения о качестве данных, имеют тип события flow_progress
. В следующем примере запрашивается метрика качества данных для последнего обновления конвейера:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
Запрос событий автозагрузки из журнала событий
DLT создает события при обработке файлов Auto Loader. Для событий загрузчика, event_type
— operation_progress
, а details:operation_progress:type
— это либо AUTO_LOADER_LISTING
, либо AUTO_LOADER_BACKFILL
. Объект details:operation_progress
также включает поля status
, duration_ms
, auto_loader_details:source_path
и auto_loader_details:num_files_listed
.
В следующем примере выполняется запрос событий автозагрузчика для последнего обновления:
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')
Отслеживание очереди данных с помощью запроса к журналу событий
DLT отслеживает количество данных, присутствующих в невыполненной работы в объекте details:flow_progress.metrics.backlog_bytes
. События, содержащие метрики невыполненной работы, имеют тип события flow_progress
. В следующем примере запрашиваются метрики накопившихся задач для последнего обновления конвейера.
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
Заметка
Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии среды выполнения Databricks.
Мониторинг улучшенных событий автомасштабирования из журнала событий для конвейеров без включённых функций безсерверности
Для конвейеров DLT, которые не используют бессерверные вычисления, журнал событий записывает изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale
. Сведения о изменении размера кластера хранятся в объекте details:autoscale
. В следующем примере выполняется запрос на изменение размера расширенного кластера с автоматическим масштабированием для последнего обновления конвейера.
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
мониторинг использования вычислительных ресурсов
cluster_resources
события предоставляют метрики по количеству слотов задач в кластере, уровню их использования и количеству задач, которые ожидают постановки в расписание.
Если включен расширенный автомасштабирование, события cluster_resources
также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executors
и optimal_num_executors
. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE
, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
и BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
.
Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.
В следующем примере выполняется запрос истории размера очереди задач для последнего обновления конвейера.
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере запрашивается история использования за последнее обновление конвейера.
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для расширенных конвейеров автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
аудит потоков данных DLT
Вы можете использовать записи журнала событий DLT и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в DLT.
DLT использует учетные данные владельца конвейера для запуска обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. DLT регистрирует действия пользователя на конвейере, включая создание конвейера, изменение конфигурации и инициацию обновлений.
См. события каталога Unity для справки по событиям аудита каталога Unity.
Запрос действий пользователей в журнале событий
Журнал событий можно использовать для аудита событий, например действий пользователей. События, содержащие сведения о действиях пользователя, имеют тип события user_action
.
Сведения о действии хранятся в объекте user_action
в поле details
. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления event_log_raw
, используемого в этом запросе, см. в статье Запрос журнала событий.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
информация о среде выполнения
Вы можете просмотреть информацию о среде выполнения для обновления конвейера, например, версию Databricks Runtime для обновления:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11.0 |