mbutrovich opened a new issue, #4145:
URL: https://github.com/apache/datafusion-comet/issues/4145

   ### Describe the bug
   
   `CometExecRule.transform` walks the plan with `transformUp` and for each 
node calls `convertNode(op)` followed by 
`convertSubqueryBroadcasts(converted)`. When `convertNode` wraps a V2 
`BatchScanExec` in `CometSparkToColumnarExec`, the wrapped scan's 
`runtimeFilters` (where AQE DPP places its `SubqueryAdaptiveBroadcastExec`) are 
hidden from the subsequent `convertSubqueryBroadcasts` call. 
`transformExpressionsUp` only walks the current node's expressions, not its 
children's.
   
   The `SubqueryAdaptiveBroadcastExec` (SAB) therefore never gets wrapped in `CometSubqueryAdaptiveBroadcastExec`. Spark's `PlanAdaptiveDynamicPruningFilters`, running at `queryStageOptimizerRule` time, then finds the unwrapped SAB, fails its `sameResult` match against the Comet build-side exchange, and replaces the DPP filter with `Literal.TrueLiteral`.
   
   The code in 
`spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala`:
   
   ```scala
   plan.transformUp { case op =>
     val converted = convertNode(op)      // may wrap op in CometSparkToColumnarExec
     convertSubqueryBroadcasts(converted) // misses wrapped op's runtimeFilters / partitionFilters
   }
   ```
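
   The traversal semantics behind the bug can be shown with a toy model (plain Scala; `Scan`, `Wrapper`, and `Expr` are illustrative stand-ins for `BatchScanExec`, `CometSparkToColumnarExec`, and the SAB expression, not Spark's actual `TreeNode` API):

   ```scala
   // Toy model: a per-node expression rewrite, like Spark's
   // transformExpressionsUp, only touches the current node's own
   // expressions and never descends into children.
   case class Expr(name: String)

   sealed trait Node {
     def transformExpressionsUp(f: Expr => Expr): Node
   }

   // Stand-in for BatchScanExec: carries expressions (runtimeFilters).
   case class Scan(exprs: Seq[Expr]) extends Node {
     def transformExpressionsUp(f: Expr => Expr): Node = Scan(exprs.map(f))
   }

   // Stand-in for CometSparkToColumnarExec: no expressions of its own.
   case class Wrapper(child: Node) extends Node {
     def transformExpressionsUp(f: Expr => Expr): Node = this
   }

   val wrap: Expr => Expr = e => Expr("Comet" + e.name)
   val scan = Scan(Seq(Expr("SubqueryAdaptiveBroadcast")))

   // Unwrapped scan: the SAB gets rewritten.
   assert(scan.transformExpressionsUp(wrap) ==
     Scan(Seq(Expr("CometSubqueryAdaptiveBroadcast"))))

   // Wrapped first: the wrapper has no expressions to rewrite, so the
   // child's SAB is left untouched -- the bug described above.
   assert(Wrapper(scan).transformExpressionsUp(wrap) == Wrapper(scan))
   ```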
   
   Not urgent today: V2 Parquet AQE DPP isn't supported in Spark (Comet #3510; 
[SPARK-53439](https://issues.apache.org/jira/browse/SPARK-53439) tracks it 
upstream, still Open with no fix version, and 
[apache/spark#52180](https://github.com/apache/spark/pull/52180) closed without 
merging). No current user path hits this. Filing now so the issue is tracked 
for when V2 Parquet DPP eventually lands.
   
   ### Steps to reproduce
   
   Seen while debugging `CometExecSuite.AQE DPP: V2 BatchScan broadcast query 
stage creation order (SPARK-34637)` by temporarily enabling 
`CometSparkToColumnarExec` for the InMemoryBatchScan:
   
   ```scala
   withSQLConf(
     CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
     CometConf.COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.key ->
       "BatchScan,Range,InMemoryTableScan,RDDScan,OneRowRelation",
     // ...existing confs
   ) { /* run the test */ }
   ```
   
   With those set, the plan shows the scan's `runtimeFilters` containing a 
plain `SubqueryAdaptiveBroadcast` (unwrapped) instead of 
`CometSubqueryAdaptiveBroadcast`, and the final plan has 
`dynamicpruningexpression(true)`.
   
   ### Expected behavior
   
   The SAB should be wrapped in `CometSubqueryAdaptiveBroadcastExec` regardless 
of whether `convertNode` also wraps the scan in `CometSparkToColumnarExec`, so 
that `CometPlanAdaptiveDynamicPruningFilters` can rewrite it into 
`CometSubqueryBroadcastExec` at `queryStageOptimizerRule` time.
   
   ### Additional context
   
   Surfaced in #4112 (AQE DPP for native Parquet scans with broadcast reuse) 
while investigating a 4.1 failure in the `V2 BatchScan broadcast query stage 
creation order (SPARK-34637)` test. That test was gated with 
`assume(!isSpark41Plus)` in #4112 because Spark 4.1 elides the shuffle that 
gives Comet its entry point for the test's query. This bug turned up when 
trying to restore the entry point by wrapping the BatchScan in 
`CometSparkToColumnarExec`.
   
   Likely fix: reorder in `CometExecRule.transform` so 
SAB/`SubqueryBroadcastExec` wrapping runs before `convertNode`:
   
   ```scala
   plan.transformUp { case op =>
     val withSubs = convertSubqueryBroadcasts(op)
     convertNode(withSubs)
   }
   ```
   
   Or a separate pass using `transformAllExpressions`, which recurses into all 
nodes' expressions:
   
   ```scala
   val converted = plan.transformUp { case op => convertNode(op) }
   converted.transformAllExpressions {
     // SAB / SubqueryBroadcastExec wrapping logic
   }
   ```
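
   In the same toy-model terms as above (illustrative `Scan`/`Wrapper`/`Expr` names, not Spark's actual `TreeNode` API), a whole-tree expression pass reaches the wrapped scan's filters where a per-node pass does not:

   ```scala
   // Toy sketch: a transformAllExpressions-style pass visits every
   // node's expressions, including those on a scan hidden under a wrapper.
   case class Expr(name: String)

   sealed trait Node {
     def transformAllExpressions(f: Expr => Expr): Node
   }

   case class Scan(exprs: Seq[Expr]) extends Node {
     def transformAllExpressions(f: Expr => Expr): Node = Scan(exprs.map(f))
   }

   case class Wrapper(child: Node) extends Node {
     // Unlike a per-node transformExpressionsUp, this recurses into the child.
     def transformAllExpressions(f: Expr => Expr): Node =
       Wrapper(child.transformAllExpressions(f))
   }

   val plan = Wrapper(Scan(Seq(Expr("SubqueryAdaptiveBroadcast"))))
   assert(plan.transformAllExpressions(e => Expr("Comet" + e.name)) ==
     Wrapper(Scan(Seq(Expr("CometSubqueryAdaptiveBroadcast")))))
   ```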
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

