andygrove opened a new pull request, #3871:
URL: https://github.com/apache/datafusion-comet/pull/3871

   ## Which issue does this PR close?
   
   N/A - New experimental feature
   
   ## Rationale for this change
   
   This PR introduces a Comet-owned sort merge join operator 
(`CometSortMergeJoinExec`) that replaces DataFusion's `SortMergeJoinExec`. The 
motivations are:
   
   1. **Memory efficiency** — Spills entire buffered batches (including join 
key arrays) to Arrow IPC, fixing DataFusion's gap where join key arrays stay in 
memory during spills
   2. **Performance** — Implements Spark's key-reuse optimization: when 
consecutive streamed rows share the same join key, the buffered match group is 
reused without re-scanning
   3. **Decoupling** — Owns the join operator so Comet is not coupled to 
DataFusion's evolving SMJ API and can evolve independently
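
The key-reuse idea in item 2 can be illustrated with a minimal, standard-library-only sketch. It is not the code from this PR: plain `i32` keys stand in for Arrow row-encoded keys, and the function name and the `group_scans` counter are hypothetical, added only to make the reuse visible.

```rust
/// Inner sort-merge join over two inputs that are already sorted by key.
/// Returns the matching (streamed_idx, buffered_idx) pairs and the number of
/// times a buffered match group had to be collected.
/// Sketch only: i32 keys stand in for row-encoded join keys.
pub fn inner_join_with_key_reuse(
    streamed: &[i32],
    buffered: &[i32],
) -> (Vec<(usize, usize)>, usize) {
    let mut out = Vec::new();
    let mut group_scans = 0usize;
    let mut cached_key: Option<i32> = None; // key of the cached match group
    let mut group = 0..0; // index range of the cached group in `buffered`
    let mut b = 0usize; // cursor into the buffered side

    for (s_idx, &key) in streamed.iter().enumerate() {
        if cached_key != Some(key) {
            // New streamed key: skip smaller buffered keys, then collect
            // the run of buffered rows that share this key.
            while b < buffered.len() && buffered[b] < key {
                b += 1;
            }
            let start = b;
            while b < buffered.len() && buffered[b] == key {
                b += 1;
            }
            group = start..b;
            cached_key = Some(key);
            group_scans += 1;
        }
        // Same key as the previous streamed row: the cached group is reused
        // without touching the buffered cursor.
        for b_idx in group.clone() {
            out.push((s_idx, b_idx));
        }
    }
    (out, group_scans)
}
```

With streamed keys `[1, 1, 2, 4]` and buffered keys `[1, 1, 3, 4]`, the second streamed row reuses the group collected for the first, so only three groups are collected for four streamed rows.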
   
   A configuration toggle (`spark.comet.exec.sortMergeJoin.useNative`, default 
`true`) switches between the new operator and DataFusion's implementation, 
which allows A/B benchmarking.
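
As a rough illustration of how a planner could branch on this toggle, here is a small sketch. It assumes that `true` selects the new Comet operator and that the Spark configuration arrives as a string map; the `SmjImpl` enum and `choose_smj_impl` function are hypothetical names, not identifiers from this PR.

```rust
use std::collections::HashMap;

/// Config key named in this PR description.
pub const SMJ_USE_NATIVE: &str = "spark.comet.exec.sortMergeJoin.useNative";

/// Hypothetical tag for which sort merge join operator to construct.
#[derive(Debug, PartialEq, Eq)]
pub enum SmjImpl {
    Comet,
    DataFusion,
}

/// Picks the operator from a string-valued config map.
/// Assumption: a missing key falls back to the documented default (`true`),
/// and `true` means "use the new Comet operator".
pub fn choose_smj_impl(conf: &HashMap<String, String>) -> SmjImpl {
    let use_native = conf
        .get(SMJ_USE_NATIVE)
        .map(|v| v.trim().eq_ignore_ascii_case("true"))
        .unwrap_or(true);
    if use_native {
        SmjImpl::Comet
    } else {
        SmjImpl::DataFusion
    }
}
```

Under these assumptions, setting the key to `false` for one benchmark run and leaving it unset for another would exercise both code paths.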
   
   **This is experimental.** The implementation passes all existing tests but 
has known limitations listed below.
   
   ## What changes are included in this PR?
   
   ### New files (`native/core/src/execution/joins/`)
   - **`sort_merge_join.rs`** — `CometSortMergeJoinExec` implementing 
DataFusion's `ExecutionPlan` trait
   - **`sort_merge_join_stream.rs`** — Streaming state machine (Init, 
PollStreamed, PollBuffered, Comparing, CollectingBuffered, Joining, 
OutputReady, DrainUnmatched, DrainBuffered, Exhausted)
   - **`buffered_batch.rs`** — `BufferedMatchGroup` with batch-level Arrow IPC 
spilling via `SpillManager`
   - **`output_builder.rs`** — Batch materialization using Arrow 
`take`/`concat` kernels
   - **`filter.rs`** — Join filter evaluation with corrected masks for 
outer/semi/anti joins
   - **`metrics.rs`** — Metrics matching `CometMetricNode.scala` (input/output 
rows/batches, join_time, peak_mem, spill counts)
   - **`tests.rs`** — 11 unit tests covering all join types and spilling
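
The batch-level spilling described for `buffered_batch.rs` can be pictured with the following standard-library-only sketch. Everything in it is a stand-in: `Budget` mimics a memory reservation whose `try_grow` can fail, `spill_store` mimics spill files, and byte vectors mimic whole record batches. It does not use DataFusion's `SpillManager` or Arrow IPC, and the real types in the PR may be shaped differently.

```rust
/// Hypothetical stand-in for a memory reservation with a fixed limit.
pub struct Budget {
    limit: usize,
    used: usize,
}

impl Budget {
    pub fn new(limit: usize) -> Self {
        Budget { limit, used: 0 }
    }

    /// Reserves `bytes`, or fails and leaves the reservation unchanged.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!("cannot reserve {} bytes", bytes));
        }
        self.used += bytes;
        Ok(())
    }
}

/// A buffered batch is either held in memory or spilled as a whole.
pub enum BufferedBatch {
    InMemory(Vec<u8>),
    Spilled { spill_index: usize },
}

/// Hypothetical match group; `spill_store` stands in for spill files.
pub struct MatchGroup {
    pub batches: Vec<BufferedBatch>,
    pub spill_store: Vec<Vec<u8>>,
    pub spill_count: usize,
}

impl MatchGroup {
    pub fn new() -> Self {
        MatchGroup { batches: Vec::new(), spill_store: Vec::new(), spill_count: 0 }
    }

    /// Keeps the batch in memory if the reservation succeeds; otherwise
    /// spills the entire batch, so nothing from it stays resident.
    pub fn push(&mut self, batch: Vec<u8>, budget: &mut Budget) {
        match budget.try_grow(batch.len()) {
            Ok(()) => self.batches.push(BufferedBatch::InMemory(batch)),
            Err(_) => {
                self.spill_store.push(batch);
                self.spill_count += 1;
                let spill_index = self.spill_store.len() - 1;
                self.batches.push(BufferedBatch::Spilled { spill_index });
            }
        }
    }

    /// Reads a batch back regardless of where it currently lives.
    pub fn read(&self, i: usize) -> &[u8] {
        match &self.batches[i] {
            BufferedBatch::InMemory(bytes) => bytes.as_slice(),
            BufferedBatch::Spilled { spill_index } => self.spill_store[*spill_index].as_slice(),
        }
    }
}
```

The point of the sketch is the decision in `push`: a failed reservation moves the whole batch out of memory, which is the behaviour the rationale section contrasts with keeping join key arrays resident during spills.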
   
   ### Modified files
   - **`CometConf.scala`** — New `spark.comet.exec.sortMergeJoin.useNative` 
config
   - **`planner.rs`** — Conditional operator construction based on config
   - **`jni_api.rs`** — Pass spark config through to planner
   - **`spark_config.rs`** — Config constant
   
   ### Supported join types
   All 6: Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti
   
   ### Key design decisions
   - **Streamed/buffered assignment** matches Spark (RightOuter swaps sides, 
all others: left=streamed, right=buffered)
   - **Null handling** matches Spark (`NullEqualsNothing` — null keys never 
match for inner/semi, emit with null counterpart for outer/anti)
   - **Batch-level spilling** via DataFusion's `SpillManager` and Arrow IPC 
when `MemoryReservation::try_grow()` fails
   - **Key-reuse** caches streamed key as `OwnedRow` via Arrow's `RowConverter`
   
   ### Known limitations / future work
   - **Full outer + filter cross-group tracking**: In a full outer join with a 
join filter, buffered rows from earlier key groups that stayed unmatched across 
all streamed rows are not null-joined. Fixing this requires tracking unmatched 
buffered rows across key group boundaries.
   - **No codegen**: Spark has whole-stage codegen for SMJ; this implementation 
uses interpreted evaluation.
   - **A/B comparison test**: An automated Scala test comparing the output of 
the native operator with DataFusion's has not been added yet.
   
   ## How are these changes tested?
   
   - **11 Rust unit tests** covering all 6 join types (inner, left/right/full 
outer, left semi, left anti), null key handling, duplicate keys (many-to-many), 
empty results, and forced spilling under a 1 KB memory limit
   - **Existing `CometJoinSuite`** (10 tests) passes with the new 
implementation as a drop-in replacement, including SortMergeJoin with and 
without join filters across all join types


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

