次の方法で共有


DLT パイプラインの監視

この記事では、DLT パイプラインに組み込まれた監視機能と可観測性機能を使用する方法について説明します。 これらの機能では、次のようなタスクがサポートされます。

  • パイプラインの更新の進行状況と状態の監視。 UI で使用できるパイプラインの詳細 を参照してください。.
  • パイプラインの更新の成功または失敗などのパイプライン イベントに関するアラート。 パイプライン イベント の電子メール通知の追加を参照してください。
  • Apache Kafka や自動ローダー (パブリック プレビュー) などのストリーミング ソースのメトリックの表示。 を参照して、のストリーミングメトリクスを表示してください。
  • データ系列、データ品質メトリック、リソースの使用状況など、パイプラインの更新に関する詳細情報の抽出。 DLT イベント ログ を参照してください。.
  • 特定のイベントが発生したときに実行するカスタム アクションの定義。 イベント フックを使用した DLT パイプラインのカスタム監視の定義については、を参照してください。

クエリのパフォーマンスを検査および診断するには、「DLT パイプラインのクエリ履歴にアクセスする」を参照してください。 この機能はパブリック プレビュー段階にあります。

パイプライン イベントに対するメール通知を追加する

次の場合に通知を受信するように 1 つ以上のメール アドレスを構成できます:

  • パイプラインの更新が正常に完了しました。
  • パイプラインの更新は、再試行可能なエラーまたは再試行不可能なエラーで失敗します。 すべてのパイプラインエラーの通知を受け取る場合は、このオプションを選択します。
  • パイプラインの更新は、再試行できない (致命的な) エラーで失敗します。 再試行できないエラーが発生した場合にのみ通知を受け取る場合は、このオプションを選択します。
  • 1 つのデータ フローが失敗します。

パイプラインを 作成 または編集するときに電子メール通知を構成するには:

  1. [通知の追加] をクリックします。
  2. 通知を受信する 1 つ以上のメール アドレスを入力します。
  3. 各通知の種類のチェック ボックスをオンにして、構成済みのメール アドレスに送信します。
  4. [通知の追加] をクリックします。

UI で使用できるパイプラインの詳細は何ですか?

パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 既定では、パイプラインの詳細ページにテーブルの最新の更新プログラムが表示されますが、ドロップダウン メニューから古い更新プログラムを選択できます。

詳細には、パイプライン ID、ソース コード、コンピューティング コスト、製品エディション、パイプライン用に構成されたチャネルが含まれます。

データセットの表形式ビューを表示するには、[リスト] タブをクリックします。リスト ビューを使用すると、パイプライン内のすべてのデータセットをテーブル内の行として表すことができます。パイプライン DAG が大きすぎて、Graph ビューで視覚化するのに役立ちます。 データセット名、型、状態などの複数のフィルターを使用して、テーブルに表示されるデータセットを制御できます。 もう一度 DAG の視覚化に切り替えるには、[グラフ] をクリックします。

Run as ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。 run as ユーザーを変更するには、[アクセス許可] をクリックしてパイプラインの所有者を変更します。

データセットの詳細を表示するにはどうすればよいですか?

パイプライン グラフまたはデータセットの一覧でデータセットをクリックすると、データセットに関する詳細が表示されます。 詳細には、データセット スキーマ、データ品質メトリック、データセットを定義するソース コードへのリンクが含まれます。

更新履歴の表示

パイプラインの更新の履歴と状態を表示するには、上部のバーにある更新履歴のドロップダウン メニューをクリックします。

ドロップダウン メニューで更新プログラムを選択して、更新プログラムのグラフ、詳細、イベントを表示します。 最新の更新プログラムに戻すには、[最新の更新プログラム 表示] をクリックします。

ストリーミング メトリックを表示する

重要

DLT のストリーミング監視機能は、パブリック プレビュー 段階にあります。

DLT パイプラインの各ストリーミング フローについて、Apache Kafka、Amazon Kinesis、Auto Loader、Delta テーブルなど、Spark Structured Streaming でサポートされているデータ ソースからのストリーミング メトリックを表示できます。 メトリックは、DLT UI の右側のウィンドウにグラフとして表示され、バックログの秒、バックログ バイト、バックログ レコード、バックログ ファイルが含まれます。 グラフには、分単位で集計された最大値が表示され、グラフにカーソルを合わせるとヒントに最大値が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。

ストリーミング メトリックを使用できるパイプライン内のテーブルには、UI Graph ビューでパイプライン DAG を表示するときに、DLT グラフ アイコン アイコンが表示されます。 ストリーミング メトリックを表示するには、DLT グラフ アイコン をクリックして、右側のウィンドウの [フロー] タブにストリーミング メトリック グラフを表示します。 フィルターを適用して、ストリーミング メトリックを含むテーブルのみを表示することもできます。そのためには、[リスト] をクリックし、[ストリーミング メトリックが ] をクリックします。

各ストリーミング ソースでは、特定のメトリックのみがサポートされます。 ストリーミング ソースでサポートされていないメトリックは、UI で表示できません。 次の表は、サポートされているストリーミング ソースで使用できるメトリックを示しています。

ソース バックログ バイト数 バックログ レコード バックログ秒数 バックログ ファイル
カフカ
キネシス
デルタ
自動ローダー
Google Pub/Sub (グーグルパブサブ)

DLT イベント ログとは

DLT イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データ系列など、パイプラインに関連するすべての情報が含まれます。 イベント ログを使用して、データ パイプラインの状態を追跡、把握、および監視することができます。

イベント ログ エントリは、DLT ユーザー インターフェイス、DLT API、またはイベント ログに直接クエリを実行して表示できます。 このセクションでは、イベント ログに直接クエリを実行することに重点を置いています。

また、イベント フックを使用してイベント (例: アラートの送信) がログに記録されると、実行するカスタム アクションを定義することもできます。

重要

イベント ログ、またはイベント ログが発行されている親カタログまたはスキーマは削除しないでください。 イベント ログを削除すると、パイプラインが将来の実行中に更新に失敗する可能性があります。

イベントログスキーマ

次の表では、イベント ログ スキーマについて説明します。 フィールドの一部に、details フィールドなど、いくつかのクエリを実行するために解析を必要とする JSON データが含まれています。 Azure Databricks では、JSON フィールドを解析するための : 演算子がサポートされています。 : 演算子 (コロン記号)を参照してください。

フィールド 説明
id イベント ログ レコードの一意識別子。
sequence イベントを識別および順序付けるためのメタデータを含む JSON ドキュメント。
origin イベントの発生元のメタデータ (クラウド プロバイダー、クラウド プロバイダーリージョン、user_idpipeline_idpipeline_type など) を含む JSON ドキュメント。パイプラインが作成された場所 (DBSQL または WORKSPACE) を示します。
timestamp イベントが記録された時刻。
message 人が判読できる、イベントを説明するメッセージ。
level イベントの種類 (INFOWARNERRORMETRICS など)。
maturity_level イベント スキーマの安定性。 指定できる値は次のとおりです。
  • STABLE: このスキーマは安定しており、変更されません。
  • NULL: このスキーマは安定しており、変更されません。 NULL フィールドが追加される前にレコードが作成された場合、この値は maturity_level になる可能性があります (リリース 2022.37)。
  • EVOLVING: このスキーマは安定していないため、変更される可能性があります。
  • DEPRECATED: スキーマは非推奨となり、DLT ランタイムはいつでもこのイベントの生成を停止する可能性があります。
error エラーが発生した場合の、エラーの詳細説明。
details イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用される主なフィールドです。
event_type イベントの種類。

イベント ログのクエリを実行する

注釈

このセクションでは、Unity カタログで構成されたパイプラインのイベント ログを操作するための既定の動作と構文と、既定の発行モードについて説明します。

既定では、DLT は、パイプライン用に構成された既定のカタログとスキーマの非表示の Delta テーブルにイベント ログを書き込みます。 非表示の状態でも、十分な特権を持つすべてのユーザーがテーブルに対してクエリを実行できます。 既定では、パイプラインの所有者のみがイベント ログ テーブルに対してクエリを実行できます。

既定では、非表示のイベント ログの名前は event_log_{pipeline_id}として書式設定されます。パイプライン ID はシステム割り当て UUID で、ダッシュはアンダースコアに置き換えられます。

JSON 構成を操作して、イベント ログを発行できます。 イベント ログを発行する場合は、イベント ログの名前を指定し、必要に応じて、次の例のようにカタログとスキーマを指定できます。

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

イベント ログの場所は、パイプライン内の自動ローダー クエリのスキーマの場所としても機能します。 Databricks では、権限を変更する前にイベント ログ テーブルのビューを作成することをお勧めします。一部のコンピューティング設定では、イベント ログ テーブルが直接共有されている場合に、ユーザーがスキーマ メタデータにアクセスできる場合があるためです。 次の構文例では、イベント ログ テーブルにビューを作成し、この記事に含まれるイベント ログ クエリの例で使用します。

CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;

パイプライン実行の各インスタンスは、更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリを実行して、最新の更新プログラムの識別子を検索し、latest_update 一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Unity カタログでは、ビューはストリーミング クエリをサポートします。 次の例では、構造化ストリーミングを使用して、イベント ログ テーブルの上に定義されたビューに対してクエリを実行します。

df = spark.readStream.table("event_log_raw")

パイプラインの所有者は、パイプライン構成の Advanced セクションで Publish event log to metastore オプションを切り替えることで、イベント ログをパブリック Delta テーブルとして発行できます。 必要に応じて、イベント ログの新しいテーブル名、カタログ、およびスキーマを指定できます。

イベント ログに系列情報のクエリを実行する

データ系列に関する情報を含むイベントには、イベントの種類 flow_definition があります。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_datasetinput_datasets が含まれます。

次のクエリを使用して、入力データセットと出力データセットを抽出して系列情報を表示できます。

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

イベント ログにデータ品質のクエリを実行する

パイプラインのデータセットで期待値を定義すると、データ品質メトリックが details:flow_progress.data_quality.expectations オブジェクトに格納されます。 データ品質に関する情報を含むイベントには、イベントの種類 flow_progress があります。 次の例では、最後のパイプライン更新のデータ品質メトリックを照会します。

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

イベント ログからの自動ローダー イベントのクエリ

DLT は、自動ローダーがファイルを処理するときにイベントを生成します。 自動ローダー イベントの場合、event_typeoperation_progress され、details:operation_progress:typeAUTO_LOADER_LISTING または AUTO_LOADER_BACKFILLです。 details:operation_progress オブジェクトには、statusduration_msauto_loader_details:source_path、および auto_loader_details:num_files_listed フィールドも含まれます。

次の例では、最新の更新プログラムの自動ローダー イベントを照会します。

SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,
  latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest.update_id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')

イベントログにクエリを実行してデータ バックログを監視する

DLT は、details:flow_progress.metrics.backlog_bytes オブジェクトのバックログに存在するデータの量を追跡します。 バックログ メトリックを含むイベントには、イベントの種類 flow_progress があります。 次の例では、最後のパイプライン更新のバックログ メトリックを照会します。

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

注釈

パイプラインのデータ ソースの種類と Databricks Runtime のバージョンによっては、バックログ メトリックを使用できない場合があります。

サーバーレスが有効になっていないパイプラインのイベント ログからの拡張自動スケール イベントを監視する

サーバーレス コンピューティングを使用しない DLT パイプラインの場合、パイプラインで拡張自動スケールが有効になっている場合、イベント ログはクラスターのサイズ変更をキャプチャします。 拡張自動スケールに関する情報を含むイベントには、イベントの種類が autoscale。 クラスターのサイズ変更要求の情報は、details:autoscale オブジェクトに格納されます。 次の例では、最後のパイプライン更新に対する拡張自動スケール クラスターのサイズ変更要求を照会します。

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

リソース使用率を監視する

cluster_resources イベントでは、クラスター内のタスク スロットの数、それらのタスク スロットの使用率、スケジュールを待機しているタスクの数に関するメトリックが提供されます。

拡張自動スケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsoptimal_num_executorsなど、自動スケール アルゴリズムのメトリックも含まれます。 イベントには、アルゴリズムの状態も、CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION などのさまざまな状態として表示されます。 この情報は、自動スケール イベントと組み合わせて表示して、拡張自動スケールの全体像を提供できます。

次の例では、最後のパイプライン更新のタスク キュー サイズ履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、最後のパイプライン更新の使用率履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、強化された自動スケール パイプラインでのみ使用できるメトリックと共に、最新の要求でアルゴリズムによって要求された Executor の数、最新のメトリックに基づいてアルゴリズムによって推奨される Executor の最適な数、自動スケール アルゴリズムの状態など、Executor カウント履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

分散型台帳技術(DLT)パイプラインの監査

DLT イベント ログ レコードやその他の Azure Databricks 監査ログを使用して、DLT でデータがどのように更新されているかを把握できます。

DLT は、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用される認証情報を変更できます。 DLT は、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインでのアクションについてユーザーを記録します。

Unity カタログ監査イベントのリファレンスについては、Unity カタログ イベント を参照してください。

イベント ログにユーザー アクションのクエリを実行する

イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザー アクションに関する情報を含むイベントには、イベントの種類 user_action があります。

アクションに関する情報は、user_action フィールドの details オブジェクトに格納されます。 次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用される event_log_raw ビューを作成するには、「イベント ログのクエリを実行する」を参照してください。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

ランタイム情報

パイプライン更新プログラムのランタイム情報 (更新プログラムの Databricks Runtime バージョンなど) を表示できます。

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0