mbutrovich opened a new pull request, #4053:
URL: https://github.com/apache/datafusion-comet/pull/4053

   ## Which issue does this PR close?
   
   Closes #4042.
   
   ## Rationale for this change
   
   SPARK-43402 (Spark 4.0) pushes `ScalarSubquery` into 
`FileSourceScanExec.dataFilters` for Parquet filter pushdown. When 
`CometScanRule` replaces the scan with `CometNativeScanExec`, the subquery in 
`dataFilters` becomes a second copy (the first is in the filter expression 
above the scan). Spark's subquery-reuse rules (`ReuseExchangeAndSubquery`, or 
`ReuseAdaptiveSubquery` under AQE) should deduplicate the two copies, but two 
issues prevented it:
   
   1. `CometNativeScanExec.equals` did not include `dataFilters`, so Spark's 
`transformAllExpressionsWithPruning` silently dropped expression 
transformations on that field (it uses `fastEquals` to detect changes).
   2. In AQE, Spark's `ReuseAdaptiveSubquery` runs before Comet's node 
replacements (`CometScanRule`/`CometExecRule`), so the reuse it applies gets 
lost when Comet copies expressions from the original Spark nodes.
   
   Without the fix, the subquery executes twice and `ReusedSubqueryExec` is 
never produced.
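
The first issue can be illustrated with a toy model, not the actual Spark or Comet classes. The sketch below assumes a hypothetical `Scan` node whose rewrite helper keeps the rewritten node only when it is not equal to the original, which is a simplification of how Spark's tree traversal uses `fastEquals`. `Expr`, `Scan`, `compareFilters`, and `reuse` are all names made up for the illustration.

```scala
object EqualsDemo {
  // Hypothetical stand-in for a Catalyst expression; not a Spark class.
  final case class Expr(sql: String)

  // Toy scan node. When compareFilters is false, equality ignores
  // dataFilters, mimicking the pre-fix behaviour described above.
  final class Scan(
      val table: String,
      val dataFilters: Seq[Expr],
      compareFilters: Boolean) {

    override def equals(other: Any): Boolean = other match {
      case s: Scan =>
        table == s.table && (!compareFilters || dataFilters == s.dataFilters)
      case _ => false
    }

    override def hashCode(): Int =
      if (compareFilters) (table, dataFilters).hashCode() else table.hashCode()

    // Simplified change detection: the rewritten node is kept only if it
    // is not equal to the original node.
    def mapFilters(f: Expr => Expr): Scan = {
      val rewritten = new Scan(table, dataFilters.map(f), compareFilters)
      if (rewritten == this) this else rewritten
    }
  }

  // Toy "reuse" rewrite: marks the subquery reference as reused.
  def reuse(e: Expr): Expr = Expr(e.sql.replace("Subquery", "ReusedSubquery"))

  def main(args: Array[String]): Unit = {
    val filters = Seq(Expr("a > Subquery#1"))
    // equals ignores dataFilters: the rewrite is silently dropped.
    val before = new Scan("t", filters, compareFilters = false).mapFilters(reuse)
    // equals includes dataFilters: the rewrite is kept.
    val after = new Scan("t", filters, compareFilters = true).mapFilters(reuse)
    println(before.dataFilters)
    println(after.dataFilters)
  }
}
```

In this model the first scan still holds the original filter after `mapFilters`, while the second holds the rewritten one, which is the difference that adding `dataFilters` to `equals`/`hashCode` is meant to make.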
   
   ## What changes are included in this PR?
   
   **`CometNativeScanExec.equals`/`hashCode`**: Added `dataFilters` so 
expression transformations (subquery reuse, etc.) are detected by Spark's tree 
traversal.
   
   **`CometReuseSubquery`**: New `queryStageOptimizerRule` that re-applies 
subquery deduplication after Comet node conversions in AQE. It uses the same 
algorithm as the subquery portion of Spark's `ReuseExchangeAndSubquery`: a 
top-down traversal that caches subqueries by canonical form. Without AQE, 
Spark's `ReuseExchangeAndSubquery` already runs after columnar rules and 
handles reuse correctly.
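
As a rough sketch of the "top-down traversal, caching by canonical form" idea, the toy model below walks a plan tree and replaces every repeated subquery with a reference to the first occurrence. It is not the `CometReuseSubquery` implementation: `Plan`, `Subquery`, `Reused`, `Node`, and the string-based `canonical` form are hypothetical stand-ins for Spark's plan nodes and plan canonicalization.

```scala
import scala.collection.mutable

object ReuseDemo {
  sealed trait Plan
  final case class Subquery(id: Int, query: String) extends Plan {
    // Hypothetical canonical form: ignores the id and cosmetic differences.
    def canonical: String = query.trim.toLowerCase
  }
  final case class Reused(target: Subquery) extends Plan
  final case class Node(name: String, children: Seq[Plan]) extends Plan

  def reuseSubqueries(root: Plan): Plan = {
    // Maps canonical form to the first subquery seen with that form.
    val cache = mutable.Map.empty[String, Subquery]

    // Top-down: a node is handled before its children, children left to right.
    def visit(p: Plan): Plan = p match {
      case s: Subquery =>
        cache.get(s.canonical) match {
          case Some(first) => Reused(first) // duplicate: point at the first copy
          case None =>
            cache(s.canonical) = s // first occurrence: remember and keep it
            s
        }
      case Node(name, children) => Node(name, children.map(visit))
      case r: Reused => r
    }

    visit(root)
  }
}
```

For example, a filter holding `Subquery(1, "SELECT max(a) FROM t")` above a scan holding `Subquery(2, "select max(a) from t")` comes out with the scan's copy replaced by `Reused(Subquery(1, ...))`, so the query would run only once in this model.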
   
   **Scalar subquery execution-time pushdown**: 
`CometNativeScanExec.serializedPartitionData` resolves `ScalarSubquery` to 
`Literal` at execution time and appends the resolved filter to the 
`NativeScanCommon` protobuf. Same approach as 
`FileSourceScanLike.pushedDownFilters` in `DataSourceScanExec.scala`.
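
The execution-time resolution step can be sketched with a toy expression tree. This is an illustration under assumptions, not the Comet or Spark code: `Expr`, `Attr`, `Lit`, `ScalarSub`, `GreaterThan`, and the `results` map of already-computed subquery values are hypothetical, and skipping filters whose subquery has no result yet is an assumption of the sketch (it is safe here only because the filter above the scan would still be evaluated).

```scala
object PushdownDemo {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  final case class ScalarSub(id: Int) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr

  // Replaces every ScalarSub with a literal holding its computed value.
  // Returns None if any referenced subquery has no result available.
  def resolve(e: Expr, results: Map[Int, Any]): Option[Expr] = e match {
    case ScalarSub(id) => results.get(id).map(Lit(_))
    case GreaterThan(l, r) =>
      for {
        l2 <- resolve(l, results)
        r2 <- resolve(r, results)
      } yield GreaterThan(l2, r2)
    case other => Some(other)
  }

  // Filters that can be handed to the native scan: fully resolved ones only.
  def pushableFilters(filters: Seq[Expr], results: Map[Int, Any]): Seq[Expr] =
    filters.flatMap(resolve(_, results))
}
```

With `results = Map(1 -> 10)`, the filter `GreaterThan(Attr("a"), ScalarSub(1))` resolves to `GreaterThan(Attr("a"), Lit(10))`, which is a plain comparison that a scan-level filter could evaluate without knowing about subqueries.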
   
   **Shim split**: `ShimCometSparkSessionExtensions` was split from `spark-3.x/` 
into `spark-3.4/` (no-op for `injectQueryStageOptimizerRule`) and `spark-3.5/` 
(calls the API). `injectQueryStageOptimizerRule` was added in Spark 3.5 
(SPARK-45785).
   
   **Rule ordering documentation**: Added pipeline diagram to 
`CometSparkSessionExtensions` class doc covering both AQE and non-AQE rule 
execution order.
   
   **4.0.1.diff**: Unignored `SubquerySuite` SPARK-43402 test, added 
`CometNativeScanExec` to scan matching.
   
   ## How are these changes tested?
   
   New test in `CometExecSuite`: "scalar subquery in data filters does not 
break subquery reuse" (Spark 4.0 only, both AQE on/off). Verifies correct 
results via `checkSparkAnswer`, `ScalarSubquery` presence in `dataFilters`, 
`ReusedSubqueryExec` in the plan, and `numFiles` metric. Mirrors Spark's 
`SubquerySuite` SPARK-43402 test assertions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

