mbutrovich opened a new issue, #4145:
URL: https://github.com/apache/datafusion-comet/issues/4145
### Describe the bug
`CometExecRule.transform` walks the plan with `transformUp`, calling
`convertNode(op)` and then `convertSubqueryBroadcasts(converted)` on each
node. When `convertNode` wraps a V2 `BatchScanExec` in
`CometSparkToColumnarExec`, the wrapped scan's `runtimeFilters` (where AQE DPP
places its `SubqueryAdaptiveBroadcastExec`) become invisible to the subsequent
`convertSubqueryBroadcasts` call: it uses `transformExpressionsUp`, which
walks only the current node's expressions, not its children's.
As a result, the SAB never gets wrapped in `CometSubqueryAdaptiveBroadcastExec`.
At `queryStageOptimizerRule` time, Spark's `PlanAdaptiveDynamicPruningFilters`
then finds the unwrapped SAB, fails its `sameResult` match against the Comet
build-side exchange, and replaces the DPP filter with `Literal.TrueLiteral`.
The code in
`spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala`:
```scala
plan.transformUp { case op =>
  val converted = convertNode(op)      // may wrap op in CometSparkToColumnarExec
  convertSubqueryBroadcasts(converted) // misses wrapped op's runtimeFilters / partitionFilters
}
```
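The miss can be seen on a toy tree model (plain Scala, not Spark's `TreeNode`
API; `Scan`, `Wrapper`, `mapOwnExpressions`, and the string-valued filters are
illustrative stand-ins for `BatchScanExec`, `CometSparkToColumnarExec`,
`transformExpressionsUp`, and `runtimeFilters`):

```scala
// Toy stand-ins (plain Scala, not Spark's TreeNode API).
sealed trait Node {
  def children: Seq[Node]
  def withChildren(c: Seq[Node]): Node
  // Rewrites only THIS node's expressions, like transformExpressionsUp.
  def mapOwnExpressions(f: String => String): Node
  // Bottom-up node transform, like Spark's transformUp.
  def transformUp(f: Node => Node): Node =
    f(withChildren(children.map(_.transformUp(f))))
}

// Stand-in for BatchScanExec: its runtimeFilters are its expressions.
case class Scan(runtimeFilters: Seq[String]) extends Node {
  def children: Seq[Node] = Nil
  def withChildren(c: Seq[Node]): Node = this
  def mapOwnExpressions(f: String => String): Node =
    copy(runtimeFilters = runtimeFilters.map(f))
}

// Stand-in for CometSparkToColumnarExec: a wrapper with no expressions of its own.
case class Wrapper(child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def withChildren(c: Seq[Node]): Node = copy(child = c.head)
  def mapOwnExpressions(f: String => String): Node = this
}

// Stand-in for the SAB -> CometSAB rewrite done by convertSubqueryBroadcasts.
def wrapSAB(e: String): String = if (e == "SAB") "CometSAB" else e

// Current order: wrap first, then rewrite only the wrapper's own expressions.
val buggy = Scan(Seq("SAB")).transformUp { op =>
  Wrapper(op).mapOwnExpressions(wrapSAB) // Wrapper has no expressions: SAB missed
}
// buggy == Wrapper(Scan(Seq("SAB"))) -- the plain SAB survives under the wrapper
```

Once the scan sits under the wrapper, a per-node expression rewrite applied to
the wrapper can never reach the scan's filters.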
Not urgent today: V2 Parquet AQE DPP isn't supported in Spark (Comet #3510;
[SPARK-53439](https://issues.apache.org/jira/browse/SPARK-53439) tracks it
upstream, still Open with no fix version, and
[apache/spark#52180](https://github.com/apache/spark/pull/52180) closed without
merging). No current user path hits this. Filing now so it's tracked for when
V2 Parquet DPP eventually lands.
### Steps to reproduce
Seen while debugging `CometExecSuite.AQE DPP: V2 BatchScan broadcast query
stage creation order (SPARK-34637)` by temporarily enabling
`CometSparkToColumnarExec` for the InMemoryBatchScan:
```scala
withSQLConf(
  CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
  CometConf.COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.key ->
    "BatchScan,Range,InMemoryTableScan,RDDScan,OneRowRelation",
  // ...existing confs
) { /* run the test */ }
```
With those set, the plan shows the scan's `runtimeFilters` containing a
plain `SubqueryAdaptiveBroadcast` (unwrapped) instead of
`CometSubqueryAdaptiveBroadcast`, and the final plan has
`dynamicpruningexpression(true)`.
### Expected behavior
The SAB should be wrapped in `CometSubqueryAdaptiveBroadcastExec` regardless
of whether `convertNode` also wraps the scan in `CometSparkToColumnarExec`, so
that `CometPlanAdaptiveDynamicPruningFilters` can rewrite it into
`CometSubqueryBroadcastExec` at `queryStageOptimizerRule` time.
### Additional context
Surfaced in #4112 (AQE DPP for native Parquet scans with broadcast reuse)
while investigating a 4.1 failure in the `V2 BatchScan broadcast query stage
creation order (SPARK-34637)` test. That test was gated with
`assume(!isSpark41Plus)` in #4112 because Spark 4.1 elides the shuffle that
gives Comet its entry point for the test's query. This bug turned up when
trying to restore the entry point by wrapping the BatchScan in
`CometSparkToColumnarExec`.
Likely fix: reorder `CometExecRule.transform` so that
SAB/`SubqueryBroadcastExec` wrapping runs before `convertNode`:
```scala
plan.transformUp { case op =>
  val withSubs = convertSubqueryBroadcasts(op)
  convertNode(withSubs)
}
```
Or a separate pass using `transformAllExpressions`, which recurses into all
nodes' expressions:
```scala
val converted = plan.transformUp { case op => convertNode(op) }
converted.transformAllExpressions {
  // SAB / SubqueryBroadcastExec wrapping logic
}
```
```
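On the same kind of toy model (plain Scala with illustrative stand-ins, not
Spark's `TreeNode` API), the recursive pass finds the scan's filters even
after the wrapper is already in place:

```scala
sealed trait Node {
  def children: Seq[Node]
  def withChildren(c: Seq[Node]): Node
  def mapOwnExpressions(f: String => String): Node
  // Recurses into every node's expressions, like transformAllExpressions --
  // unlike transformExpressionsUp, which stops at the current node.
  def transformAllExpressions(f: String => String): Node =
    withChildren(children.map(_.transformAllExpressions(f))).mapOwnExpressions(f)
}

case class Scan(runtimeFilters: Seq[String]) extends Node {
  def children: Seq[Node] = Nil
  def withChildren(c: Seq[Node]): Node = this
  def mapOwnExpressions(f: String => String): Node =
    copy(runtimeFilters = runtimeFilters.map(f))
}

case class Wrapper(child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def withChildren(c: Seq[Node]): Node = copy(child = c.head)
  def mapOwnExpressions(f: String => String): Node = this
}

// Plan as convertNode leaves it: the scan (and its unwrapped SAB) sits under the wrapper.
val converted: Node = Wrapper(Scan(Seq("SAB")))
val repaired = converted.transformAllExpressions(e => if (e == "SAB") "CometSAB" else e)
// repaired == Wrapper(Scan(Seq("CometSAB"))) -- the rewrite reaches the child's filters
```

Either option works because the SAB rewrite is applied before, or independently
of, the node being hidden behind the wrapper.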
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]