mbutrovich opened a new pull request, #4033:
URL: https://github.com/apache/datafusion-comet/pull/4033

   ## Which issue does this PR close?
   
   Closes #4022.
   
   ## Rationale for this change
   
   Under AQE, Spark's `PlanAdaptiveDynamicPruningFilters` converts 
`SubqueryAdaptiveBroadcastExec` (SAB) to `SubqueryBroadcastExec` for DPP 
broadcast reuse. However, `CometIcebergNativeScanExec` wraps `BatchScanExec` 
and hides its `runtimeFilters` from the plan's expression tree. Spark's rule 
can't see the DPP expressions, so the SAB stays unconverted and the dim table 
executes independently, resulting in a double broadcast execution.
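
The visibility problem described above can be illustrated with a toy plan tree. This is only a sketch: `Node`, `BatchScan`, `WrappedScan`, and `visibleExpressions` are hypothetical stand-ins, not Spark or Comet types, and the assumption that the wrapper exposes neither children nor expressions is a simplification.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark or Comet types.
object HiddenFiltersSketch {
  sealed trait Node {
    def expressions: Seq[String] // what an expression-rewriting rule can see on this node
    def children: Seq[Node]
  }

  // Stand-in for BatchScanExec: its runtime filters are part of its expressions.
  final case class BatchScan(runtimeFilters: Seq[String]) extends Node {
    def expressions: Seq[String] = runtimeFilters
    def children: Seq[Node] = Nil
  }

  // Stand-in for a wrapper that keeps the scan in a field instead of exposing it
  // as a child or through its own expressions (an assumption of this sketch).
  final case class WrappedScan(originalPlan: BatchScan) extends Node {
    def expressions: Seq[String] = Nil
    def children: Seq[Node] = Nil
  }

  // Collects every expression reachable by walking the plan tree.
  def visibleExpressions(plan: Node): Seq[String] =
    plan.expressions ++ plan.children.flatMap(visibleExpressions)
}
```

A rule that only walks `visibleExpressions` finds the filter on a bare `BatchScan` but nothing on a `WrappedScan`, which is why a Comet-specific rule has to reach into `originalPlan.runtimeFilters` explicitly.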
   
   The existing workaround used reflection to set `InSubqueryExec`'s private 
`result` field, bypassing `executeCollect()` (which throws on SAB). This was 
fragile and didn't achieve broadcast reuse.
   
   ## What changes are included in this PR?
   
   ### New rule: `CometPlanAdaptiveDynamicPruningFilters`
   
   A columnar rule (registered as `postColumnarTransitions`) that converts 
`SubqueryAdaptiveBroadcastExec` to `CometSubqueryBroadcastExec` inside 
`CometIcebergNativeScanExec.originalPlan.runtimeFilters`. The subquery wraps 
the join's already-materialized `BroadcastQueryStageExec`, achieving true 
broadcast reuse — no re-execution of the dim table.
   
   Key design decisions:
   - **Registered as `postColumnarTransitions`**, not 
`queryStageOptimizerRule`. `CometExecRule` runs in `preColumnarTransitions` and 
recreates scan instances, which would discard earlier modifications.
   - **Matches by `buildKeys` exprIds** to disambiguate multiple broadcast 
joins in the same plan.
   - **Searches both `CometBroadcastHashJoinExec` and `BroadcastHashJoinExec`** 
to handle Spark fallback (e.g., disabled Comet BHJ config). Uses 
`CometSubqueryBroadcastExec` for Comet broadcasts (Arrow data) and 
`SubqueryBroadcastExec` for Spark broadcasts (HashedRelation).
   - **Falls back to `Literal.TrueLiteral`** when no matching broadcast join 
exists (e.g., SortMergeJoin). This disables DPP but produces correct results.
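
The matching and fallback decisions listed above can be sketched as a small pure function. Everything here is hypothetical and for illustration only: the case classes model just the fields the decisions depend on, exprIds are reduced to plain `Long` values, and the "all SAB build keys appear among the join's build keys" criterion is an assumption about how the match could work, not the rule's actual code.

```scala
// Toy model only: hypothetical stand-ins, not the classes used by the actual rule.
object BuildKeyMatchSketch {
  // A broadcast join found in the plan: who produced the broadcast, and its build-key exprIds.
  final case class BroadcastJoin(name: String, isComet: Boolean, buildKeyIds: Set[Long])

  // An unconverted adaptive subquery inside a scan's runtime filters.
  final case class AdaptiveSubquery(buildKeyIds: Set[Long])

  // Possible outcomes of rewriting one adaptive subquery.
  sealed trait Rewritten
  final case class CometSubquery(join: String) extends Rewritten // reuse a Comet (Arrow) broadcast
  final case class SparkSubquery(join: String) extends Rewritten // reuse a Spark (HashedRelation) broadcast
  case object AlwaysTrue extends Rewritten                       // no broadcast to reuse: DPP disabled

  def rewrite(sab: AdaptiveSubquery, joins: Seq[BroadcastJoin]): Rewritten = {
    // Assumed criterion: every build key of the subquery appears among the join's build keys.
    val matching = joins.find { j =>
      sab.buildKeyIds.nonEmpty && sab.buildKeyIds.subsetOf(j.buildKeyIds)
    }
    matching match {
      case Some(j) if j.isComet => CometSubquery(j.name)
      case Some(j)              => SparkSubquery(j.name)
      case None                 => AlwaysTrue
    }
  }
}
```

With two joins whose build keys differ, each subquery resolves to its own join, and a subquery with no matching broadcast join (the SortMergeJoin case) degrades to the always-true filter rather than failing.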
   
   ### Metrics: `LazyIcebergMetric`
   
   Replaces the `capturedMetricValues` -> `serializedPartitionData` chain with 
`LazyIcebergMetric`, whose `value` getter lazily triggers planning. This 
decouples construction of the `metrics` map (accessed by `SparkPlanInfo` before 
AQE runs) from DPP resolution, which must happen after the rule converts the SAB.
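
The deferred-evaluation idea can be sketched with a plain Scala `lazy val`. This is a minimal illustration, not the PR's `LazyIcebergMetric`: the `LazyMetric` class, the `numSplits` key, and the counter standing in for scan planning are all hypothetical.

```scala
// Toy model only: hypothetical stand-in for a metric whose value is computed on first read.
object LazyMetricSketch {
  final class LazyMetric(compute: () => Long) {
    // Evaluated at most once, the first time `value` is read.
    private lazy val resolved: Long = compute()
    def value: Long = resolved
  }

  // Returns (planning runs after building the map, first read, second read, planning runs at the end).
  def demo(): (Int, Long, Long, Int) = {
    var planningRuns = 0 // counts how often the stand-in "planning" step executes
    val metrics = Map("numSplits" -> new LazyMetric(() => { planningRuns += 1; 7L }))
    val runsAfterBuild = planningRuns // building the map must not trigger planning
    val first = metrics("numSplits").value
    val second = metrics("numSplits").value
    (runsAfterBuild, first, second, planningRuns)
  }
}
```

Building the map does no work; planning runs only when a value is first read, and only once. That ordering is what lets the map exist before the rule has rewritten the SAB.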
   
   ### `equals` fix
   
   `CometIcebergNativeScanExec.equals` now includes `runtimeFilters`. Without 
this, `transformUp` can't detect changes when the rule replaces SAB 
expressions, because the old and new scans are "equal" by the old definition.
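
Why equality matters here can be shown with a toy transform that, like Spark's tree transforms, keeps the original node when the rule's result compares equal to it. The `Scan` class, its `includeFilters` switch, and the string-based filters are hypothetical simplifications for illustration.

```scala
// Toy model only: hypothetical scan node, not CometIcebergNativeScanExec.
object EqualsSketch {
  // `includeFilters` toggles whether equality takes runtimeFilters into account.
  final class Scan(val table: String, val runtimeFilters: Seq[String], includeFilters: Boolean) {
    override def equals(other: Any): Boolean = other match {
      case that: Scan =>
        table == that.table && (!includeFilters || runtimeFilters == that.runtimeFilters)
      case _ => false
    }
    // Hashing on `table` alone stays consistent with both equality modes.
    override def hashCode(): Int = table.hashCode
    def withFilters(filters: Seq[String]): Scan = new Scan(table, filters, includeFilters)
  }

  // Mimics the keep-the-original-if-equal check applied after running a rule.
  def transform(node: Scan)(rule: Scan => Scan): Scan = {
    val after = rule(node)
    if (node == after) node else after
  }

  // Stand-in for the rule: rewrites the adaptive subquery marker inside each filter.
  val replaceSab: Scan => Scan =
    scan => scan.withFilters(scan.runtimeFilters.map(_.replace("SAB", "CometSubqueryBroadcast")))
}
```

With `includeFilters = false` the rewritten scan is discarded because it compares equal to the original; with `includeFilters = true` the rewrite survives.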
   
   ### Shim changes
   
   - `ShimSubqueryBroadcast`: adds `createSubqueryBroadcastExec` (version-safe 
constructor) and `resolveSubqueryAdaptiveBroadcast` (reflection fallback for 
3.4, unreachable throw for 3.5+).
   - `ShimCometSparkSessionExtensions`: moved from `spark-3.x/` to `spark-3.4/` 
and `spark-3.5/` (originally needed for `injectQueryStageOptimizerRule` shim, 
kept as-is since the split is harmless).
   
   ### Reflection hack removal
   
   The inline reflection code in 
`CometIcebergNativeScanExec.serializedPartitionData` (`setInSubqueryResult`, 
`Cast` matching, manual column index lookup) is removed on 3.5+. On 3.4, the 
reflection fallback is preserved in the shim: the rule API (`injectColumnar`) 
works there, but `CometPlanAdaptiveDynamicPruningFilters` may not convert the 
SAB in all edge cases.
   
   ## How are these changes tested?
   
   8 new tests in `CometIcebergNativeSuite`:
   
   - **AQE DPP - single filter with partition pruning**: verifies 
`CometSubqueryBroadcastExec` + `BroadcastQueryStageExec` child + correct results
   - **AQE DPP - multiple filters on two partition columns**: two DPP filters 
on the same join reuse the same broadcast
   - **AQE DPP - CometSubqueryBroadcastExec replaces 
SubqueryAdaptiveBroadcastExec**: verifies only 1 `CometBroadcastExchangeExec` 
in the plan (broadcast reuse)
   - **AQE DPP - multiple DPP filters reuse same broadcast**: both filters 
reuse the same `BroadcastQueryStageExec`
   - **AQE DPP - two separate broadcast joins disambiguated by buildKeys**: two 
separate broadcast joins, each SAB maps to the correct one
   - **AQE DPP - graceful fallback when broadcast join is not Comet**: disabled 
Comet BHJ, falls back to `SubqueryBroadcastExec`
   - **AQE DPP - empty broadcast result prunes all partitions**: empty dim 
table, query returns empty
   - **AQE DPP - no broadcast join (SMJ) disables DPP gracefully**: 
SortMergeJoin with no broadcast, SAB converted to `Literal.TrueLiteral`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

