mbutrovich opened a new pull request, #4033: URL: https://github.com/apache/datafusion-comet/pull/4033
## Which issue does this PR close?

Closes #4022.

## Rationale for this change

Under AQE, Spark's `PlanAdaptiveDynamicPruningFilters` converts `SubqueryAdaptiveBroadcastExec` (SAB) to `SubqueryBroadcastExec` so that the dynamic partition pruning (DPP) subquery reuses the join's broadcast. However, `CometIcebergNativeScanExec` wraps `BatchScanExec` and hides its `runtimeFilters` from the plan's expression tree. Spark's rule therefore can't see the DPP expressions, so the SAB stays unconverted and the dim table is executed independently, resulting in a double broadcast execution.

The existing workaround used reflection to set `InSubqueryExec`'s private `result` field, bypassing `executeCollect()` (which throws on SAB). This was fragile and didn't achieve broadcast reuse.

## What changes are included in this PR?

### New rule: `CometPlanAdaptiveDynamicPruningFilters`

A columnar rule (registered as `postColumnarTransitions`) that converts `SubqueryAdaptiveBroadcastExec` to `CometSubqueryBroadcastExec` inside `CometIcebergNativeScanExec.originalPlan.runtimeFilters`. The subquery wraps the join's already-materialized `BroadcastQueryStageExec`, achieving true broadcast reuse: the dim table is not re-executed.

Key design decisions:

- **Registered as `postColumnarTransitions`**, not `queryStageOptimizerRule`. `CometExecRule` runs in `preColumnarTransitions` and recreates scan instances, which would discard any earlier modifications.
- **Matches by `buildKeys` exprIds** to disambiguate multiple broadcast joins in the same plan.
- **Searches both `CometBroadcastHashJoinExec` and `BroadcastHashJoinExec`** to handle Spark fallback (e.g., when Comet BHJ is disabled via config). Uses `CometSubqueryBroadcastExec` for Comet broadcasts (Arrow data) and `SubqueryBroadcastExec` for Spark broadcasts (`HashedRelation`).
- **Falls back to `Literal.TrueLiteral`** when no matching broadcast join exists (e.g., SortMergeJoin). This disables DPP but still produces correct results.
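
The matching and fallback decisions above can be sketched on a toy model. This is a minimal illustration only, assuming simplified, hypothetical stand-in types (`BroadcastStage`, `BroadcastJoin`, `SortMergeJoin`, `TrueLiteralFallback`, `DppRule.resolveSab`) that carry just the fields the decision depends on, with build keys represented by their exprIds as plain `Long`s. It is not the actual `CometPlanAdaptiveDynamicPruningFilters` implementation and uses none of Spark's plan classes.

```scala
// Hypothetical, simplified stand-ins for plan nodes. These are NOT the real
// Spark/Comet classes; they only carry what the matching decision needs.
sealed trait JoinNode

// An already-materialized broadcast stage. `isComet` models whether it came from
// CometBroadcastHashJoinExec (Arrow data) or Spark's BroadcastHashJoinExec (HashedRelation).
final case class BroadcastStage(id: Int, isComet: Boolean)
final case class BroadcastJoin(buildKeyExprIds: Seq[Long], stage: BroadcastStage) extends JoinNode
final case class SortMergeJoin(keyExprIds: Seq[Long]) extends JoinNode

// What a SubqueryAdaptiveBroadcastExec placeholder gets replaced with in this model.
sealed trait DppReplacement
final case class CometSubqueryBroadcast(stage: BroadcastStage) extends DppReplacement
final case class SparkSubqueryBroadcast(stage: BroadcastStage) extends DppReplacement
// Models Literal.TrueLiteral: every partition is kept, so DPP is disabled
// but query results stay correct.
case object TrueLiteralFallback extends DppReplacement

object DppRule {
  /** Resolve one SAB, identified by its build-key exprIds, against the plan's joins. */
  def resolveSab(sabBuildKeyExprIds: Seq[Long], joins: Seq[JoinNode]): DppReplacement =
    joins.collectFirst {
      // Matching on build-key exprIds disambiguates several broadcast joins.
      case BroadcastJoin(keys, stage) if keys == sabBuildKeyExprIds => stage
    } match {
      case Some(stage) if stage.isComet => CometSubqueryBroadcast(stage) // reuse the Comet broadcast
      case Some(stage)                  => SparkSubqueryBroadcast(stage) // Spark BHJ fallback
      case None                         => TrueLiteralFallback // e.g. only a SortMergeJoin exists
    }
}
```

Because each SAB is resolved independently, in this model two DPP filters that carry the same build keys both resolve to the same `BroadcastStage` instance, which is the kind of reuse the tests check for.
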
### Metrics: `LazyIcebergMetric`

Replaces the `capturedMetricValues` -> `serializedPartitionData` chain with `LazyIcebergMetric`, whose `value` getter lazily triggers planning. This decouples construction of the `metrics` map (which `SparkPlanInfo` accesses before AQE runs) from DPP resolution, which must happen after the rule converts the SAB.

### `equals` fix

`CometIcebergNativeScanExec.equals` now includes `runtimeFilters`. Without this, `transformUp` can't detect changes when the rule replaces SAB expressions, because the old and new scans are "equal" under the old definition.

### Shim changes

- `ShimSubqueryBroadcast`: adds `createSubqueryBroadcastExec` (a version-safe constructor) and `resolveSubqueryAdaptiveBroadcast` (a reflection fallback on 3.4; an unreachable throw on 3.5+).
- `ShimCometSparkSessionExtensions`: moved from `spark-3.x/` to `spark-3.4/` and `spark-3.5/`. The split was originally needed for an `injectQueryStageOptimizerRule` shim and is kept as-is since it is harmless.

### Reflection hack removal

The inline reflection code in `CometIcebergNativeScanExec.serializedPartitionData` (`setInSubqueryResult`, `Cast` matching, manual column index lookup) is removed on 3.5+. On 3.4, the reflection fallback is preserved in the shim: the rule API (`injectColumnar`) works on 3.4, but `CometPlanAdaptiveDynamicPruningFilters` may not convert the SAB in all edge cases.

## How are these changes tested?
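
Several of the tests listed below assert broadcast reuse structurally: the DPP subquery must wrap the very `BroadcastQueryStageExec` that the join uses, so only one broadcast actually executes. The core of such a check can be sketched as an identity comparison. This is a minimal sketch over hypothetical toy types (`BroadcastStageRef`, `ScanWithDpp`, `JoinWithBroadcast`, `ReuseCheck`), not the actual assertions in `CometIcebergNativeSuite`, which operate on real Spark plans.

```scala
// Hypothetical toy types, NOT Spark's BroadcastQueryStageExec or scan classes.
// A plain (non-case) class, so equals/hashCode fall back to reference identity:
// two stages count as "the same" only if they are the same object.
final class BroadcastStageRef(val stageId: Int)

// A probe-side scan whose runtime (DPP) filters each point at a broadcast stage.
final case class ScanWithDpp(dppFilterStages: Seq[BroadcastStageRef])

// A broadcast join: its build-side stage plus the probe-side scan it prunes.
final case class JoinWithBroadcast(build: BroadcastStageRef, probe: ScanWithDpp)

object ReuseCheck {
  /** True when every DPP filter reuses the join's own build-side stage object. */
  def allFiltersReuseBuildStage(join: JoinWithBroadcast): Boolean =
    join.probe.dppFilterStages.nonEmpty &&
      join.probe.dppFilterStages.forall(_ eq join.build)

  /** Number of distinct stage objects, i.e. how many broadcasts would execute. */
  def distinctBroadcasts(join: JoinWithBroadcast): Int =
    (join.build +: join.probe.dppFilterStages).distinct.size
}
```

Identity (`eq`) rather than structural equality is the point of the check: an independently executed second broadcast of the same dim table can look structurally identical to the first, which is exactly the double execution described in the rationale.
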
8 new tests in `CometIcebergNativeSuite`:

- **AQE DPP - single filter with partition pruning**: verifies `CometSubqueryBroadcastExec` with a `BroadcastQueryStageExec` child, plus correct results.
- **AQE DPP - multiple filters on two partition columns**: two DPP filters on the same join reuse the same broadcast.
- **AQE DPP - CometSubqueryBroadcastExec replaces SubqueryAdaptiveBroadcastExec**: verifies there is only one `CometBroadcastExchangeExec` in the plan (broadcast reuse).
- **AQE DPP - multiple DPP filters reuse same broadcast**: both filters reuse the same `BroadcastQueryStageExec`.
- **AQE DPP - two separate broadcast joins disambiguated by buildKeys**: with two separate broadcast joins, each SAB maps to the correct one.
- **AQE DPP - graceful fallback when broadcast join is not Comet**: with Comet BHJ disabled, falls back to `SubqueryBroadcastExec`.
- **AQE DPP - empty broadcast result prunes all partitions**: with an empty dim table, the query returns no rows.
- **AQE DPP - no broadcast join (SMJ) disables DPP gracefully**: with a SortMergeJoin and no broadcast, the SAB is converted to `Literal.TrueLiteral`.
