次の方法で共有


アダプティブ クエリの実行

アダプティブ クエリ実行 (AQE) は、クエリの実行中に発生するクエリの再最適化です。

実行時に再最適化を行う理由は、Azure Databricks では、シャッフルおよびブロードキャスト交換 (AQE ではクエリ ステージと呼ばれます) の最後に最新の正確な統計情報が得られることにあります。 その結果、Azure Databricks は、より優れた物理戦略を選択したり、最適なシャッフル後 partition サイズと数を選択したり、ヒントを必要とするために使用される最適化 (スキュー join 処理など) を実行したりできます。

これは、統計収集が有効でない場合や統計が古い場合に非常に便利です。 また、複雑なクエリの途中やデータ スキューの発生後など、静的に派生した統計が不正確である where にも役立ちます。

資格

AQE は既定で有効になっています。 これには 4 つの主要な機能があります。

  • 並べ替えマージjoinをブロードキャスト ハッシュjoinに動的に変更します。
  • シャッフル交換後にパーティションを動的に結合します (小さなパーティションを合理的なサイズのパーティションに結合します)。 非常に小さいタスクでは、I/O スループットが低下し、スケジュールのオーバーヘッドとタスクのセットアップのオーバーヘッドが多くなる傾向があります。 小さなタスクを組み合わせると、リソースが節約され、クラスターのスループットが向上します。
  • 偏りのあるタスクをほぼ均等なサイズのタスクに分割 (および必要に応じて複製) することにより、並べ替えマージjoinおよびシャッフル ハッシュjoinでスキューを動的に処理します。
  • 空のリレーションを動的に検出して伝達します。

アプリケーション

AQE は、次のすべてのクエリに適用されます。

  • 非ストリーミング
  • 少なくとも 1 つの交換 (通常、join、集計、または windowがある場合)、1 つのサブクエリ、またはその両方を含みます。

すべての AQE 適用クエリが必ずしも再最適化されるとは限りません。 再最適化では、静的にコンパイルされたクエリ プランとは異なるクエリ プランが発生する場合もあれば、そうでない場合もあります。 クエリのプランが AQE によって変更されたかどうかを確認するには、次のセクション クエリ プランを参照してください。

クエリ プラン

このセクションでは、さまざまな方法でクエリ プランを調べる方法について説明します。

このセクションでは、次の操作を行います。

Spark UI

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常、各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。 クエリが実行される前または実行される前に、対応する AdaptiveSparkPlan ノードの isFinalPlan フラグが falseとして表示されます。クエリの実行が完了すると、isFinalPlan フラグが true. に変わります

進化する計画

クエリ プラン図は、実行が進行するにつれて進化し、実行中の最新のプランが反映されます。 既に実行されている(メトリックが使用可能な)ノードは変更されませんが、実行されていないノードは再最適化の結果として時間の経過とともに変更される可能性があります。

クエリ プラン図の例を次に示します。

クエリプランのダイアグラム

DataFrame.explain()

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常、各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。 クエリが実行される前または実行される前に、対応する AdaptiveSparkPlan ノードの isFinalPlan フラグが falseとして表示されます。クエリの実行が完了すると、isFinalPlan フラグが trueに変わります。

現在のプランと初期プラン

AdaptiveSparkPlan ノードの下には、実行が完了したかどうかに応じて、初期プラン (AQE 最適化を適用する前のプラン) と現在のプランまたは最終プランの両方があります。 現在のプランは、実行が進むにつれて進化します。

ランタイム統計

各シャッフル ステージとブロードキャスト ステージには、データ統計が含まれています。

ステージの実行前または実行中は、統計はコンパイル時の推定値であり、例えばフラグ isRuntimefalseの状態になります(例: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);)。

ステージの実行が完了すると、統計は実行時に収集され、フラグ isRuntimetrueになります (例: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

DataFrame.explain の例を次に示します。

  • 実行前

    実行前

  • 実行中

    実行中の 実行中

  • 実行後

    実行後

SQL EXPLAIN

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常、各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。

現在のプランはありません

SQL EXPLAIN はクエリを実行しないため、現在のプランは常に初期プランと同じであり、最終的に AQE によって実行 get 内容は反映されません。

SQL explain の例を次に示します。

SQL explain

有効性

1 つ以上の AQE 最適化が有効になると、クエリ プランが変更されます。 これらの AQE 最適化の効果は、現在のプランと最終的なプランの差異、および初期プランと特定プランノードの差異によって示されています。

  • 現在/最終の計画と初期計画の間で異なる物理ノード join を踏まえ、並べ替えマージ join をブロードキャストハッシュ joinに動的に変更します

    Join方法の文字列

  • パーティションを動的に結合する: ノード CustomShuffleReader とプロパティ Coalesced

    カスタムシャッフルリーダー

    のカスタムシャッフルリーダー文字列

  • スキューjoinを動的に処理する: ノード SortMergeJoin のフィールド isSkew が true として示されます。

    スキューjoinプラン

    スキューjoin文字列

  • 空のリレーションを動的に検出して伝達します。プランの一部 (または全体) は、ノード LocalTableScan に置き換えられ、リレーション フィールドは空になります。

    ローカル table スキャン

    ローカル table スキャン文字列

構成

このセクションでは、次の操作を行います。

アダプティブ クエリ実行を有効または無効にする

財産
spark.databricks.optimizer.adaptive.enabled

型: Boolean

アダプティブ クエリ実行を有効または無効にするかどうか。

既定値: true

自動最適化シャッフル機能を有効にする

財産
spark.sql.shuffle.partitions

タイプ: Integer

結合または集計のデータをシャッフルするときに使用する既定のパーティション数。 auto 値を設定すると、自動最適化シャッフルが有効になり、クエリ プランとクエリ入力データ サイズに基づいてこの数が自動的に決定されます。

注: 構造化ストリーミングの場合、同じチェックポイントの場所からクエリを再起動する間は、この構成を変更できません。

既定値: 200

ソート・マージ join をブロードキャスト・ハッシュ join に動的に変更する

財産
spark.databricks.adaptive.autoBroadcastJoinThreshold

型: Byte String

実行時にブロードキャスト join への切り替えをトリガーするしきい値。

既定値: 30MB

パーティションを動的に結合する

財産
spark.sql.adaptive.coalescePartitions.enabled

型: Boolean

partition 合体を有効または無効にするかどうかを指定します。

既定値: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

タイプ: Byte String

結合後のターゲット サイズ。 結合された partition サイズは、このターゲット サイズに近くなりますが、このサイズより大きくはありません。

既定値: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

型: Byte String

結合後のパーティションの最小サイズ。 結合したpartitionのサイズは、このサイズ以上になります。

既定値: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

型: Integer

結合後のパーティションの最小数。 この設定は次のものを明示的にオーバーライドするため、推奨されません:
spark.sql.adaptive.coalescePartitions.minPartitionSize

既定値: クラスターコア数の2倍

スキュー join を動的に処理する

財産
spark.sql.adaptive.skewJoin.enabled

型: Boolean

スキュー join の処理を有効にするか無効にするか。

既定値: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

型: Integer

partition の中央値サイズを乗じることで、partition が歪んでいるかどうかを判断するための一因となる要素。

既定値: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

型: Byte String

partition が歪んでいるかどうかを判断するためのしきい値。

既定値: 256MB

(partition size > skewedPartitionFactor * median partition size)(partition size > skewedPartitionThresholdInBytes) の両方が trueされている場合、partition は歪んでいると見なされます。

空のリレーションを動的に検出して伝達する

財産
spark.databricks.adaptive.emptyRelationPropagation.enabled

型: Boolean

動的な空のリレーション伝達を有効または無効にするかどうか。

既定値: true

よく寄せられる質問 (FAQ)

このセクションでは、次の操作を行います。

AQE が小さな jointableをブロードキャストしなかったのはなぜですか?

ブロードキャストが予想される関係のサイズがこのしきい値に該当するが、まだブロードキャストされていない場合:

  • join の種類を確認します。 ブロードキャストは、特定の join の種類ではサポートされていません。たとえば、LEFT OUTER JOIN の左関係はブロードキャストできません。
  • リレーションに多数の空のパーティションが含まれている場合、タスクの大部分は並べ替えマージ join を使用して迅速に完了するか、スキュー join 処理で最適化できる可能性があります。 AQE では、空でないパーティションの割合が spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoinより低い場合に、このような並べ替えマージ結合をブロードキャスト ハッシュ結合に変更することを回避します。

AQE を有効にしたブロードキャスト join 戦略ヒントを引き続き使用する必要がありますか?

はい。 静的に計画されたブロードキャスト join は、通常、AQE によって動的に計画されたブロードキャストよりもパフォーマンスが高くなります。これは、AQE が join の両側に対してシャッフルを実行するまで (実際の関係サイズが取得されるまで) ブロードキャスト join に切り替えない可能性があるためです。 そのため、クエリがわかっている場合は、ブロードキャスト ヒントを使用することをお勧めします。 AQE では、静的最適化と同じ方法でクエリ ヒントが考慮されますが、ヒントの影響を受けず動的最適化を適用することもできます。

スキュー join のヒントと AQE スキュー join の最適化の違いは何ですか? どれを使用する必要がありますか?

AQE スキュー join は完全に自動であり、一般的には対応するヒントよりもパフォーマンスが優れているため、スキュー join ヒントを使用するのではなく、AQE スキュー join 処理に依存することをお勧めします。

AQE によってjoin順序が自動的に調整されなかったのはなぜですか?

動的 join の並べ替えは AQE の一部ではありません。

AQE がデータ スキューを検出しなかったのはなぜですか?

AQE が歪んだ partitionとして partition を検出するには、次の 2 つのサイズ条件を満たす必要があります。

  • partition サイズが spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes より大きい (既定では 256 MB)
  • partition サイズは、すべてのパーティションの中央値サイズに傾斜因子 partitionspark.sql.adaptive.skewJoin.skewedPartitionFactor (既定では 5) をかけたものより大きくなっています。

さらに、スキュー処理のサポートは、LEFT OUTER JOINなど、特定の join の種類に制限されており、左側のスキューのみを最適化できます。

遺産

"アダプティブ実行" という用語は Spark 1.6 以降に存在しましたが、Spark 3.0 の新しい AQE は根本的に異なります。 機能面では、Spark 1.6 は "パーティションを動的に結合する" 部分のみを実行します。 技術的なアーキテクチャに関しては、新しい AQE はランタイム統計に基づくクエリの動的な計画と再計画のフレームワークであり、この記事で説明した最適化など、さまざまな最適化をサポートし、より潜在的な最適化を可能にするために拡張できます。