andygrove opened a new pull request, #3883:
URL: https://github.com/apache/datafusion-comet/pull/3883

   ## Which issue does this PR close?
   
   Closes #3882.
   
   ## Rationale for this change
   
   The current shuffle format writes each batch as an independent Arrow IPC 
stream, repeating the schema for every batch. With the default batch size of 
8192 rows, this causes ~50% larger shuffle files compared to Spark, and overall 
query performance is ~10% slower. Increasing batch size helps but shifts the 
problem to downstream memory usage.
   
   ## What changes are included in this PR?
   
   ### Write side (Rust)
   - **One `StreamWriter` per partition** instead of one per batch — schema is 
written once, N record batch messages are appended
   - **Arrow IPC built-in body compression** via `IpcWriteOptions` 
(LZ4_FRAME/ZSTD) replaces external compression wrappers (lz4_flex, zstd, snap)
   - All three partitioners updated: immediate mode, buffered mode, single 
partition
   - `ShuffleBlockWriter` replaced with slim `CompressionCodec` enum that wraps 
`IpcWriteOptions`
   - Sort-based shuffle path (`row.rs`) also uses persistent `StreamWriter`
   
   ### Read side (Rust + JVM)
   - **`JniInputStream`** — Rust struct implementing `std::io::Read` that pulls 
bytes from JVM `InputStream` via JNI callbacks with 64KB read-ahead buffer
   - **`ShuffleStreamReader`** — wraps Arrow `StreamReader<JniInputStream>`, 
handles concatenated IPC streams (from spills) and empty streams
   - **`NativeBatchDecoderIterator`** (JVM) — simplified to native handle 
pattern (`open`/`next`/`close`), removed all manual header parsing
   - **`ShuffleScanExec`** (Rust) — uses `ShuffleStreamReader` directly instead 
of `CometShuffleBlockIterator`
   
   ### Removed
   - Custom 20-byte header format (8-byte length + 8-byte field count + 4-byte 
codec tag)
   - `ShuffleBlockWriter` struct and per-batch IPC stream creation
   - `CometShuffleBlockIterator.java` and its JNI bridge
   - `read_ipc_compressed` legacy read function
   - Snappy shuffle codec support (Arrow IPC only supports LZ4_FRAME and ZSTD)
   - External compression dependencies (`snap`, `lz4_flex`, `zstd`) from 
shuffle crate
   
   ### Breaking change
   This changes the shuffle wire format. Old writers and new readers (or vice 
versa) are incompatible. This is acceptable because shuffle data is ephemeral 
and never persisted across jobs.
   
   ## How are these changes tested?
   
   - All 26 Rust shuffle unit tests pass (updated to use `StreamReader` for 
roundtrip verification)
   - All 41 `CometShuffleSuite` JVM tests pass
   - All 21 `CometNativeShuffleSuite` JVM tests pass
   - Clippy clean with `-D warnings`
   - Tests cover: multiple compression codecs, spilling, coalescing, 
concatenated IPC streams, empty partitions, dictionary-encoded columns


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to