mbutrovich opened a new pull request, #4112: URL: https://github.com/apache/datafusion-comet/pull/4112
## Which issue does this PR close?

Partially addresses #3510. Closes #4045 (V1 Parquet AQE DPP).

Related PRs: #4011 (non-AQE DPP), #4053 (scalar subquery pushdown + `CometReuseSubquery`), #4037 (non-AQE DPP edge case tests), #4033 (AQE DPP for Iceberg, draft).

## Rationale for this change

Under AQE (the default), Spark creates `SubqueryAdaptiveBroadcastExec` (SAB) for DPP. Spark's `PlanAdaptiveDynamicPruningFilters` converts these by finding `BroadcastHashJoinExec` in the plan. After Comet replaces it with `CometBroadcastHashJoinExec`, Spark's rule can't find a match. With `onlyInBroadcast=true`, it replaces DPP with `Literal.TrueLiteral`, disabling partition pruning. With the previous `isAqeDynamicPruningFilter` rejection, the scan fell back to Spark entirely, losing native acceleration for all DPP queries under AQE.

## What changes are included in this PR?

### Two-phase SAB conversion

Spark's `PlanAdaptiveDynamicPruningFilters` runs before custom `queryStageOptimizerRules` and converts SABs to `TrueLiteral`. We work around this in two phases:

1. **CometExecRule** (queryStagePreparationRules, before Spark's rule): Wraps SABs in `CometSubqueryAdaptiveBroadcastExec` so Spark's pattern match doesn't recognize them.
2. **CometPlanAdaptiveDynamicPruningFilters** (queryStageOptimizerRule, after Spark's rule): Converts to `CometSubqueryBroadcastExec` wired to the join's `BroadcastQueryStageExec` for broadcast reuse.

### Dual-filter resolution

`CometNativeScanExec.partitionFilters` and `CometScanExec.partitionFilters` contain separate `InSubqueryExec` instances. `CometExecRule` only wraps the outer filters (the inner `CometScanExec` is `@transient`, not in the expression tree). `CometPlanAdaptiveDynamicPruningFilters` converts both, matching `CometSubqueryAdaptiveBroadcastExec` (wrapped, outer) and `SubqueryAdaptiveBroadcastExec` (unwrapped, inner).

### Spark 3.4 fallback

`injectQueryStageOptimizerRule` is unavailable on 3.4. SAB wrapping is gated on `isSpark35Plus`.
On 3.4, scans with AQE DPP fall back to Spark so that Spark's rule handles DPP natively.

### Broadcast fallback cases

- **Spark BHJ** (Comet BHJ disabled): The rule finds `BroadcastHashJoinExec` and creates `SubqueryBroadcastExec` (Spark's type) via `createSubqueryBroadcastExec` shim.
- **SMJ** (no broadcast): No matching broadcast join. Falls back to `Literal.TrueLiteral` (DPP disabled, correct results).
- **ReusedExchangeExec**: When AQE reuses exchanges across the main plan and scalar subquery plans (shared `AdaptiveExecutionContext`), `BroadcastQueryStageExec.plan` may be `ReusedExchangeExec`. The rule unwraps it to verify the underlying exchange type.

## How are these changes tested?

10 new AQE DPP tests in `CometExecSuite` covering the combination matrix:

| # | Scenario | Verifies |
|---|----------|----------|
| 1 | BHJ golden path | Native scan, `CometSubqueryBroadcastExec`, `BroadcastQueryStageExec` child, reference equality with join's broadcast (reuse), no unconverted SABs, 3.4 fallback |
| 2 | Spark BHJ (Comet BHJ disabled) | No `CometSubqueryBroadcastExec` (Spark handles DPP) |
| 3 | SMJ (no broadcast) | Native scan, no `CometSubqueryBroadcastExec`, no unconverted SABs |
| 4 | Two separate broadcast joins | buildKeys disambiguation, native scan, no unconverted SABs |
| 5 | Empty broadcast result | Empty result, no errors |
| 6 | Dual filter resolution | Both outer+inner filters resolved (correct results prove this) |
| 7 | Broadcast exchange reuse | `BroadcastQueryStageExec` children, no unconverted SABs |
| 8 | Non-atomic type (struct/array) | Correct results with complex join keys |
| 9 | Non-atomic type + CometSubqueryBroadcast | `CometSubqueryBroadcastExec` present, no unconverted SABs |
| 10 | Scalar subquery + DPP reuse | Cross-plan broadcast reuse via `ReusedExchangeExec` |

All tests have version-specific assertions (3.5+ native path vs 3.4 fallback).
Existing DPP tests (`CometDppFallbackRepro3949Suite`, `CometShuffleFallbackStickinessSuite`) updated to disable native scan to preserve the `stageContainsDPPScan` stickiness code path.

Existing non-AQE DPP tests renamed to consistent `"[non-AQE|AQE] DPP: <scenario>"` format.
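
### Illustrative sketch of the two-phase conversion

To make the ordering argument concrete, here is a minimal toy model of the two-phase SAB conversion and the broadcast fallback cases described above. It is a sketch under stated assumptions, not the code in this PR: every type and function name (`Sab`, `WrappedSab`, `sparkRule`, `cometRule`, `resolve`, and so on) is hypothetical and only stands in for the Spark or Comet class named in its comment, joins are matched on build keys alone, and `stageId` is a placeholder for the join's `BroadcastQueryStageExec`.

```scala
// Toy model only: none of these types are real Spark or Comet classes.
sealed trait Subquery
case class Sab(buildKeys: Seq[String]) extends Subquery            // stands in for SubqueryAdaptiveBroadcastExec
case class WrappedSab(inner: Sab) extends Subquery                 // stands in for CometSubqueryAdaptiveBroadcastExec
case class CometBroadcast(stageId: Int) extends Subquery           // stands in for CometSubqueryBroadcastExec
case class SparkBroadcast(buildKeys: Seq[String]) extends Subquery // stands in for SubqueryBroadcastExec
case object TrueLiteral extends Subquery                           // stands in for Literal.TrueLiteral (pruning disabled)

sealed trait Join
case class SparkBhj(buildKeys: Seq[String]) extends Join               // BroadcastHashJoinExec
case class CometBhj(buildKeys: Seq[String], stageId: Int) extends Join // CometBroadcastHashJoinExec
case class Smj(buildKeys: Seq[String]) extends Join                    // sort-merge join, no broadcast

object AqeDppSketch {
  // Phase 1 (preparation rule): hide SABs from the Spark-side pattern match.
  def wrap(s: Subquery): Subquery = s match {
    case sab: Sab => WrappedSab(sab)
    case other    => other
  }

  // Simplified stand-in for Spark's rule: it only recognizes SparkBhj,
  // and turns any SAB without a matching join into TrueLiteral.
  def sparkRule(s: Subquery, joins: Seq[Join]): Subquery = s match {
    case Sab(keys) =>
      joins.collectFirst[Subquery] {
        case SparkBhj(`keys`) => SparkBroadcast(keys)
      }.getOrElse(TrueLiteral)
    case other => other
  }

  // Phase 2 (stage optimizer rule): resolve both wrapped (outer) and
  // unwrapped (inner) SABs against the joins in the plan.
  def cometRule(s: Subquery, joins: Seq[Join]): Subquery = {
    val keysOpt = s match {
      case WrappedSab(Sab(keys)) => Some(keys) // outer filter, wrapped in phase 1
      case Sab(keys)             => Some(keys) // inner filter, never wrapped
      case _                     => None
    }
    keysOpt match {
      case Some(keys) =>
        joins.collectFirst[Subquery] {
          case CometBhj(`keys`, stageId) => CometBroadcast(stageId) // reuse the join's broadcast
          case SparkBhj(`keys`)          => SparkBroadcast(keys)    // Spark BHJ fallback
        }.getOrElse(TrueLiteral)                                    // e.g. SMJ: no broadcast to reuse
      case None => s
    }
  }

  // Full pipeline in the order the rules run: wrap, Spark's rule, Comet's rule.
  def resolve(s: Subquery, joins: Seq[Join]): Subquery =
    cometRule(sparkRule(wrap(s), joins), joins)
}
```

In this model, calling `sparkRule` directly on an unwrapped `Sab` when the only join is a `CometBhj` yields `TrueLiteral`, which mirrors the lost-pruning problem from the rationale. Running the same input through `resolve` yields a `CometBroadcast` tied to the join's stage, because the wrapper keeps the SAB out of reach of the Spark-side match until the second phase.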
