Поделиться через


Мониторинг конвейеров DLT

В этой статье описывается использование встроенных функций мониторинга и наблюдаемости для конвейеров DLT. Такие функции поддерживают такие задачи, как:

Чтобы проверить и диагностировать производительность запросов, см. историю запросов DLT-пайплайнов. Эта функция доступна в общедоступной предварительной версии.

Добавление уведомлений по электронной почте для событий конвейера

Вы можете настроить один или несколько адресов электронной почты для получения уведомлений, когда происходит следующее:

  • Обновление конвейера успешно завершено.
  • Обновление конвейера завершается сбоем либо с ошибкой, допускающей повторные попытки, либо с ошибкой, которую нельзя повторить. Выберите этот параметр, чтобы получить уведомление обо всех сбоях конвейера.
  • Обновление конвейера завершается неустранимой (фатальной) ошибкой. Выберите этот параметр, чтобы получить уведомление только в том случае, если возникает ошибка без повторных попыток.
  • Сбой одного потока данных.

Чтобы настроить уведомления по электронной почте при создании или изменении конвейера:

  1. Щелкните Добавить уведомление.
  2. Введите один или несколько адресов электронной почты для получения уведомлений.
  3. Установите флажок для каждого типа уведомления, чтобы отправить на настроенные адреса электронной почты.
  4. Щелкните Добавить уведомление.

Какие сведения о конвейере доступны в пользовательском интерфейсе?

График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.

Сведения включают идентификатор конвейера, исходный код, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.

Чтобы просмотреть табличное представление наборов данных, перейдите на вкладку List. Представление List позволяет просматривать все наборы данных в вашем конвейере, представленные в виде строки в таблице, и полезно, если DAG конвейера слишком велик для визуализации в представлении Graph. Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.

Пользователь , запускающийся от имени, является владельцем конвейера, а обновления конвейера выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as, сначала щелкните Разрешения, а затем измените владельца конвейера.

Как просмотреть сведения о наборе данных?

Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.

История обновлений

Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.

Выберите обновление в раскрывающемся меню, чтобы увидеть его график, подробности и события. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.

Просмотр метрик потокового вещания

Важный

Наблюдаемость потоковой передачи для DLT доступна в общедоступной предварительной версии.

Метрики потоковой обработки данных можно просматривать из источников данных, поддерживаемых структурированной потоковой обработкой данных в Spark, таких как Apache Kafka, Amazon Kinesis, Auto Loader и Delta Tables, для каждого потока в вашем конвейере DLT. Метрики отображаются в виде диаграмм в правой области пользовательского интерфейса DLT и включают секунды невыполненной работы, байты невыполненной работы, записи невыполненной работы и файлы невыполненной работы. На диаграммах отображается максимальное значение, агрегированное по минуте, а подсказка отображает максимальные значения при наведении указателя мыши на диаграмму. Данные ограничены последними 48 часами с текущего времени.

Таблицы в вашем конвейере, для которых доступны стриминговые метрики, отображают значок диаграммы DLT при просмотре структуры DAG конвейера в графическом режиме в пользовательском интерфейсе. Чтобы просмотреть метрики потоковой передачи, щелкните значок диаграммы DLT, чтобы отобразить диаграмму метрик потоковой передачи на вкладке потоков на правой панели. Вы также можете применить фильтр для просмотра только таблиц с метриками потоковой передачи, щелкнув List, а затем щелкнув Имеет метрики потоковой передачи.

Каждый источник потоковой передачи поддерживает только определенные метрики. Метрики, не поддерживаемые источником потоковой передачи, недоступны для просмотра в пользовательском интерфейсе. В следующей таблице показаны метрики, доступные для поддерживаемых источников потоковой передачи:

источник невыполненные байты записи невыполненной работы секунды невыполненной работы файлы невыполненной работы
Кафка
кинетика
Дельта
Автозагрузчик
Google Pub/Sub

Что такое журнал событий DLT?

Журнал событий DLT содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход выполнения конвейера и происхождения данных. Журнал событий можно использовать для отслеживания, понимания и мониторинга состояния конвейеров данных.

Записи журнала событий можно просматривать в пользовательском интерфейсе DLT, API DLT или напрямую запрашивать журнал событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.

Можно также определить пользовательские действия для выполнения при регистрации событий, например, отправки оповещений, используя перехватчики событий.

Важный

Не удаляйте журнал событий или родительский каталог или схему, в которой публикуется журнал событий. Удаление журнала событий может привести к сбою обновления конвейера во время будущих запусков.

схема журнала событий

В следующей таблице описывается схема журнала событий. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как поле details. Azure Databricks поддерживает оператор : для анализа полей JSON. См. : (знак двоеточия) оператор.

Поле Описание
id Уникальный идентификатор записи журнала событий.
sequence Документ JSON, содержащий метаданные для идентификации и упорядочивания событий.
origin Документ JSON, содержащий метаданные для источника события, например поставщик облачных служб, регион поставщика облачных служб, user_id, pipeline_idили pipeline_type, чтобы показать, где был создан конвейер, либо DBSQL или WORKSPACE.
timestamp Время записи события.
message Сообщение, доступное для чтения человеком, описывающее событие.
level Тип события, например INFO, WARN, ERRORили METRICS.
maturity_level Стабильность схемы событий. Возможные значения:
  • STABLE: схема стабильна и не изменится.
  • NULL: схема стабильна и не изменится. Значение может быть NULL, если запись была создана до того, как было добавлено поле maturity_level (релиз 2022.37).
  • EVOLVING: схема не стабильна и может измениться.
  • DEPRECATED: схема устарела, и среда выполнения DLT может перестать создавать это событие в любое время.
error Если произошла ошибка, будут предоставлены её подробности.
details Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий.
event_type Тип события.

запрос журнала событий

Заметка

В этом разделе описывается поведение и синтаксис по умолчанию для работы с журналами событий для конвейеров, настроенных с каталогом Unity и режимом публикации по умолчанию.

По умолчанию DLT записывает журнал событий в скрытую таблицу Delta в каталоге по умолчанию и схеме, настроенной для конвейера. Несмотря на скрытие, таблица по-прежнему может запрашиваться всеми достаточно привилегированными пользователями. По умолчанию только владелец конвейера может запрашивать таблицу журнала событий.

По умолчанию имя скрытого журнала событий отформатировано как event_log_{pipeline_id}, где идентификатор конвейера — это UUID, назначенный системой, в котором дефисы заменены на подчеркивания.

Вы можете взаимодействовать с конфигурацией JSON для публикации журнала событий. При публикации журнала событий укажите имя журнала событий и при необходимости укажите каталог и схему, как показано в следующем примере:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

Расположение журнала событий также служит местоположением схемы для любых запросов Auto Loader в потоке данных. Databricks рекомендует создать представление по таблице журнала событий перед изменением привилегий, так как некоторые параметры вычислений могут позволить пользователям получить доступ к метаданным схемы, если таблица журнала событий предоставляется напрямую. В следующем примере синтаксиса создается представление таблицы журнала событий и используется в примерах запросов журнала событий, включенных в эту статью.

CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;

Каждый экземпляр запуска конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор для самого последнего обновления и сохранить его во временном представлении под именем latest_update. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

В каталоге Unity представления поддерживают потоковые запросы. В следующем примере используется структурированная потоковая передача для запроса представления, определенного в верхней части таблицы журнала событий:

df = spark.readStream.table("event_log_raw")

Владелец конвейера может опубликовать журнал событий в виде общедоступной таблицы Delta, переключив параметр Publish event log to metastore в разделе Advanced конфигурации конвейера. При необходимости можно указать новое имя таблицы, каталог и схему для журнала событий.

сведения о происхождении запросов из журнала событий

События, содержащие сведения о происхождении, имеют тип события flow_definition. Объект details:flow_definition содержит output_dataset и input_datasets, определяющие каждую связь в графе.

Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

Запросите качество данных из журнала событий

Если вы определяете ожидания для наборов данных в конвейере, метрики качества данных хранятся в объекте details:flow_progress.data_quality.expectations. События, содержащие сведения о качестве данных, имеют тип события flow_progress. В следующем примере запрашивается метрика качества данных для последнего обновления конвейера:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

Запрос событий автозагрузки из журнала событий

DLT создает события при обработке файлов Auto Loader. Для событий загрузчика, event_typeoperation_progress, а details:operation_progress:type — это либо AUTO_LOADER_LISTING, либо AUTO_LOADER_BACKFILL. Объект details:operation_progress также включает поля status, duration_ms, auto_loader_details:source_pathи auto_loader_details:num_files_listed.

В следующем примере выполняется запрос событий автозагрузчика для последнего обновления:

SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,
  latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest.update_id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')

Отслеживание очереди данных с помощью запроса к журналу событий

DLT отслеживает количество данных, присутствующих в невыполненной работы в объекте details:flow_progress.metrics.backlog_bytes. События, содержащие метрики невыполненной работы, имеют тип события flow_progress. В следующем примере запрашиваются метрики накопившихся задач для последнего обновления конвейера.

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Заметка

Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии среды выполнения Databricks.

Мониторинг улучшенных событий автомасштабирования из журнала событий для конвейеров без включённых функций безсерверности

Для конвейеров DLT, которые не используют бессерверные вычисления, журнал событий записывает изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale. Сведения о изменении размера кластера хранятся в объекте details:autoscale. В следующем примере выполняется запрос на изменение размера расширенного кластера с автоматическим масштабированием для последнего обновления конвейера.

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

мониторинг использования вычислительных ресурсов

cluster_resources события предоставляют метрики по количеству слотов задач в кластере, уровню их использования и количеству задач, которые ожидают постановки в расписание.

Если включен расширенный автомасштабирование, события cluster_resources также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executorsи optimal_num_executors. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSи BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.

В следующем примере выполняется запрос истории размера очереди задач для последнего обновления конвейера.

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

В следующем примере запрашивается история использования за последнее обновление конвейера.

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для расширенных конвейеров автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

аудит потоков данных DLT

Вы можете использовать записи журнала событий DLT и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в DLT.

DLT использует учетные данные владельца конвейера для запуска обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. DLT регистрирует действия пользователя на конвейере, включая создание конвейера, изменение конфигурации и инициацию обновлений.

См. события каталога Unity для справки по событиям аудита каталога Unity.

Запрос действий пользователей в журнале событий

Журнал событий можно использовать для аудита событий, например действий пользователей. События, содержащие сведения о действиях пользователя, имеют тип события user_action.

Сведения о действии хранятся в объекте user_action в поле details. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления event_log_raw, используемого в этом запросе, см. в статье Запрос журнала событий.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

информация о среде выполнения

Вы можете просмотреть информацию о среде выполнения для обновления конвейера, например, версию Databricks Runtime для обновления:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0