次の方法で共有


DLT パイプラインの開発

パイプライン コードの開発とテストは、他の Apache Spark ワークロードとは異なります。 この記事では、パイプライン コードを開発する際にサポートされる機能、ベスト プラクティス、および考慮事項の概要について説明します。 その他の推奨事項とベスト プラクティスについては、「ソフトウェア開発 & DevOps のベスト プラクティスを DLT パイプラインに適用する」を参照してください。

手記

コードを検証したり、更新を実行したりするには、ソース コードをパイプライン構成に追加する必要があります。 「DLT パイプラインの構成」を参照してください。

パイプライン ソース コードに対して有効なファイルは何ですか?

DLT パイプライン コードには、Python または SQL を使用できます。 1 つのパイプラインをバックアップする Python ファイルと SQL ソース コード ファイルを混在させることができますが、各ファイルに含めることができる言語は 1 つだけです。 「Python を使用してパイプライン コードを開発する」と「SQL を使用したパイプライン コードの開発」参照してください。

パイプラインのソース コードを指定するときに、ノートブックとワークスペース ファイルを使用できます。 ワークスペース ファイルは、任意の IDE または Databricks ファイル エディターで作成された Python または SQL スクリプトを表します。 「ワークスペース ファイルとは」を参照してください。.

Python コードをモジュールまたはライブラリとして開発する場合は、コードをインストールしてインポートしてから、ソース コードとして構成された Python ノートブックまたはワークスペース ファイルからメソッドを呼び出す必要があります。 DLT パイプライン の Python 依存関係の管理に関するページを参照してください。

手記

Python ノートブックで任意の SQL コマンドを使用する必要がある場合は、構文パターン spark.sql("<QUERY>") を使用して、SQL を Python コードとして実行できます。

Unity カタログ関数を使用すると、SQL で使用する任意の Python ユーザー定義関数を登録できます。 Unity カタログ のユーザー定義関数 (UDF) のを参照してください。

DLT 開発機能の概要

DLT は、多くの Azure Databricks 機能を拡張して活用し、新しい機能と概念を導入します。 次の表に、パイプライン コード開発をサポートする概念と機能の概要を示します。

特徴 説明
開発モード 新しいパイプラインは、既定で開発モードで実行するように構成されます。 Databricks では、対話型の開発とテストに開発モードを使用することをお勧めします。 開発モードと運用モードを参照してください。
検証 Validate 更新プログラムは、テーブルに対して更新を実行することなく、パイプライン ソース コードの正確性を検証します。 「テーブルが更新されるのを待たずにパイプラインでエラーを確認する」を参照してください。
ノートブック DLT パイプラインのソース コードとして構成されたノートブックには、コードの検証と更新の実行のための対話型オプションが用意されています。 「ノートブックで DLT パイプラインを開発およびデバッグする」を参照してください。
パラメーター ソース コードとパイプライン構成のパラメーターを活用して、テストと拡張性を簡素化します。 「DLT パイプラインでパラメーターを使用する」を参照してください。
Databricks アセット バンドル Databricks アセット バンドルを使用すると、パイプライン構成とソース コードをワークスペース間で移動できます。 「DLT パイプラインを Databricks Asset Bundle プロジェクトに変換する」を参照してください。

開発とテスト用のサンプル データセットを作成する

Databricks では、開発データセットとテスト データセットを作成して、予想されるデータと、形式が正しくないレコードや破損している可能性のあるレコードを含むパイプライン ロジックをテストすることをお勧めします。 次のような、開発とテストに役立つデータセットを作成する方法は複数あります。

  • 運用データセットからデータのサブセットを選択します。
  • PII を含むソースには、匿名化または人為的に生成されたデータを使用します。
  • ダウンストリーム変換ロジックに基づいて、明確に定義された結果を含むテスト データを作成します。
  • データ スキーマの期待を損なうレコードを作成して、潜在的なデータ破損、形式が正しくないレコード、アップストリームのデータ変更を予測します。

たとえば、次のコードを使用してデータセットを定義するノートブックがあるとします。

CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
  "/production/data",
  format => "json")

次のようなクエリを使用して、特定のレコードを含むサンプル データセットを作成できます。

CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

次の例では、公開されたデータをフィルター処理して、開発またはテスト用の実稼働データのサブセットを作成する方法を示します。

CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

これらの異なるデータセットを使用するには、変換ロジックを実装するノートブックで複数のパイプラインを作成します。 各パイプラインは、input_data データセットからデータを読み取ることができますが、環境に固有のデータセットを作成するノートブックを含むように構成されています。