martin-g commented on code in PR #1636:
URL: https://github.com/apache/datafusion-ballista/pull/1636#discussion_r3172390587
##########
ballista/core/src/execution_plans/sort_shuffle/writer.rs:
##########
@@ -262,7 +267,14 @@ impl SortShuffleWriterExec {
let after = buffered.indices_allocated_size();
growth += after.saturating_sub(before);
- if reservation.try_grow(growth).is_err() {
+ // Mirror the growth in the runtime pool reservation so the pool
+ // sees this writer's memory usage. Best-effort: if the pool is
+ // bounded and rejects the grow, that's fine — the absolute
+ // counter below still triggers a spill.
+ let _ = reservation.try_grow(growth);
Review Comment:
What bothers me here is that a user may expect a memory pool limit to keep the process RSS close to that limit. Here, though, the pool's rejection is ignored. Depending on the value of the new config, the process may consume much more memory than the pool limit, and this will raise questions.
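To make the concern concrete, here is a minimal, std-only sketch. `Pool`, `TaskBuffer` and `worst_case_bytes` are hypothetical stand-ins. They are not DataFusion's `MemoryPool`/`MemoryReservation` API. The sketch shows a spill condition that honors the pool's rejection in addition to the absolute per-task counter, so buffered memory stays within whichever bound is tighter. It also works through the `concurrent_tasks * this value` arithmetic from the new config description, using an assumed 8 concurrent tasks and an assumed 1 GiB pool.

```rust
/// Hypothetical bounded pool shared by the tasks on one executor.
struct Pool {
    limit: usize,
    used: usize,
}

impl Pool {
    /// Reserve `bytes` if they fit under the limit; otherwise reject.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            Err(format!("cannot grow by {bytes}: {} of {} in use", self.used, self.limit))
        } else {
            self.used += bytes;
            Ok(())
        }
    }

    /// Release bytes that were previously reserved.
    fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}

/// Hypothetical per-task writer state.
struct TaskBuffer {
    buffered: usize, // bytes actually held in memory by this writer
    reserved: usize, // bytes successfully reserved in the pool
}

impl TaskBuffer {
    /// Buffer `growth` more bytes and return true if the writer should spill:
    /// either the pool rejected the growth or the per-task budget is reached.
    fn add(&mut self, pool: &mut Pool, growth: usize, per_task_limit: usize) -> bool {
        self.buffered += growth;
        let pool_ok = pool.try_grow(growth).is_ok();
        if pool_ok {
            self.reserved += growth;
        }
        !pool_ok || self.buffered >= per_task_limit
    }

    /// Simulate spilling to disk: drop the buffered data and free the reservation.
    fn spill(&mut self, pool: &mut Pool) {
        pool.shrink(self.reserved);
        self.buffered = 0;
        self.reserved = 0;
    }
}

/// Worst-case sort shuffle memory per executor, per the config description.
fn worst_case_bytes(concurrent_tasks: usize, per_task_limit: usize) -> usize {
    concurrent_tasks * per_task_limit
}

fn main() {
    const MIB: usize = 1024 * 1024;
    // Assumed: 8 concurrent tasks with the default 256 MiB per-task budget.
    println!("worst case: {} MiB", worst_case_bytes(8, 256 * MIB) / MIB); // prints "worst case: 2048 MiB"

    // Assumed: a 1 GiB pool of which other tasks already hold 1000 MiB.
    let mut pool = Pool { limit: 1024 * MIB, used: 1000 * MIB };
    let mut task = TaskBuffer { buffered: 0, reserved: 0 };

    // 64 MiB is well under the per-task budget, but the pool rejects it.
    if task.add(&mut pool, 64 * MIB, 256 * MIB) {
        println!("spilling {} MiB", task.buffered / MIB); // prints "spilling 64 MiB"
        task.spill(&mut pool);
    }
}
```

The PR uses `let _ = reservation.try_grow(growth);`, which discards the equivalent of `pool_ok`. Only the absolute counter can then trigger a spill, so with the numbers above a 1 GiB pool could sit under roughly 2 GiB of buffered data.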
##########
ballista/core/src/config.rs:
##########
@@ -140,6 +144,13 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Target batch size in rows for coalescing small batches in sort shuffle".to_string(),
DataType::UInt64,
Some((8192).to_string())),
+ ConfigEntry::new(BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT_PER_TASK_BYTES.to_string(),
+ "Per-task buffered-bytes budget at which the sort shuffle writer spills its \
+ in-memory batches to disk. Counted independently of the runtime memory pool, so \
+ spilling kicks in even when the pool is unbounded. Total worst-case sort shuffle \
+ memory per executor is approximately concurrent_tasks * this value.".to_string(),
+ DataType::UInt64,
+ Some((256 * 1024 * 1024).to_string())),
Review Comment:
`256 * 1024 * 1024` is used both here and in sort_shuffle/config.rs, so the two values may drift apart.
Maybe declare a constant and reuse it?
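A minimal sketch of the suggestion follows. The constant name `DEFAULT_MEMORY_LIMIT_PER_TASK_BYTES` and the helper `config_entry_default` are hypothetical. The helper stands in for the `Some((256 * 1024 * 1024).to_string())` argument in config.rs. The `compression` field is omitted so that the sketch stays std-only.

```rust
/// Hypothetical shared default, defined once (e.g. in sort_shuffle/config.rs)
/// and referenced from both the `Default` impl and the config registry.
pub const DEFAULT_MEMORY_LIMIT_PER_TASK_BYTES: usize = 256 * 1024 * 1024;

#[derive(Debug, Clone, PartialEq)]
pub struct SortShuffleConfig {
    pub enabled: bool,
    pub batch_size: usize,
    pub memory_limit_per_task_bytes: usize,
}

impl Default for SortShuffleConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            batch_size: 8192,
            // Single source of truth instead of a repeated literal.
            memory_limit_per_task_bytes: DEFAULT_MEMORY_LIMIT_PER_TASK_BYTES,
        }
    }
}

/// Stand-in for the default-value argument passed to `ConfigEntry::new` in config.rs.
fn config_entry_default() -> Option<String> {
    Some(DEFAULT_MEMORY_LIMIT_PER_TASK_BYTES.to_string())
}

fn main() {
    // Both places now derive from the same constant, so they cannot drift.
    let parsed: usize = config_entry_default().unwrap().parse().unwrap();
    assert_eq!(parsed, SortShuffleConfig::default().memory_limit_per_task_bytes);
    println!("{parsed}"); // prints "268435456"
}
```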
##########
ballista/core/src/execution_plans/sort_shuffle/config.rs:
##########
@@ -37,19 +41,27 @@ impl Default for SortShuffleConfig {
enabled: false,
compression: CompressionType::LZ4_FRAME,
batch_size: 8192,
+ memory_limit_per_task_bytes: 256 * 1024 * 1024,
}
}
}
impl SortShuffleConfig {
- /// Creates a new configuration.
+ /// Creates a new configuration with the default per-task memory limit.
pub fn new(enabled: bool, compression: CompressionType, batch_size: usize) -> Self {
Self {
enabled,
compression,
batch_size,
+ memory_limit_per_task_bytes: Self::default().memory_limit_per_task_bytes,
Review Comment:
This preserves the API but is error-prone: the code at
https://github.com/apache/datafusion-ballista/blob/0e352d10175e8a5ac9e51d0adf30e82563460a56/ballista/core/src/serde/mod.rs#L384-L388
silently uses the default value instead of any custom value set with `.with_memory_limit_per_task_bytes()` on the sender side.
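Below is a minimal std-only sketch of one way to avoid the silent reset. It assumes the wire message gains a `memory_limit_per_task_bytes` field.

* `SortShuffleConfigProto` and `lossy_decode` are hypothetical. The real message lives in Ballista's protobuf schema, whose exact shape is not shown here.
* `new` and `with_memory_limit_per_task_bytes` mirror the names used in this PR. Their signatures are simplified for the sketch, and the `compression` field is omitted.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct SortShuffleConfig {
    pub enabled: bool,
    pub batch_size: usize,
    pub memory_limit_per_task_bytes: usize,
}

impl SortShuffleConfig {
    const DEFAULT_LIMIT: usize = 256 * 1024 * 1024;

    /// Mirrors the API-preserving constructor: the limit is always the default.
    pub fn new(enabled: bool, batch_size: usize) -> Self {
        Self { enabled, batch_size, memory_limit_per_task_bytes: Self::DEFAULT_LIMIT }
    }

    /// Builder-style override (signature assumed for this sketch).
    pub fn with_memory_limit_per_task_bytes(mut self, bytes: usize) -> Self {
        self.memory_limit_per_task_bytes = bytes;
        self
    }
}

/// Hypothetical wire representation of the config.
#[derive(Debug, Clone, PartialEq)]
pub struct SortShuffleConfigProto {
    pub enabled: bool,
    pub batch_size: u64,
    pub memory_limit_per_task_bytes: u64,
}

impl From<&SortShuffleConfig> for SortShuffleConfigProto {
    fn from(c: &SortShuffleConfig) -> Self {
        Self {
            enabled: c.enabled,
            batch_size: c.batch_size as u64,
            memory_limit_per_task_bytes: c.memory_limit_per_task_bytes as u64,
        }
    }
}

impl From<&SortShuffleConfigProto> for SortShuffleConfig {
    fn from(p: &SortShuffleConfigProto) -> Self {
        // Set every field from the wire value; going through `new` would
        // silently reset the limit to the default.
        Self {
            enabled: p.enabled,
            batch_size: p.batch_size as usize,
            memory_limit_per_task_bytes: p.memory_limit_per_task_bytes as usize,
        }
    }
}

/// A decode path shaped like the one the review describes: the limit is dropped.
fn lossy_decode(p: &SortShuffleConfigProto) -> SortShuffleConfig {
    SortShuffleConfig::new(p.enabled, p.batch_size as usize)
}

fn main() {
    let sent = SortShuffleConfig::new(true, 8192).with_memory_limit_per_task_bytes(64 * 1024 * 1024);
    let wire = SortShuffleConfigProto::from(&sent);
    assert_eq!(SortShuffleConfig::from(&wire), sent); // custom limit survives
    assert_ne!(lossy_decode(&wire), sent); // custom limit is lost
    println!("preserved {} bytes", SortShuffleConfig::from(&wire).memory_limit_per_task_bytes); // prints "preserved 67108864 bytes"
}
```

An alternative is to drop the defaulting constructor, or to add one that takes the limit explicitly. The compiler would then flag every call site that does not pass a value, including the serde path.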
--
This is an automated message from the Apache Git Service.