mbutrovich opened a new pull request, #4201: URL: https://github.com/apache/datafusion-comet/pull/4201
## Which issue does this PR close?

Closes #.

## Rationale for this change

`CometScanRule` and `CometExecRule` together implement nine implicit phases, coordinated through plan-tree wrappers (`CometScanWrapper`, `CometSinkPlaceHolder`), tag strings scattered across companion objects, retry-on-self recursion for broadcast convertibility, and a `revertRedundantColumnarShuffle` post-pass that exists only because the main rule decides convertibility before knowing whether parents and children will follow. The split also means scan validation lives in one rule, conversion in another, and a `CometScanExec` wrapper carries state between them. Hard to reason about, harder to extend.

This PR replaces both rules with a single three-phase rule (`CometPlanner`) and moves coordination state from wrapper plan nodes onto `TreeNodeTag`s.

### Why three phases solve the redundant-transition problem

The old rule walks bottom-up and commits per node based on local information. When it sees a `ShuffleExchangeExec`, it asks "is the shuffle serde happy?". If yes, it converts to `CometShuffleExchangeExec` immediately. Later, when it visits the parent `HashAggregate`, it might decide the aggregate cannot convert. The plan now reads `Spark HashAgg → CometShuffle → Spark HashAgg`: a columnar shuffle producing batches that get converted back to rows at both ends. PR #4010 added `revertRedundantColumnarShuffle` as a post-pass to detect that pattern and revert it. That approach is whack-a-mole: every speculative conversion that does not pan out needs its own revert pass, and the broadcast path already had similar retry-on-self recursion to guess whether a parent BHJ would convert before deciding to convert the broadcast. The three phases remove the speculation:

- **Phase 1 (LIKELY_COMET)** asks the per-node question once, in isolation: "would the serde accept this op if we ignored what its parents and children do?". Pure tagging, no commitment.
- **Phase 2 (DECISION)** walks with both parent and child predictions in hand. The shuffle rule reads "convert iff the parent is LIKELY_COMET or any child is LIKELY_COMET". A shuffle between two JVM aggregates is `selfLikely=true, parentLikely=false, childLikely=false` → `Passthrough`. The conversion never happens, so there is nothing to revert.
- **Phase 3 (Emit)** mutates the plan only where Phase 2 said `Convert`.

The same machinery catches the broadcast-without-Comet-consumer case (now a `BroadcastConsumerIndex` built once during Phase 1) and the spark-to-columnar-without-consumer case (Phase 2's S2C rule gates on the parent being LIKELY_COMET). Three patterns that previously required either a post-pass or a recursive pre-check now live in one place.

### Why this lets us delete the placeholder / wrapper classes

The placeholder classes existed because the old rule needed in-tree carriers for state during iteration:

- **`CometScanExec`**: inter-rule carrier. `CometScanRule` decides which kind of native scan and stamps `scanImpl`; `CometExecRule` reads that and produces `CometNativeScanExec`. After the `SCAN_NATIVE_ICEBERG_COMPAT` removal, `CometScanExec` has zero execution-time consumers.
- **`CometBatchScanExec`**: same idea for V2 scans. `CometPlanner` stashes Iceberg metadata on the `ICEBERG_METADATA` tag, and Phase 3 builds the final `CometIcebergNativeScanExec` directly from the raw `BatchScanExec`.
- **`CometSinkPlaceHolder`** and **`CometScanWrapper`**: serde `createExec` returns these when the operator is JVM-orchestrated (`CometShuffleExchangeExec`, `CometBroadcastExchangeExec`, `CometCollectLimitExec`, S2C wrappers). The placeholder carries the protobuf so a parent can wire it as a child, then a separate post-pass strips the placeholder. `CometPlanner`'s `runSerde` unwraps inline at emit time and writes the protobuf to the `NATIVE_OP` tag. No placeholder, no strip pass.

## What changes are included in this PR?

New `org.apache.comet.planner` package:

- `CometPlanner` (the rule).
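The Phase 1 / Phase 2 interaction described above can be sketched as a toy model. Everything here is illustrative stand-in code, not the PR's API: the real rule operates on Spark's `SparkPlan` and stores LIKELY_COMET / DECISION on `TreeNodeTag`s rather than on mutable fields.

```scala
// Toy sketch of the three-phase decision flow (hypothetical simplified types).
object ThreePhaseSketch {
  sealed trait Decision
  case object Convert extends Decision
  case object Passthrough extends Decision

  // Bare-bones plan node: operator name, children, and per-phase state.
  final case class Node(
      op: String,
      children: List[Node],
      var likely: Boolean = false,
      var decision: Decision = Passthrough)

  // Phase 1: per-node serde check in isolation, stubbed as an allow-list.
  def phase1(n: Node, supported: Set[String]): Unit = {
    n.likely = supported(n.op)
    n.children.foreach(phase1(_, supported))
  }

  // Phase 2: decide with the parent's prediction in hand. A shuffle converts
  // iff its parent or any child is likely Comet; other ops follow Phase 1.
  def phase2(n: Node, parentLikely: Boolean): Unit = {
    n.decision = n.op match {
      case "Shuffle" if n.likely =>
        if (parentLikely || n.children.exists(_.likely)) Convert else Passthrough
      case _ =>
        if (n.likely) Convert else Passthrough
    }
    n.children.foreach(phase2(_, parentLikely = n.likely))
  }

  // Spark HashAgg -> Shuffle -> Spark HashAgg: the shuffle is serde-supported
  // (selfLikely = true) but parentLikely and childLikely are both false, so
  // Phase 2 says Passthrough and there is nothing to revert afterwards.
  def demo(): Decision = {
    val plan = Node("HashAgg", List(Node("Shuffle", List(Node("HashAgg", Nil)))))
    phase1(plan, supported = Set("Shuffle"))
    phase2(plan, parentLikely = false)
    plan.children.head.decision
  }

  def main(args: Array[String]): Unit =
    println(s"shuffle decision: ${demo()}") // prints "shuffle decision: Passthrough"
}
```

Phase 3 then mutates only nodes tagged `Convert`, which is why no revert pass exists in this model.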
Pipeline: `prePass` (NaN/zero normalization, `RewriteJoin`) → Phase 1 → Phase 2 → Phase 3 → `revertOrphanedBroadcasts` → `cleanupLogicalLinks` → `convertBlocks` → `postPass` (subquery broadcast rewrite).

- `phases/`: the five passes above.
- `gates/`: `V1ScanGate`, `V2ScanClassifier`, `S2CGate`. Per-node classifiers returning ADTs.
- `tags/CometTags.scala`: a formal vocabulary for the cross-phase tags (`LIKELY_COMET`, `DECISION`, `COMET_CONVERTED`, `NATIVE_OP`, `ICEBERG_METADATA`).
- `PlanningContext.scala`: state threaded through the phases, plus `BroadcastConsumerIndex`.

Integration:

- Both rule paths are registered; `spark.comet.planner.enabled` (default `true`) selects which one runs. Each rule asserts on the flag at entry to surface drift.
- `CometNativeScanExec` no longer holds a `@transient CometScanExec` field. File partitions come from `originalPlan.inputRDD.asInstanceOf[FileScanRDD].filePartitions`. The `CometNativeScan` serde now operates on `FileSourceScanExec` directly.
- `CometShuffleExchangeExec.isCometPlan` also recognizes the `NATIVE_OP` tag, so JVM-orchestrated Comet ops without `CometPlan` membership still register as native children.
- `CometPlanAdaptiveDynamicPruningFilters` is updated to convert `originalPlan.partitionFilters` instead of the removed `scan.partitionFilters` (same dual-filter pattern, different source).

Two non-obvious mechanics worth flagging during review:

- **`cleanupLogicalLinks`**: explicitly unsets `LOGICAL_PLAN_TAG` and `LOGICAL_PLAN_INHERITED_TAG` on Comet ops whose original Spark op had no logical link. Without this, `setLogicalLink` propagation leaves stale inherited links on exchanges, AQE's `replaceWithQueryStagesInLogicalPlan` picks the wrong physical match, and a previously emitted `CometSortMergeJoin` survives a re-plan that should have replaced it. Mirrors the legacy "Set up logical links" pass.
- **`convertBlocks` on the skip path**: when `COMET_EXEC_ENABLED=false` or the root is already `COMET_CONVERTED`, the planner still runs `convertBlocks`. AQE re-planning can graft a previously emitted `CometNativeExec` interior node (with no `SerializedPlan`) into a freshly Spark-planned outer plan, and that node needs a serialized plan to execute as the new top of its block.

Out of scope, follow-up PRs in dependency order:

1. Delete `CometScanRule` + `CometExecRule` after a stabilization period.
2. Remove `native_iceberg_compat` + `SCAN_AUTO` + `COMET_NATIVE_SCAN_IMPL`.
3. Delete `CometScanExec` and `CometBatchScanExec` (dead after step 2).
4. Remove `CometScanWrapper` / `CometSinkPlaceHolder` (no producer left after step 1).

## How are these changes tested?

Existing tests. I've run `CometExecSuite`, `CometFuzzTestSuite`, and `CometIcebergNativeSuite` locally; I want to see what CI does. Anything that explicitly enables `iceberg_compat` likely needs a config to use the old rules too.

Rollback is `--conf spark.comet.planner.enabled=false`, which routes to the original `CometScanRule` + `CometExecRule` path unchanged.
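For reviewers who want to flip the flag, a typical invocation might look like the sketch below. Only `spark.comet.planner.enabled` comes from this PR; the plugin class and the other confs are the standard Comet setup, and the jar name is a placeholder.

```shell
# Route planning back through the legacy CometScanRule + CometExecRule path.
# spark.comet.planner.enabled is the flag added in this PR; my-job.jar is a
# placeholder for your application.
spark-submit \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.comet.planner.enabled=false \
  my-job.jar
```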
