martin-g commented on code in PR #1595:
URL: 
https://github.com/apache/datafusion-ballista/pull/1595#discussion_r3149728372


##########
docs/source/user-guide/tuning-guide.md:
##########
@@ -66,6 +66,57 @@ decreasing the number of concurrent tasks may help.
 In the future, Ballista will have better support for tracking memory usage and 
allocating tasks based on available
 memory, as well as supporting spill-to-disk to reduce memory pressure.
 
+## Shuffle Implementation
+
+Ballista exchanges data between query stages by writing the output of each
+upstream task to local files, which downstream tasks read either from disk
+(when co-located) or over Arrow Flight. Two shuffle implementations are
+available, with different trade-offs around file count, memory use, and
+write latency.
+
+### Hash-based shuffle (default)
+
+The default writer hashes each incoming `RecordBatch` and immediately
+encodes the per-partition slices to Arrow IPC, streaming them into one
+file per `(input_partition, output_partition)` pair. Nothing is buffered
+in memory across batches.
+
+This is simple and low latency, but for `N` input partitions and `M`
+output partitions it produces `N × M` files. Wide shuffles can therefore
+generate a very large number of small files.

Review Comment:
   ```suggestion
   generate a large number of small files.
   ```



##########
docs/source/user-guide/configs.md:
##########
@@ -88,6 +88,23 @@ let expected = [
 ];
 ```
 
+## Shuffle Settings
+
+The following session-level keys control Ballista's shuffle behavior. See
+the [tuning guide](tuning-guide.md#shuffle-implementation) for an
+explanation of the hash-based (default) and sort-based shuffle writers.
+
+| key                                                | type    | default     | 
description                                                                     
                                                             |
+| -------------------------------------------------- | ------- | ----------- | 
--------------------------------------------------------------------------------------------------------------------------------------------
 |
+| ballista.shuffle.max_concurrent_read_requests      | UInt64  | 64          | 
Maximum number of concurrent fetch requests the shuffle reader will issue.      
                                                             |
+| ballista.shuffle.force_remote_read                 | Boolean | false       | 
Forces the shuffle reader to fetch every partition through Arrow Flight, even 
when the data is local. Intended for testing.                  |
+| ballista.shuffle.remote_read_prefer_flight         | Boolean | false       | 
For remote reads, prefer the Arrow Flight reader over the block reader. The 
block reader is generally faster.                                |
+| ballista.shuffle.sort_based.enabled                | Boolean | false       | 
Enables the sort-based shuffle writer (consolidated data file per input 
partition with an index, instead of one file per output partition).  |
+| ballista.shuffle.sort_based.buffer_size            | UInt64  | 1048576     | 
Per-partition buffer size in bytes for the sort-based writer (1 MiB default).   
                                                             |
+| ballista.shuffle.sort_based.memory_limit           | UInt64  | 268435456   | 
Total in-memory budget across all output-partition buffers for the sort-based 
writer (256 MiB default).                                      |
+| ballista.shuffle.sort_based.spill_threshold        | Utf8    | 0.8         | 
Fraction of `memory_limit` at which the largest buffers spill to disk. Must be 
in the range 0–1.                                             |

Review Comment:
   ```suggestion
   | ballista.shuffle.sort_based.spill_threshold        | Utf8    | "0.8"       
  | Fraction of `memory_limit` at which the largest buffers spill to disk. Must 
be in the range 0–1.                                             |
   ```



##########
docs/source/user-guide/configs.md:
##########
@@ -88,6 +88,23 @@ let expected = [
 ];
 ```
 
+## Shuffle Settings
+
+The following session-level keys control Ballista's shuffle behavior. See
+the [tuning guide](tuning-guide.md#shuffle-implementation) for an
+explanation of the hash-based (default) and sort-based shuffle writers.
+
+| key                                                | type    | default     | 
description                                                                     
                                                             |
+| -------------------------------------------------- | ------- | ----------- | 
--------------------------------------------------------------------------------------------------------------------------------------------
 |
+| ballista.shuffle.max_concurrent_read_requests      | UInt64  | 64          | 
Maximum number of concurrent fetch requests the shuffle reader will issue.      
                                                             |
+| ballista.shuffle.force_remote_read                 | Boolean | false       | 
Forces the shuffle reader to fetch every partition through Arrow Flight, even 
when the data is local. Intended for testing.                  |
+| ballista.shuffle.remote_read_prefer_flight         | Boolean | false       | 
For remote reads, prefer the Arrow Flight reader over the block reader. The 
block reader is generally faster.                                |
+| ballista.shuffle.sort_based.enabled                | Boolean | false       | 
Enables the sort-based shuffle writer (consolidated data file per input 
partition with an index, instead of one file per output partition).  |

Review Comment:
   ```suggestion
   | ballista.shuffle.sort_based.enabled                | Boolean | false       
| Enables the sort-based shuffle writer (consolidated data file per input 
partition with an index, instead of one file per (input partition, output 
partition) pair).  |
   ```



##########
docs/source/user-guide/tuning-guide.md:
##########
@@ -66,6 +66,57 @@ decreasing the number of concurrent tasks may help.
 In the future, Ballista will have better support for tracking memory usage and 
allocating tasks based on available
 memory, as well as supporting spill-to-disk to reduce memory pressure.
 
+## Shuffle Implementation
+
+Ballista exchanges data between query stages by writing the output of each
+upstream task to local files, which downstream tasks read either from disk
+(when co-located) or over Arrow Flight. Two shuffle implementations are

Review Comment:
   This mentions only Arrow Flight for remote reads, but 
https://github.com/apache/datafusion-ballista/pull/1595/changes#diff-67d4de1dbb021d35281a8770ecbfb2261f751b5a4daac2084e0b8ef8b7b12a60R101
 mentions also the block reader and even promotes it as the faster one.



##########
docs/source/user-guide/tuning-guide.md:
##########
@@ -66,6 +66,57 @@ decreasing the number of concurrent tasks may help.
 In the future, Ballista will have better support for tracking memory usage and 
allocating tasks based on available
 memory, as well as supporting spill-to-disk to reduce memory pressure.
 
+## Shuffle Implementation
+
+Ballista exchanges data between query stages by writing the output of each
+upstream task to local files, which downstream tasks read either from disk
+(when co-located) or over Arrow Flight. Two shuffle implementations are
+available, with different trade-offs around file count, memory use, and
+write latency.
+
+### Hash-based shuffle (default)
+
+The default writer hashes each incoming `RecordBatch` and immediately
+encodes the per-partition slices to Arrow IPC, streaming them into one
+file per `(input_partition, output_partition)` pair. Nothing is buffered
+in memory across batches.
+
+This is simple and low latency, but for `N` input partitions and `M`
+output partitions it produces `N × M` files. Wide shuffles can therefore
+generate a very large number of small files.
+
+### Sort-based shuffle (opt-in)
+
+The sort-based writer accumulates batches in a per-output-partition
+in-memory buffer. When the total buffered size crosses a threshold, the
+largest buffers are spilled to disk. After the input stream finishes, the
+remaining in-memory data and any spilled batches are merged and written
+into a single consolidated Arrow IPC file per input partition, alongside
+an index file that lets readers seek directly to a given output partition.
+
+This produces `2 × N` files instead of `N × M`, coalesces small batches
+to a target size before writing, and bounds shuffle memory use via
+spilling — at the cost of higher write latency than the hash writer.
+Consider enabling it for queries with high partition fan-out or when
+file-count pressure on local storage is a concern.
+
+The sort-based writer is disabled by default and is enabled per session:
+
+```rust
+let session_config = SessionConfig::new_with_ballista()
+    .set_bool("ballista.shuffle.sort_based.enabled", true);
+```
+
+The following session-level keys tune its behavior:
+
+| key                                       | type    | default     | 
description                                                                     
                           |
+| ----------------------------------------- | ------- | ----------- | 
----------------------------------------------------------------------------------------------------------
 |
+| ballista.shuffle.sort_based.enabled       | Boolean | false       | Enables 
the sort-based shuffle writer.                                                  
                   |
+| ballista.shuffle.sort_based.buffer_size   | UInt64  | 1048576     | 
Per-partition buffer size in bytes (1 MiB default).                             
                           |
+| ballista.shuffle.sort_based.memory_limit  | UInt64  | 268435456   | Total 
in-memory budget across all output-partition buffers (256 MiB default).         
                     |
+| ballista.shuffle.sort_based.spill_threshold | Utf8  | 0.8         | Fraction 
of `memory_limit` at which the largest buffers begin spilling to disk. Must be 
in the range 0–1. |

Review Comment:
   ```suggestion
   | ballista.shuffle.sort_based.spill_threshold | Utf8  | "0.8"         | 
Fraction of `memory_limit` at which the largest buffers begin spilling to disk. 
Must be in the range 0–1. |
   ```
   Would this be a more correct way to display the default ? To match the used 
data type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to