アダプティブ クエリの実行
アダプティブ クエリ実行 (AQE) は、クエリの実行中に発生するクエリの再最適化です。
実行時に再最適化を行う理由は、Azure Databricks では、シャッフルおよびブロードキャスト交換 (AQE ではクエリ ステージと呼ばれます) の最後に最新の正確な統計情報が得られることにあります。 その結果、Azure Databricks は、より優れた物理戦略を選択したり、最適なシャッフル後 partition サイズと数を選択したり、ヒントを必要とするために使用される最適化 (スキュー join 処理など) を実行したりできます。
これは、統計収集が有効でない場合や統計が古い場合に非常に便利です。 また、複雑なクエリの途中やデータ スキューの発生後など、静的に派生した統計が不正確である where にも役立ちます。
資格
AQE は既定で有効になっています。 これには 4 つの主要な機能があります。
- 並べ替えマージjoinをブロードキャスト ハッシュjoinに動的に変更します。
- シャッフル交換後にパーティションを動的に結合します (小さなパーティションを合理的なサイズのパーティションに結合します)。 非常に小さいタスクでは、I/O スループットが低下し、スケジュールのオーバーヘッドとタスクのセットアップのオーバーヘッドが多くなる傾向があります。 小さなタスクを組み合わせると、リソースが節約され、クラスターのスループットが向上します。
- 偏りのあるタスクをほぼ均等なサイズのタスクに分割 (および必要に応じて複製) することにより、並べ替えマージjoinおよびシャッフル ハッシュjoinでスキューを動的に処理します。
- 空のリレーションを動的に検出して伝達します。
アプリケーション
AQE は、次のすべてのクエリに適用されます。
- 非ストリーミング
- 少なくとも 1 つの交換 (通常、join、集計、または windowがある場合)、1 つのサブクエリ、またはその両方を含みます。
すべての AQE 適用クエリが必ずしも再最適化されるとは限りません。 再最適化では、静的にコンパイルされたクエリ プランとは異なるクエリ プランが発生する場合もあれば、そうでない場合もあります。 クエリのプランが AQE によって変更されたかどうかを確認するには、次のセクション クエリ プランを参照してください。
クエリ プラン
このセクションでは、さまざまな方法でクエリ プランを調べる方法について説明します。
このセクションでは、次の操作を行います。
Spark UI
AdaptiveSparkPlan
ノード
AQE 適用クエリには、通常、各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan
ノードが含まれます。
クエリが実行される前または実行される前に、対応する AdaptiveSparkPlan
ノードの isFinalPlan
フラグが false
として表示されます。クエリの実行が完了すると、isFinalPlan
フラグが true.
に変わります
進化する計画
クエリ プラン図は、実行が進行するにつれて進化し、実行中の最新のプランが反映されます。 既に実行されている(メトリックが使用可能な)ノードは変更されませんが、実行されていないノードは再最適化の結果として時間の経過とともに変更される可能性があります。
クエリ プラン図の例を次に示します。
DataFrame.explain()
AdaptiveSparkPlan
ノード
AQE 適用クエリには、通常、各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan
ノードが含まれます。 クエリが実行される前または実行される前に、対応する AdaptiveSparkPlan
ノードの isFinalPlan
フラグが false
として表示されます。クエリの実行が完了すると、isFinalPlan
フラグが true
に変わります。
現在のプランと初期プラン
各 AdaptiveSparkPlan
ノードの下には、実行が完了したかどうかに応じて、初期プラン (AQE 最適化を適用する前のプラン) と現在のプランまたは最終プランの両方があります。 現在のプランは、実行が進むにつれて進化します。
ランタイム統計
各シャッフル ステージとブロードキャスト ステージには、データ統計が含まれています。
ステージの実行前または実行中は、統計はコンパイル時の推定値であり、例えばフラグ isRuntime
が false
の状態になります(例: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
)。
ステージの実行が完了すると、統計は実行時に収集され、フラグ isRuntime
は true
になります (例: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
DataFrame.explain
の例を次に示します。
実行前
実行中
実行中の
実行後
SQL EXPLAIN
AdaptiveSparkPlan
ノード
AQE 適用クエリには、通常、各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。
現在のプランはありません
SQL EXPLAIN
はクエリを実行しないため、現在のプランは常に初期プランと同じであり、最終的に AQE によって実行 get 内容は反映されません。
SQL explain の例を次に示します。
有効性
1 つ以上の AQE 最適化が有効になると、クエリ プランが変更されます。 これらの AQE 最適化の効果は、現在のプランと最終的なプランの差異、および初期プランと特定プランノードの差異によって示されています。
現在/最終の計画と初期計画の間で異なる物理ノード join を踏まえ、並べ替えマージ join をブロードキャストハッシュ joinに動的に変更します
パーティションを動的に結合する: ノード
CustomShuffleReader
とプロパティCoalesced
スキューjoinを動的に処理する: ノード
SortMergeJoin
のフィールドisSkew
が true として示されます。空のリレーションを動的に検出して伝達します。プランの一部 (または全体) は、ノード LocalTableScan に置き換えられ、リレーション フィールドは空になります。
構成
このセクションでは、次の操作を行います。
- アダプティブ クエリ実行 を有効または無効にする
- 自動最適化シャッフル を有効にする
- 並べ替えマージjoinをブロードキャスト ハッシュjoinに動的に変更する
- パーティションを動的に結合する
- スキューjoinを動的に処理する
- 空のリレーションを動的に検出して伝達
アダプティブ クエリ実行を有効または無効にする
財産 |
---|
spark.databricks.optimizer.adaptive.enabled 型: Boolean アダプティブ クエリ実行を有効または無効にするかどうか。 既定値: true |
自動最適化シャッフル機能を有効にする
財産 |
---|
spark.sql.shuffle.partitions タイプ: Integer 結合または集計のデータをシャッフルするときに使用する既定のパーティション数。 auto 値を設定すると、自動最適化シャッフルが有効になり、クエリ プランとクエリ入力データ サイズに基づいてこの数が自動的に決定されます。注: 構造化ストリーミングの場合、同じチェックポイントの場所からクエリを再起動する間は、この構成を変更できません。 既定値: 200 |
ソート・マージ join をブロードキャスト・ハッシュ join に動的に変更する
財産 |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold 型: Byte String 実行時にブロードキャスト join への切り替えをトリガーするしきい値。 既定値: 30MB |
パーティションを動的に結合する
財産 |
---|
spark.sql.adaptive.coalescePartitions.enabled 型: Boolean partition 合体を有効または無効にするかどうかを指定します。 既定値: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes タイプ: Byte String 結合後のターゲット サイズ。 結合された partition サイズは、このターゲット サイズに近くなりますが、このサイズより大きくはありません。 既定値: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize 型: Byte String 結合後のパーティションの最小サイズ。 結合したpartitionのサイズは、このサイズ以上になります。 既定値: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum 型: Integer 結合後のパーティションの最小数。 この設定は次のものを明示的にオーバーライドするため、推奨されません: spark.sql.adaptive.coalescePartitions.minPartitionSize 。既定値: クラスターコア数の2倍 |
スキュー join を動的に処理する
財産 |
---|
spark.sql.adaptive.skewJoin.enabled 型: Boolean スキュー join の処理を有効にするか無効にするか。 既定値: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor 型: Integer partition の中央値サイズを乗じることで、partition が歪んでいるかどうかを判断するための一因となる要素。 既定値: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 型: Byte String partition が歪んでいるかどうかを判断するためのしきい値。 既定値: 256MB |
(partition size > skewedPartitionFactor * median partition size)
と (partition size > skewedPartitionThresholdInBytes)
の両方が true
されている場合、partition は歪んでいると見なされます。
空のリレーションを動的に検出して伝達する
財産 |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled 型: Boolean 動的な空のリレーション伝達を有効または無効にするかどうか。 既定値: true |
よく寄せられる質問 (FAQ)
このセクションでは、次の操作を行います。
- AQE が小さな jointableをブロードキャストしなかったのはなぜですか?
- AQE が有効になっているブロードキャスト join 戦略ヒントを引き続き使用する必要がありますか?
- スキュー join のヒントと AQE スキュー join の最適化との違いは何でしょうか? どれを使用する必要がありますか?
- AQE が join の順序を自動的に調整しなかったのはなぜですか?
- AQE がデータ スキューを検出しなかったのはなぜですか?
AQE が小さな jointableをブロードキャストしなかったのはなぜですか?
ブロードキャストが予想される関係のサイズがこのしきい値に該当するが、まだブロードキャストされていない場合:
- join の種類を確認します。 ブロードキャストは、特定の join の種類ではサポートされていません。たとえば、
LEFT OUTER JOIN
の左関係はブロードキャストできません。 - リレーションに多数の空のパーティションが含まれている場合、タスクの大部分は並べ替えマージ join を使用して迅速に完了するか、スキュー join 処理で最適化できる可能性があります。 AQE では、空でないパーティションの割合が
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
より低い場合に、このような並べ替えマージ結合をブロードキャスト ハッシュ結合に変更することを回避します。
AQE を有効にしたブロードキャスト join 戦略ヒントを引き続き使用する必要がありますか?
はい。 静的に計画されたブロードキャスト join は、通常、AQE によって動的に計画されたブロードキャストよりもパフォーマンスが高くなります。これは、AQE が join の両側に対してシャッフルを実行するまで (実際の関係サイズが取得されるまで) ブロードキャスト join に切り替えない可能性があるためです。 そのため、クエリがわかっている場合は、ブロードキャスト ヒントを使用することをお勧めします。 AQE では、静的最適化と同じ方法でクエリ ヒントが考慮されますが、ヒントの影響を受けず動的最適化を適用することもできます。
スキュー join のヒントと AQE スキュー join の最適化の違いは何ですか? どれを使用する必要がありますか?
AQE スキュー join は完全に自動であり、一般的には対応するヒントよりもパフォーマンスが優れているため、スキュー join ヒントを使用するのではなく、AQE スキュー join 処理に依存することをお勧めします。
AQE によってjoin順序が自動的に調整されなかったのはなぜですか?
動的 join の並べ替えは AQE の一部ではありません。
AQE がデータ スキューを検出しなかったのはなぜですか?
AQE が歪んだ partitionとして partition を検出するには、次の 2 つのサイズ条件を満たす必要があります。
- partition サイズが
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
より大きい (既定では 256 MB) - partition サイズは、すべてのパーティションの中央値サイズに傾斜因子 partition
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(既定では 5) をかけたものより大きくなっています。
さらに、スキュー処理のサポートは、LEFT OUTER JOIN
など、特定の join の種類に制限されており、左側のスキューのみを最適化できます。
遺産
"アダプティブ実行" という用語は Spark 1.6 以降に存在しましたが、Spark 3.0 の新しい AQE は根本的に異なります。 機能面では、Spark 1.6 は "パーティションを動的に結合する" 部分のみを実行します。 技術的なアーキテクチャに関しては、新しい AQE はランタイム統計に基づくクエリの動的な計画と再計画のフレームワークであり、この記事で説明した最適化など、さまざまな最適化をサポートし、より潜在的な最適化を可能にするために拡張できます。