mbutrovich opened a new pull request, #4112:
URL: https://github.com/apache/datafusion-comet/pull/4112

   ## Which issue does this PR close?
   
   Partially addresses #3510. Closes #4045 (V1 Parquet AQE DPP). Related PRs: 
#4011 (non-AQE DPP), #4053 (scalar subquery pushdown + `CometReuseSubquery`), 
#4037 (non-AQE DPP edge case tests), #4033 (AQE DPP for Iceberg, draft).
   
   ## Rationale for this change
   
   Under AQE (enabled by default), Spark plans dynamic partition pruning (DPP) 
filters as `SubqueryAdaptiveBroadcastExec` (SAB). Spark's 
`PlanAdaptiveDynamicPruningFilters` rule then converts each SAB by searching the 
plan for a `BroadcastHashJoinExec` whose broadcast it can reuse. Once Comet has 
replaced that join with `CometBroadcastHashJoinExec`, Spark's rule no longer 
finds a match. With `onlyInBroadcast=true`, it then replaces the DPP filter with 
`Literal.TrueLiteral`, which disables partition pruning. Comet previously avoided 
this by rejecting such scans (`isAqeDynamicPruningFilter`), so the scan fell back 
to Spark entirely and every DPP query under AQE lost native acceleration.
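A deliberately simplified Scala model of this failure mode follows. Every type in it (`Plan`, `SparkBHJ`, `CometBHJ`, `planDpp`, and so on) is hypothetical. It mirrors only the behavior described above: a rule recognizes one join class by pattern matching, and with `onlyInBroadcast=true` it gives up with `TrueLiteral` when no such join exists. `SeparateSubquery` stands for the `onlyInBroadcast=false` path, which this PR does not discuss.

```scala
// Toy model (hypothetical types, not Spark's or Comet's classes) of how a rule that
// pattern-matches on one join class misses a replacement join class.
object DppMatchModel {
  sealed trait Plan { def children: Seq[Plan] }

  final case class Scan(table: String) extends Plan {
    def children: Seq[Plan] = Nil
  }

  // Stand-in for Spark's BroadcastHashJoinExec.
  final case class SparkBHJ(buildKeys: Seq[String], build: Plan, probe: Plan) extends Plan {
    def children: Seq[Plan] = Seq(build, probe)
  }

  // Stand-in for CometBroadcastHashJoinExec: same role, different class.
  final case class CometBHJ(buildKeys: Seq[String], build: Plan, probe: Plan) extends Plan {
    def children: Seq[Plan] = Seq(build, probe)
  }

  sealed trait PruningFilter
  final case class ReuseBroadcast(buildKeys: Seq[String]) extends PruningFilter
  case object SeparateSubquery extends PruningFilter // onlyInBroadcast = false path
  case object TrueLiteral extends PruningFilter      // pruning disabled

  // Pre-order traversal of every node in the plan.
  private def nodes(plan: Plan): Seq[Plan] = plan +: plan.children.flatMap(nodes)

  // Only the Spark join class is recognised, as described in the rationale.
  def planDpp(root: Plan, buildKeys: Seq[String], onlyInBroadcast: Boolean): PruningFilter = {
    val canReuse = nodes(root).exists {
      case SparkBHJ(keys, _, _) => keys == buildKeys
      case _                    => false
    }
    if (canReuse) ReuseBroadcast(buildKeys)
    else if (onlyInBroadcast) TrueLiteral
    else SeparateSubquery
  }
}
```

With the same keys and plan shape, swapping `SparkBHJ` for `CometBHJ` turns `ReuseBroadcast` into `TrueLiteral`. That is the lost pruning described above.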
   
   ## What changes are included in this PR?
   
   ### Two-phase SAB conversion
   
   Spark's `PlanAdaptiveDynamicPruningFilters` runs before custom 
`queryStageOptimizerRules`, so it converts SABs to `TrueLiteral` before any 
Comet optimizer rule can see them. We work around this in two phases:
   
   1. **CometExecRule** (`queryStagePreparationRules`, runs before Spark's rule): 
wraps SABs in `CometSubqueryAdaptiveBroadcastExec` so that Spark's pattern match 
does not recognize them.
   2. **CometPlanAdaptiveDynamicPruningFilters** (`queryStageOptimizerRule`, runs 
after Spark's rule): converts the wrapped SABs to `CometSubqueryBroadcastExec`, 
wired to the join's `BroadcastQueryStageExec` so that the broadcast is reused.
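The ordering problem and its workaround can be sketched as three functions applied in the order AQE runs them. This is a toy built on hypothetical case classes named after the PR's operators. `sparkRule` models only how Spark's rule behaves once the join is Comet's. Having phase 2 look up the broadcast stage by build keys is an assumption of the sketch.

```scala
// Toy model of the two-phase conversion. The names echo the PR, but these are
// hypothetical case classes, not the real Spark/Comet expression nodes.
object TwoPhaseModel {
  sealed trait Expr
  final case class SAB(buildKeys: Seq[String]) extends Expr // ~ SubqueryAdaptiveBroadcastExec
  final case class CometSAB(inner: SAB) extends Expr        // ~ CometSubqueryAdaptiveBroadcastExec
  final case class CometSubqueryBroadcast(stageId: Int, buildKeys: Seq[String]) extends Expr
  case object TrueLiteral extends Expr

  // Phase 1 (CometExecRule, before Spark's rule): hide every SAB behind a wrapper.
  def phase1(filters: Seq[Expr]): Seq[Expr] = filters.map {
    case s: SAB => CometSAB(s)
    case other  => other
  }

  // Spark's rule, as it behaves once the join is Comet's: it matches only bare SABs
  // and, finding no BroadcastHashJoinExec, replaces them with TrueLiteral.
  def sparkRule(filters: Seq[Expr]): Seq[Expr] = filters.map {
    case _: SAB => TrueLiteral
    case other  => other
  }

  // Phase 2 (CometPlanAdaptiveDynamicPruningFilters, after Spark's rule): unwrap and
  // wire to the join's broadcast stage. Lookup by build keys is an assumption here.
  def phase2(filters: Seq[Expr], broadcastStages: Map[Seq[String], Int]): Seq[Expr] =
    filters.map {
      case CometSAB(SAB(keys)) =>
        broadcastStages.get(keys) match {
          case Some(id) => CometSubqueryBroadcast(id, keys)
          case None     => TrueLiteral
        }
      case other => other
    }
}
```

Running `sparkRule` on bare SABs erases them before phase 2 runs. Running `phase1` first lets the filter survive Spark's rule and become a `CometSubqueryBroadcast`.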
   
   ### Dual-filter resolution
   
   `CometNativeScanExec.partitionFilters` and `CometScanExec.partitionFilters` 
hold separate `InSubqueryExec` instances. `CometExecRule` only wraps the outer 
filters, because the inner `CometScanExec` is `@transient` and is therefore not 
part of the expression tree. `CometPlanAdaptiveDynamicPruningFilters` converts 
both sets, matching `CometSubqueryAdaptiveBroadcastExec` (wrapped, outer) and 
`SubqueryAdaptiveBroadcastExec` (unwrapped, inner).
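A minimal sketch shows why phase 2 has to accept both shapes. It assumes a hypothetical `NativeScan` that carries the outer (wrapped) and inner (unwrapped) filter lists side by side. The types are illustrative stand-ins, not Comet's classes.

```scala
// Toy model of dual-filter resolution (hypothetical types, not Comet's classes).
object DualFilterModel {
  sealed trait Expr
  final case class SAB(buildKeys: Seq[String]) extends Expr // unwrapped
  final case class CometSAB(inner: SAB) extends Expr        // wrapped by phase 1
  final case class CometSubqueryBroadcast(stageId: Int, buildKeys: Seq[String]) extends Expr
  case object TrueLiteral extends Expr

  // outerFilters ~ CometNativeScanExec.partitionFilters (wrapped by CometExecRule);
  // innerFilters ~ the @transient CometScanExec's partitionFilters (never wrapped).
  final case class NativeScan(outerFilters: Seq[Expr], innerFilters: Seq[Expr])

  private def toBroadcast(keys: Seq[String], stages: Map[Seq[String], Int]): Expr =
    stages.get(keys) match {
      case Some(id) => CometSubqueryBroadcast(id, keys)
      case None     => TrueLiteral
    }

  // Phase 2 accepts both shapes, so neither copy of the filter is left unresolved.
  private def resolve(e: Expr, stages: Map[Seq[String], Int]): Expr = e match {
    case CometSAB(SAB(keys)) => toBroadcast(keys, stages)
    case SAB(keys)           => toBroadcast(keys, stages)
    case other               => other
  }

  def convert(scan: NativeScan, stages: Map[Seq[String], Int]): NativeScan =
    NativeScan(scan.outerFilters.map(resolve(_, stages)), scan.innerFilters.map(resolve(_, stages)))
}
```

Dropping the `case SAB(keys)` branch would leave the inner filter as an unconverted SAB, which is the situation test 6 guards against.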
   
   ### Spark 3.4 fallback
   
   `injectQueryStageOptimizerRule` is unavailable on Spark 3.4, so the second 
phase cannot be registered there. SAB wrapping is therefore gated on 
`isSpark35Plus`. On 3.4, scans with AQE DPP fall back to Spark so that Spark's own 
rule plans the pruning.
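The version gate amounts to a small decision function. The sketch below is hypothetical: `atLeastSpark35` stands in for Comet's `isSpark35Plus`, and `chooseScan` ignores every other reason a scan might fall back.

```scala
// Toy decision function for the Spark 3.4 fallback. `atLeastSpark35` is a
// hypothetical stand-in for Comet's `isSpark35Plus` check.
object VersionGateModel {
  sealed trait ScanPlan
  case object NativeWithWrappedSab extends ScanPlan // 3.5+: phase 1 wraps, phase 2 converts
  case object SparkFallback extends ScanPlan        // 3.4: Spark plans DPP itself
  case object Native extends ScanPlan               // no AQE DPP filter involved

  // Compares only the leading "major.minor" of a version such as "3.5.1".
  def atLeastSpark35(version: String): Boolean = {
    val parts = version.split('.')
    val major = parts(0).toInt
    val minor = parts(1).toInt
    major > 3 || (major == 3 && minor >= 5)
  }

  def chooseScan(sparkVersion: String, hasAqeDpp: Boolean): ScanPlan =
    if (!hasAqeDpp) Native
    else if (atLeastSpark35(sparkVersion)) NativeWithWrappedSab
    else SparkFallback
}
```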
   
   ### Broadcast fallback cases
   
   - **Spark BHJ** (Comet BHJ disabled): the rule finds `BroadcastHashJoinExec` 
and creates Spark's own `SubqueryBroadcastExec` via the 
`createSubqueryBroadcastExec` shim.
   - **SMJ** (sort-merge join, no broadcast): there is no broadcast join to 
reuse, so the rule falls back to `Literal.TrueLiteral`. DPP is disabled, but 
results remain correct.
   - **ReusedExchangeExec**: when AQE reuses exchanges across the main plan and 
scalar subquery plans (which share an `AdaptiveExecutionContext`), 
`BroadcastQueryStageExec.plan` may be a `ReusedExchangeExec`. The rule unwraps it 
to verify the underlying exchange type.
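The three cases above can be modeled as one lookup over the joins in the plan. This is a toy under stated assumptions. The case classes are hypothetical stand-ins for the Spark and Comet operators. Matching by build keys mirrors the `buildKeys` disambiguation exercised in the tests, and `unwrap` models looking through `ReusedExchangeExec`.

```scala
// Toy model of the broadcast fallback cases (hypothetical types, not Spark/Comet classes).
object BroadcastFallbackModel {
  sealed trait Exchange
  final case class CometBroadcastExchange(id: Int) extends Exchange
  final case class SparkBroadcastExchange(id: Int) extends Exchange
  final case class ReusedExchange(child: Exchange) extends Exchange // ~ ReusedExchangeExec

  final case class BroadcastStage(plan: Exchange) // ~ BroadcastQueryStageExec

  sealed trait Join
  final case class CometBHJ(buildKeys: Seq[String], stage: BroadcastStage) extends Join
  final case class SparkBHJ(buildKeys: Seq[String], stage: BroadcastStage) extends Join
  final case class SMJ(keys: Seq[String]) extends Join // sort-merge join: nothing to reuse

  sealed trait Pruning
  final case class CometSubqueryBroadcast(stage: BroadcastStage) extends Pruning
  final case class SparkSubqueryBroadcast(stage: BroadcastStage) extends Pruning
  case object TrueLiteral extends Pruning // DPP disabled, results still correct

  // Look through reuse wrappers to the exchange that actually produces the broadcast.
  @scala.annotation.tailrec
  def unwrap(e: Exchange): Exchange = e match {
    case ReusedExchange(child) => unwrap(child)
    case other                 => other
  }

  private def isComet(stage: BroadcastStage): Boolean = unwrap(stage.plan) match {
    case _: CometBroadcastExchange => true
    case _                         => false
  }

  def planPruning(joins: Seq[Join], buildKeys: Seq[String]): Pruning = {
    val matches: Seq[Pruning] = joins.collect {
      case CometBHJ(keys, stage) if keys == buildKeys && isComet(stage) =>
        CometSubqueryBroadcast(stage)
      case SparkBHJ(keys, stage) if keys == buildKeys && !isComet(stage) =>
        SparkSubqueryBroadcast(stage)
    }
    matches.headOption.getOrElse(TrueLiteral)
  }
}
```

A `ReusedExchange` around a Comet broadcast still yields `CometSubqueryBroadcast`, and a plan containing only a sort-merge join yields `TrueLiteral`.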
   
   ## How are these changes tested?
   
   10 new AQE DPP tests in `CometExecSuite` covering the combination matrix:
   
   | # | Scenario | Verifies |
   |---|----------|----------|
   | 1 | BHJ golden path | Native scan, `CometSubqueryBroadcastExec`, `BroadcastQueryStageExec` child, reference equality with join's broadcast (reuse), no unconverted SABs, 3.4 fallback |
   | 2 | Spark BHJ (Comet BHJ disabled) | No `CometSubqueryBroadcastExec` (Spark handles DPP) |
   | 3 | SMJ (no broadcast) | Native scan, no `CometSubqueryBroadcastExec`, no unconverted SABs |
   | 4 | Two separate broadcast joins | `buildKeys` disambiguation, native scan, no unconverted SABs |
   | 5 | Empty broadcast result | Empty result, no errors |
   | 6 | Dual filter resolution | Both outer+inner filters resolved (correct results prove this) |
   | 7 | Broadcast exchange reuse | `BroadcastQueryStageExec` children, no unconverted SABs |
   | 8 | Non-atomic type (struct/array) | Correct results with complex join keys |
   | 9 | Non-atomic type + CometSubqueryBroadcast | `CometSubqueryBroadcastExec` present, no unconverted SABs |
   | 10 | Scalar subquery + DPP reuse | Cross-plan broadcast reuse via `ReusedExchangeExec` |
   
   All tests have version-specific assertions (3.5+ native path vs. 3.4 
fallback). Existing DPP tests (`CometDppFallbackRepro3949Suite`, 
`CometShuffleFallbackStickinessSuite`) are updated to disable the native scan so 
that they still exercise the `stageContainsDPPScan` stickiness code path.
   
   Existing non-AQE DPP tests are renamed to the consistent 
`"[non-AQE|AQE] DPP: <scenario>"` format.
   

