mbutrovich commented on code in PR #3845:
URL: https://github.com/apache/datafusion-comet/pull/3845#discussion_r3065775112


##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads.
 
 ## Configuration
 
-| Config                                             | Default | Description                              |
-| -------------------------------------------------- | ------- | ---------------------------------------- |
-| `spark.comet.exec.shuffle.enabled`                 | `true`  | Enable Comet shuffle                     |
-| `spark.comet.exec.shuffle.mode`                    | `auto`  | Shuffle mode: `native`, `jvm`, or `auto` |
-| `spark.comet.exec.shuffle.compression.codec`       | `zstd`  | Compression codec                        |
-| `spark.comet.exec.shuffle.compression.zstd.level`  | `1`     | Zstd compression level                   |
-| `spark.comet.shuffle.write.buffer.size`            | `1MB`   | Write buffer size                        |
-| `spark.comet.columnar.shuffle.batch.size`          | `8192`  | Target rows per batch                    |
+| Config                                             | Default     | Description                                 |
+| -------------------------------------------------- | ----------- | ------------------------------------------- |
+| `spark.comet.exec.shuffle.enabled`                 | `true`      | Enable Comet shuffle                        |
+| `spark.comet.exec.shuffle.mode`                    | `auto`      | Shuffle mode: `native`, `jvm`, or `auto`    |
+| `spark.comet.exec.shuffle.partitionerMode`         | `immediate` | Partitioner mode: `immediate` or `buffered` |

Review Comment:
   This does not reflect the default in CometConf.
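   If `buffered` is in fact the default (the diagram and tuning-guide text added in this PR both say "buffered (default)"), the row would read something like:

   ```
   | `spark.comet.exec.shuffle.partitionerMode`         | `buffered`  | Partitioner mode: `immediate` or `buffered` |
   ```

   Whichever value is correct, the table, diagram, and tuning guide should all agree.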



##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
 └─────────────────────────────────────────────────────────────────────────────┘
                     │                                     │
                     ▼                                     ▼
-┌───────────────────────────────────┐   ┌───────────────────────────────────┐
-│ MultiPartitionShuffleRepartitioner │   │ SinglePartitionShufflePartitioner │
-│ (hash/range partitioning)          │   │ (single partition case)           │
-└───────────────────────────────────┘   └───────────────────────────────────┘
+┌───────────────────────────────────────────────────────────────────────┐
+│                        Partitioner Selection                          │
+│  Controlled by spark.comet.exec.shuffle.partitionerMode               │
+├───────────────────────────┬───────────────────────────────────────────┤
+│  immediate                │  buffered (default)                       │
+│  ImmediateModePartitioner │  MultiPartitionShuffleRepartitioner       │
+│  (hash/range/round-robin) │  (hash/range/round-robin)                 │
+│  Writes IPC blocks as     │  Buffers all rows in memory               │

Review Comment:
   "Writes IPC blocks as batches arrive" is probably more accurate as 
"Partitions batches as they arrive, buffers as IPC blocks"



##########
docs/source/user-guide/latest/tuning.md:
##########
@@ -144,6 +144,20 @@ Comet provides a fully native shuffle implementation, which generally provides t
 supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type
 partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays.
 
+Native shuffle has two partitioner modes, configured via
+`spark.comet.exec.shuffle.partitionerMode`:
+
+- **`buffered`** (default): Buffers all input batches in memory, then uses `interleave` to produce
+  partitioned output one partition at a time. Only one partition's output batch is in memory at
+  a time during the write phase, so this mode scales well to large numbers of partitions (1000+).
+  The trade-off is that it must hold all input data in memory (or spill it) before writing begins.
+
+- **`immediate`**: Partitions incoming batches immediately using per-partition Arrow array builders,
+  flushing compressed IPC blocks when they reach the target batch size. This avoids buffering the
+  entire input in memory. However, because it maintains builders for all partitions simultaneously

Review Comment:
   Above you say it's "Proportional to `num_partitions × batch_size`" but here you include num_columns. Unclear if batch_size encompasses num_columns; otherwise batch_size feels like it should be replaced by num_rows or something.
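   For example (illustrative names only; `target_num_rows` and `avg_value_width` are not Comet identifiers):

   ```text
   builder_memory ≈ num_partitions × target_num_rows × num_columns × avg_value_width
   ```

   i.e. make the row count explicit so it is clear whether `num_columns` is already folded into `batch_size`.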



##########
native/shuffle/README.md:
##########
@@ -35,7 +35,7 @@ performance outside of Spark. It streams input data directly from Parquet files.
 cargo run --release --features shuffle-bench --bin shuffle_bench -- \
   --input /data/tpch-sf100/lineitem/ \
   --partitions 200 \
-  --codec lz4 \
+  --codec zstd --zstd-level 1 \

Review Comment:
   I think you mentioned this was an errant change. Just don't want you to miss it.
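   If it was unintended, restoring the original flag is a one-liner (taken verbatim from the removed line above):

   ```suggestion
     --codec lz4 \
   ```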



##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
 
 2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
 
-3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
-   - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
-   - `SinglePartitionShufflePartitioner`: For single partition (simpler path)
+3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
+   based on the `partitionerMode` configuration:
+   - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
+     As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
+     partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC
+     block to an in-memory buffer. Under memory pressure, these buffers are spilled to
+     per-partition temporary files. This keeps memory usage much lower than buffered mode since
+     data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays.
 
-4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
-   exceeds the threshold, partitions spill to temporary files.
+   - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin
+     partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes
+     them in a single pass. When memory pressure exceeds the threshold, partitions spill to

Review Comment:
   "partitions batches and spills to temporary files." As currently written, it 
sounds like partitioning has already happened when it goes to spill.
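   To make the suggested ordering concrete, the shape I'd expect is roughly this (a hedged sketch against arrow-rs, not Comet's actual spill code; `write_spill` is an illustrative callback, not a real API):

   ```rust
   use arrow::array::{Array, UInt32Array};
   use arrow::compute::take;
   use arrow::record_batch::RecordBatch;

   /// Partitioning happens as part of the spill itself: `partition_ids`
   /// holds each row's target partition, rows are gathered per partition
   /// with `take`, and only then handed to the spill writer.
   fn spill_partitioned(
       batch: &RecordBatch,
       partition_ids: &[u32],
       num_partitions: usize,
       mut write_spill: impl FnMut(usize, RecordBatch),
   ) -> arrow::error::Result<()> {
       for p in 0..num_partitions {
           // Row indices belonging to partition `p`.
           let indices = UInt32Array::from_iter_values(
               partition_ids
                   .iter()
                   .enumerate()
                   .filter(|(_, id)| **id as usize == p)
                   .map(|(i, _)| i as u32),
           );
           if indices.is_empty() {
               continue;
           }
           // Gather this partition's rows from every column, then spill.
           let columns = batch
               .columns()
               .iter()
               .map(|c| take(c, &indices, None))
               .collect::<Result<Vec<_>, _>>()?;
           write_spill(p, RecordBatch::try_new(batch.schema(), columns)?);
       }
       Ok(())
   }
   ```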



##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -201,10 +221,31 @@ sizes.
 
 ## Memory Management
 
-Native shuffle uses DataFusion's memory management with spilling support:
+Native shuffle uses DataFusion's memory management. The memory characteristics differ
+between the two partitioner modes:
+
+### Immediate Mode
+
+Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
+rather than buffering all input rows before writing:

Review Comment:
   "all input batches", not sure "rows" is relevant here.



##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -201,10 +221,31 @@ sizes.
 
 ## Memory Management
 
-Native shuffle uses DataFusion's memory management with spilling support:
+Native shuffle uses DataFusion's memory management. The memory characteristics differ
+between the two partitioner modes:
+
+### Immediate Mode
+
+Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
+rather than buffering all input rows before writing:
+
+- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the
+  target batch size. When a builder fills up, it is flushed as a compressed IPC block to an
+  in-memory buffer.
+- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus
+  the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC
+  encoding is more compact than raw Arrow arrays.

Review Comment:
   It also releases any held references to potentially-sliced/filtered RecordBatches.
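   Might be worth a bullet in the doc, since it's easy to miss why copying into builders helps. A minimal illustration (hedged; illustrative code, not Comet's, and it assumes a single `Int64` column):

   ```rust
   use arrow::array::{Array, Int64Array, Int64Builder};
   use arrow::record_batch::RecordBatch;

   /// An Arrow slice is a zero-copy view that keeps the *entire* parent
   /// buffer alive. Copying the slice's rows into a builder and dropping
   /// the slice releases that reference, so the parent allocation can be
   /// reclaimed while only the (much smaller) builder output lives on.
   fn copy_then_release(slice: RecordBatch) -> Int64Array {
       let col = slice
           .column(0)
           .as_any()
           .downcast_ref::<Int64Array>()
           .expect("sketch assumes a single Int64 column");
       let mut builder = Int64Builder::with_capacity(col.len());
       for i in 0..col.len() {
           builder.append_value(col.value(i)); // copies just these rows
       }
       drop(slice); // parent buffers no longer referenced from here
       builder.finish()
   }
   ```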



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

