mbutrovich opened a new pull request, #4201:
URL: https://github.com/apache/datafusion-comet/pull/4201

   ## Which issue does this PR close?
   
   Closes #.
   
   ## Rationale for this change
   
   `CometScanRule` and `CometExecRule` together implement nine implicit phases 
coordinated through plan-tree wrappers (`CometScanWrapper`, 
`CometSinkPlaceHolder`), tag strings scattered across companion objects, 
retry-on-self recursion for broadcast convertibility, and a 
`revertRedundantColumnarShuffle` post-pass that exists only because the main 
rule decides convertibility before knowing whether parents and children will 
follow. The split also means scan validation lives in one rule, conversion in 
another, and a `CometScanExec` wrapper carries state between them. That is hard 
to reason about and harder to extend.
   
   This PR replaces both rules with a single three-phase rule (`CometPlanner`) 
and moves coordination state from wrapper plan nodes onto `TreeNodeTag`s.
   
   ### Why three phases solves the redundant-transition problem
   
   The old rule walks bottom-up and commits per node based on local 
information. When it sees a `ShuffleExchangeExec`, it asks "is the shuffle 
serde happy?". If yes, it converts to `CometShuffleExchangeExec` immediately. 
Later, when it visits the parent `HashAggregate`, it might decide the aggregate 
cannot convert. The plan now reads `Spark HashAgg → CometShuffle → Spark 
HashAgg`: a columnar shuffle producing batches that get converted back to rows 
at both ends. PR #4010 added `revertRedundantColumnarShuffle` as a post-pass to 
detect that pattern and revert. That approach is whack-a-mole: every 
speculative conversion that does not pan out needs its own revert pass, and the 
broadcast path already had similar retry-on-self recursion to guess whether a 
parent BHJ would convert before deciding to convert the broadcast.
   
   The three phases remove the speculation:
   
   - **Phase 1 (LIKELY_COMET)** asks the per-node question once, in isolation: 
"would the serde accept this op if we ignored what its parents and children 
do?". Pure tagging, no commitment.
   - **Phase 2 (DECISION)** walks with both parent and child predictions in 
hand. The shuffle rule reads "convert iff parent LIKELY_COMET or any child 
LIKELY_COMET". A shuffle between two JVM aggregates is `selfLikely=true, 
parentLikely=false, childLikely=false` → `Passthrough`. The conversion never 
happens, so there is nothing to revert.
   - **Phase 3 (Emit)** mutates the plan only where Phase 2 said `Convert`.
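
   The shuffle case above can be sketched on a toy plan model. Everything below 
(`Node`, `Decision`, `decideShuffle`) is an illustrative stand-in, not the PR's 
actual API:

```scala
sealed trait Decision
case object Convert extends Decision
case object Passthrough extends Decision

// Phase 1's per-node prediction is modeled by `selfLikely` (LIKELY_COMET).
final case class Node(name: String, selfLikely: Boolean, children: Seq[Node] = Nil)

// Phase 2's shuffle rule: convert only when the serde accepts the shuffle AND
// its parent or some child is predicted to convert as well.
def decideShuffle(shuffle: Node, parentLikely: Boolean): Decision =
  if (shuffle.selfLikely && (parentLikely || shuffle.children.exists(_.selfLikely)))
    Convert
  else
    Passthrough

// Spark HashAgg -> shuffle -> Spark HashAgg: the serde is happy with the
// shuffle, but neither neighbor converts, so Phase 3 never touches it.
val isolatedShuffle =
  Node("shuffle", selfLikely = true, children = Seq(Node("hashAgg", selfLikely = false)))
val verdict = decideShuffle(isolatedShuffle, parentLikely = false)
// verdict == Passthrough: nothing converted, nothing to revert
```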
   
   Same machinery catches the broadcast-without-Comet-consumer case (now a 
`BroadcastConsumerIndex` built once during Phase 1) and the 
spark-to-columnar-without-consumer case (Phase 2's S2C rule gates on parent 
LIKELY_COMET). Three patterns that previously required either a post-pass or a 
recursive pre-check now live in one place.
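
   The broadcast case reduces to the same pattern; a hedged sketch of what a 
consumer index built in one Phase 1 traversal might look like (the `PlanNode` 
type and `buildIndex` are hypothetical, not the PR's `BroadcastConsumerIndex`):

```scala
// One traversal records, per broadcast node, whether its direct consumer
// (e.g. a BHJ parent) was itself predicted LIKELY_COMET.
final case class PlanNode(id: Int, isBroadcast: Boolean, likelyComet: Boolean,
                          children: Seq[PlanNode] = Nil)

def buildIndex(root: PlanNode): Map[Int, Boolean] = {
  def go(node: PlanNode): Map[Int, Boolean] = {
    val here = node.children.collect {
      case c if c.isBroadcast => c.id -> node.likelyComet
    }.toMap
    node.children.map(go).foldLeft(here)(_ ++ _)
  }
  go(root)
}

// A broadcast feeding a JVM-only join indexes to `false`, so Phase 2 can
// refuse the conversion up front instead of reverting it afterwards.
val jvmJoin = PlanNode(1, isBroadcast = false, likelyComet = false,
  children = Seq(PlanNode(2, isBroadcast = true, likelyComet = true)))
val index = buildIndex(jvmJoin)
// index(2) == false
```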
   
   ### Why this lets us delete the placeholder / wrapper classes
   
   The placeholder classes existed because the old rule needed in-tree carriers 
for state during iteration:
   
   - **`CometScanExec`**: inter-rule carrier. `CometScanRule` decides which 
kind of native scan and stamps `scanImpl`; `CometExecRule` reads that and 
produces `CometNativeScanExec`. After `SCAN_NATIVE_ICEBERG_COMPAT` removal, 
`CometScanExec` has zero execution-time consumers.
   - **`CometBatchScanExec`**: same idea for V2 scans. CometPlanner stashes 
Iceberg metadata on the `ICEBERG_METADATA` tag and Phase 3 builds the final 
`CometIcebergNativeScanExec` directly from the raw `BatchScanExec`.
   - **`CometSinkPlaceHolder`** and **`CometScanWrapper`**: serde `createExec` 
returns these when the operator is JVM-orchestrated 
(`CometShuffleExchangeExec`, `CometBroadcastExchangeExec`, 
`CometCollectLimitExec`, S2C wrappers). The placeholder carries the protobuf so 
a parent can wire it as a child, then a separate post-pass strips the 
placeholder. CometPlanner's `runSerde` unwraps inline at emit time and writes 
the protobuf to the `NATIVE_OP` tag. No placeholder, no strip pass.
   
   ## What changes are included in this PR?
   
   New `org.apache.comet.planner` package:
   
   - `CometPlanner` (the rule). Pipeline: `prePass` (NaN/zero normalization, 
RewriteJoin) → Phase 1 → Phase 2 → Phase 3 → `revertOrphanedBroadcasts` → 
`cleanupLogicalLinks` → `convertBlocks` → `postPass` (subquery broadcast 
rewrite).
   - `phases/`: the five passes above.
   - `gates/`: `V1ScanGate`, `V2ScanClassifier`, `S2CGate`. Per-node 
classifiers returning ADTs.
   - `tags/CometTags.scala`: formal vocabulary for the cross-phase tags 
(`LIKELY_COMET`, `DECISION`, `COMET_CONVERTED`, `NATIVE_OP`, 
`ICEBERG_METADATA`).
   - `PlanningContext.scala`: state threaded through phases plus 
`BroadcastConsumerIndex`.
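
   The tag vocabulary can be pictured as typed keys in the style of Spark's 
`TreeNodeTag`. Only the tag names below come from this PR; the `Tag`/`Tags` 
classes are a simplified self-contained stand-in:

```scala
final case class Tag[T](name: String)

object CometTagsSketch {
  val LIKELY_COMET     = Tag[Boolean]("LIKELY_COMET")         // Phase 1 prediction
  val DECISION         = Tag[String]("DECISION")              // Phase 2 verdict
  val COMET_CONVERTED  = Tag[Boolean]("COMET_CONVERTED")      // Phase 3 marker
  val NATIVE_OP        = Tag[Array[Byte]]("NATIVE_OP")        // serialized operator
  val ICEBERG_METADATA = Tag[Any]("ICEBERG_METADATA")         // payload type illustrative
}

// Per-node tag store: one phase writes, a later phase reads.
final class Tags {
  private val store = scala.collection.mutable.Map.empty[Tag[_], Any]
  def set[T](tag: Tag[T], value: T): Unit = store(tag) = value
  def get[T](tag: Tag[T]): Option[T] = store.get(tag).map(_.asInstanceOf[T])
  def unset[T](tag: Tag[T]): Unit = store -= tag
}

val tags = new Tags
tags.set(CometTagsSketch.LIKELY_COMET, true)   // written by Phase 1
// tags.get(CometTagsSketch.LIKELY_COMET) == Some(true), read by Phase 2
```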
   
   Integration:
   
   - Both rule paths are registered; `spark.comet.planner.enabled` (default 
`true`) selects which one runs. Each rule asserts on the flag at entry to 
surface drift.
   - `CometNativeScanExec` no longer holds a `@transient CometScanExec` field. 
File partitions come from 
`originalPlan.inputRDD.asInstanceOf[FileScanRDD].filePartitions`. The 
`CometNativeScan` serde now operates on `FileSourceScanExec` directly.
   - `CometShuffleExchangeExec.isCometPlan` also recognizes the `NATIVE_OP` 
tag, so JVM-orchestrated Comet ops without `CometPlan` membership still 
register as native children.
   - `CometPlanAdaptiveDynamicPruningFilters` updated to convert 
`originalPlan.partitionFilters` instead of the removed `scan.partitionFilters` 
(same dual-filter pattern, different source).
   
   Two non-obvious mechanics worth flagging during review:
   
   - **`cleanupLogicalLinks`**: explicitly UNSETs `LOGICAL_PLAN_TAG` and 
`LOGICAL_PLAN_INHERITED_TAG` on Comet ops whose original Spark op had no 
logical link. Without this, `setLogicalLink` propagation leaves stale INHERITED 
links on exchanges, AQE's `replaceWithQueryStagesInLogicalPlan` picks the wrong 
physical match, and a previously-emitted `CometSortMergeJoin` survives a 
re-plan that should have replaced it. Mirrors the legacy "Set up logical links" 
pass.
   - **`convertBlocks` on the skip path**: when `COMET_EXEC_ENABLED=false` or 
the root is already `COMET_CONVERTED`, the planner still runs `convertBlocks`. 
AQE re-planning can graft a previously-emitted `CometNativeExec` interior node 
(with no `SerializedPlan`) into a freshly Spark-planned outer plan, and that 
node needs a serialized plan to execute as the new top of its block.
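
   The first mechanic can be sketched as follows; the `Op` type and the 
string-keyed tag map are simplified stand-ins for Spark's logical-link tags, 
not the PR's implementation:

```scala
// If the original Spark op carried no logical link, the emitted Comet op must
// not keep one that setLogicalLink propagation smuggled in, or AQE's re-plan
// matching picks the wrong physical node.
final case class Op(name: String,
                    tags: scala.collection.mutable.Map[String, Any] =
                      scala.collection.mutable.Map.empty)

val LOGICAL_PLAN_TAG = "LOGICAL_PLAN_TAG"
val LOGICAL_PLAN_INHERITED_TAG = "LOGICAL_PLAN_INHERITED_TAG"

def cleanupLogicalLinks(emitted: Op, originalHadLink: Boolean): Unit =
  if (!originalHadLink) {
    emitted.tags -= LOGICAL_PLAN_TAG
    emitted.tags -= LOGICAL_PLAN_INHERITED_TAG
  }

// An exchange that picked up a stale inherited link during emit:
val exchange = Op("CometShuffleExchange")
exchange.tags(LOGICAL_PLAN_INHERITED_TAG) = "staleLink"
cleanupLogicalLinks(exchange, originalHadLink = false)
// exchange.tags no longer contains LOGICAL_PLAN_INHERITED_TAG
```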
   
   Out of scope; follow-up PRs, in dependency order:
   
   1. Delete `CometScanRule` + `CometExecRule` after a stabilization period.
   2. Remove `native_iceberg_compat` + `SCAN_AUTO` + `COMET_NATIVE_SCAN_IMPL`.
   3. Delete `CometScanExec` and `CometBatchScanExec` (dead after step 2).
   4. Remove `CometScanWrapper` / `CometSinkPlaceHolder` (no producer left 
after step 1).
   
   ## How are these changes tested?
   
   Existing tests. I've run CometExecSuite, CometFuzzTestSuite, and 
CometIcebergNativeSuite locally; I want to see what CI does with the rest. 
Anything that explicitly enables `iceberg_compat` likely needs to be configured 
to use the old rules as well.
   
   Rollback is `--conf spark.comet.planner.enabled=false`, which routes to the 
original `CometScanRule` + `CometExecRule` path unchanged.
   



