mbutrovich commented on code in PR #3845:
URL: https://github.com/apache/datafusion-comet/pull/3845#discussion_r3065775112
##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads.
## Configuration
-| Config                                            | Default | Description                              |
-| ------------------------------------------------- | ------- | ---------------------------------------- |
-| `spark.comet.exec.shuffle.enabled`                | `true`  | Enable Comet shuffle                     |
-| `spark.comet.exec.shuffle.mode`                   | `auto`  | Shuffle mode: `native`, `jvm`, or `auto` |
-| `spark.comet.exec.shuffle.compression.codec`      | `zstd`  | Compression codec                        |
-| `spark.comet.exec.shuffle.compression.zstd.level` | `1`     | Zstd compression level                   |
-| `spark.comet.shuffle.write.buffer.size`           | `1MB`   | Write buffer size                        |
-| `spark.comet.columnar.shuffle.batch.size`         | `8192`  | Target rows per batch                    |
+| Config                                            | Default     | Description                                 |
+| ------------------------------------------------- | ----------- | ------------------------------------------- |
+| `spark.comet.exec.shuffle.enabled`                | `true`      | Enable Comet shuffle                        |
+| `spark.comet.exec.shuffle.mode`                   | `auto`      | Shuffle mode: `native`, `jvm`, or `auto`    |
+| `spark.comet.exec.shuffle.partitionerMode`        | `immediate` | Partitioner mode: `immediate` or `buffered` |
Review Comment:
This does not reflect the default in CometConf.
##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
└─────────────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
-┌───────────────────────────────────┐ ┌───────────────────────────────────┐
-│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │
-│ (hash/range partitioning) │ │ (single partition case) │
-└───────────────────────────────────┘ └───────────────────────────────────┘
+┌───────────────────────────────────────────────────────────────────────┐
+│                         Partitioner Selection                         │
+│        Controlled by spark.comet.exec.shuffle.partitionerMode         │
+├───────────────────────────┬───────────────────────────────────────────┤
+│ immediate                 │ buffered (default)                        │
+│ ImmediateModePartitioner  │ MultiPartitionShuffleRepartitioner        │
+│ (hash/range/round-robin)  │ (hash/range/round-robin)                  │
+│ Writes IPC blocks as      │ Buffers all rows in memory
Review Comment:
"Writes IPC blocks as batches arrive" is probably more accurate as "Partitions batches as they arrive, buffers as IPC blocks".
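The suggested wording can be made concrete with a minimal Python sketch (hypothetical names, with plain lists standing in for Arrow array builders and compressed IPC blocks; the real partitioner is Rust code):

```python
# Toy sketch of immediate-mode partitioning: partition batches as they
# arrive, buffer the results as "IPC blocks" once a builder fills up.
BATCH_SIZE = 4  # target rows per partition before flushing (illustrative)

def partition_batches(batches, num_partitions):
    builders = {p: [] for p in range(num_partitions)}    # per-partition builders
    ipc_blocks = {p: [] for p in range(num_partitions)}  # flushed blocks

    for batch in batches:
        # Partition each row as the batch arrives (hash on the first column).
        for row in batch:
            p = hash(row[0]) % num_partitions
            builders[p].append(row)
            # Builder reached the target batch size: flush it as a block.
            if len(builders[p]) >= BATCH_SIZE:
                ipc_blocks[p].append(tuple(builders[p]))
                builders[p].clear()

    # Final flush of any partially filled builders.
    for p, rows in builders.items():
        if rows:
            ipc_blocks[p].append(tuple(rows))
    return ipc_blocks
```

Nothing is held back until end-of-input except the partial builders, which matches the "buffers as IPC blocks" framing.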
##########
docs/source/user-guide/latest/tuning.md:
##########
@@ -144,6 +144,20 @@ Comet provides a fully native shuffle implementation, which generally provides t
supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type
partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays.
+Native shuffle has two partitioner modes, configured via
+`spark.comet.exec.shuffle.partitionerMode`:
+
+- **`buffered`** (default): Buffers all input batches in memory, then uses `interleave` to produce
+  partitioned output one partition at a time. Only one partition's output batch is in memory at
+  a time during the write phase, so this mode scales well to large numbers of partitions (1000+).
+  The trade-off is that it must hold all input data in memory (or spill it) before writing begins.
+
+- **`immediate`**: Partitions incoming batches immediately using per-partition Arrow array builders,
+  flushing compressed IPC blocks when they reach the target batch size. This avoids buffering the
+  entire input in memory. However, because it maintains builders for all partitions simultaneously
Review Comment:
Above you say it's "Proportional to `num_partitions × batch_size`", but here you include num_columns. Unclear if batch_size encompasses num_columns; otherwise batch_size feels like it should be replaced by num_rows or something.
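To make the ambiguity concrete: if `batch_size` means a row count, then column width has to enter the estimate separately. A back-of-the-envelope calculation with made-up numbers (none of these figures come from the PR):

```python
# Rough builder-memory estimate for immediate mode (illustrative numbers only).
num_partitions = 200
num_rows = 8192        # rows per builder, i.e. the target batch size in rows
num_columns = 16
bytes_per_value = 8    # e.g. all Int64 / Float64 columns

# Reading "batch_size" as a row count, num_columns must appear explicitly:
builder_bytes = num_partitions * num_rows * num_columns * bytes_per_value
print(f"~{builder_bytes / 2**20:.0f} MiB for builders alone")  # ~200 MiB
```

Under these assumptions the column count changes the footprint by an order of magnitude, which is why the two descriptions should agree on what `batch_size` covers.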
##########
native/shuffle/README.md:
##########
@@ -35,7 +35,7 @@ performance outside of Spark. It streams input data directly from Parquet files.
cargo run --release --features shuffle-bench --bin shuffle_bench -- \
--input /data/tpch-sf100/lineitem/ \
--partitions 200 \
- --codec lz4 \
+ --codec zstd --zstd-level 1 \
Review Comment:
I think you mentioned this was an errant change. Just don't want you to miss it.
##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
-3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
-   - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
-   - `SinglePartitionShufflePartitioner`: For single partition (simpler path)
+3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
+   based on the `partitionerMode` configuration:
+   - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
+     As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
+     partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC
+     block to an in-memory buffer. Under memory pressure, these buffers are spilled to
+     per-partition temporary files. This keeps memory usage much lower than buffered mode since
+     data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays.
-4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
-   exceeds the threshold, partitions spill to temporary files.
+   - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin
+     partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes
+     them in a single pass. When memory pressure exceeds the threshold, partitions spill to
Review Comment:
"partitions batches and spills to temporary files." As currently written, it sounds like partitioning has already happened when it goes to spill.
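For contrast with the immediate-mode sketch, the buffered flow can be illustrated in Python too (hypothetical names; the real implementation is `MultiPartitionShuffleRepartitioner` in Rust, which uses Arrow's `interleave` kernel rather than Python lists):

```python
# Toy sketch of buffered-mode shuffle: partition assignment happens as
# batches arrive, but all data stays buffered until the write phase.
def buffered_shuffle(batches, num_partitions):
    # Phase 1: buffer every input batch, computing each row's target
    # partition up front (before any spill or write happens).
    buffered, assignments = [], []
    for batch in batches:
        buffered.append(batch)
        assignments.append([hash(row[0]) % num_partitions for row in batch])

    # Phase 2: materialize one partition's output at a time, so only a
    # single output batch is live during the write phase.
    for p in range(num_partitions):
        yield p, [row
                  for batch, parts in zip(buffered, assignments)
                  for row, part in zip(batch, parts)
                  if part == p]
```

The sketch shows why the spill wording matters: what gets spilled in phase 1 is already-assigned batch data, not unpartitioned input.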
##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -201,10 +221,31 @@ sizes.
## Memory Management
-Native shuffle uses DataFusion's memory management with spilling support:
+Native shuffle uses DataFusion's memory management. The memory characteristics differ
+between the two partitioner modes:
+
+### Immediate Mode
+
+Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
+rather than buffering all input rows before writing:
Review Comment:
"all input batches", not sure "rows" is relevant here.
##########
docs/source/contributor-guide/native_shuffle.md:
##########
@@ -201,10 +221,31 @@ sizes.
## Memory Management
-Native shuffle uses DataFusion's memory management with spilling support:
+Native shuffle uses DataFusion's memory management. The memory characteristics differ
+between the two partitioner modes:
+
+### Immediate Mode
+
+Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
+rather than buffering all input rows before writing:
+
+- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the
+  target batch size. When a builder fills up, it is flushed as a compressed IPC block to an
+  in-memory buffer.
+- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus
+  the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC
+  encoding is more compact than raw Arrow arrays.
Review Comment:
It also releases any held references to potentially-sliced/filtered RecordBatches.
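That reference-release benefit can be shown with a loose Python analogy (in Arrow, a zero-copy slice keeps its parent buffers alive via shared references; here a toy `Slice` class plays that role, and `copy_out` stands in for eager IPC encoding):

```python
import gc
import weakref

class RecordBatch:
    """Toy stand-in for a large batch backed by one big allocation."""
    def __init__(self, values):
        self.values = values

class Slice:
    """Zero-copy view: holding it keeps the whole parent batch alive."""
    def __init__(self, parent, start, stop):
        self.parent, self.start, self.stop = parent, start, stop
    def copy_out(self):
        # Eagerly copy the visible rows so the parent can be dropped.
        return list(self.parent.values[self.start:self.stop])

big = RecordBatch(list(range(1_000_000)))
alive = weakref.ref(big)
view = Slice(big, 0, 10)    # small filtered view of the big batch

del big
gc.collect()
assert alive() is not None  # the tiny slice still pins the whole batch

rows = view.copy_out()      # encode/copy out, as immediate mode does
del view
gc.collect()
assert alive() is None      # parent released once the view is dropped
```

Eager encoding thus bounds memory not only by builder size but also by dropping the large upstream batches that sliced views would otherwise pin.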
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]