This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 2f2bf3215d [branch-53] fix: Provide more generic API for the capacity
limit parsing (#20372) (#20893)
2f2bf3215d is described below
commit 2f2bf3215db2c8d24e5b11ca0fae92a672a9f3c9
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:56:43 2026 -0400
[branch-53] fix: Provide more generic API for the capacity limit parsing
(#20372) (#20893)
- Part of https://github.com/apache/datafusion/issues/19692
- Closes https://github.com/apache/datafusion/issues/20371 on branch-53
This PR:
- Backports https://github.com/apache/datafusion/pull/20372 from
@erenavsarogullari to the branch-53 line
Co-authored-by: Eren Avsarogullari <[email protected]>
Co-authored-by: Martin Grigorov <[email protected]>
---
benchmarks/src/util/options.rs | 41 +++++--
datafusion/core/src/execution/context/mod.rs | 132 ++++++++++++++++++++-
datafusion/core/tests/sql/runtime_config.rs | 23 +++-
.../sqllogictest/test_files/set_variable.slt | 3 +
4 files changed, 180 insertions(+), 19 deletions(-)
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index 6f7267eabb..add8ff17fb 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -50,12 +50,12 @@ pub struct CommonOpt {
/// Memory limit (e.g. '100M', '1.5G'). If not specified, run all
pre-defined memory limits for given query
/// if there's any, otherwise run with no memory limit.
- #[arg(long = "memory-limit", value_parser = parse_memory_limit)]
+ #[arg(long = "memory-limit", value_parser = parse_capacity_limit)]
pub memory_limit: Option<usize>,
/// The amount of memory to reserve for sort spill operations.
DataFusion's default value will be used
/// if not specified.
- #[arg(long = "sort-spill-reservation-bytes", value_parser =
parse_memory_limit)]
+ #[arg(long = "sort-spill-reservation-bytes", value_parser =
parse_capacity_limit)]
pub sort_spill_reservation_bytes: Option<usize>,
/// Activate debug mode to see more details
@@ -116,20 +116,26 @@ impl CommonOpt {
}
}
-/// Parse memory limit from string to number of bytes
-/// e.g. '1.5G', '100M' -> 1572864
-fn parse_memory_limit(limit: &str) -> Result<usize, String> {
-    let (number, unit) = limit.split_at(limit.len() - 1);
+/// Parse capacity limit from string to number of bytes by allowing units: K, M and G.
+/// Supports formats like '1.5G' -> 1610612736, '100M' -> 104857600
+///
+/// Returns an error for empty or blank input, a numeric part that does not
+/// parse as `f64`, a negative, infinite or NaN number, or an unsupported unit.
+fn parse_capacity_limit(limit: &str) -> Result<usize, String> {
+    if limit.trim().is_empty() {
+        return Err("Capacity limit cannot be empty".to_string());
+    }
+    // Split before the last *character*, not the last byte: `split_at(len - 1)`
+    // panics when the unit is multi-byte (e.g. "1µ"). `limit` is non-empty here.
+    let unit_start = limit.char_indices().next_back().map_or(0, |(idx, _)| idx);
+    let (number, unit) = limit.split_at(unit_start);
     let number: f64 = number
         .parse()
-        .map_err(|_| format!("Failed to parse number from memory limit '{limit}'"))?;
+        .map_err(|_| format!("Failed to parse number from capacity limit '{limit}'"))?;
+    // `f64` parsing accepts "inf" and "NaN"; `is_finite` rejects both. Without the
+    // NaN check, "NaNM" would be accepted and cast to 0 bytes (`NaN as usize == 0`).
+    if number.is_sign_negative() || !number.is_finite() {
+        return Err("Limit value should be positive finite number".to_string());
+    }
     match unit {
         "K" => Ok((number * 1024.0) as usize),
         "M" => Ok((number * 1024.0 * 1024.0) as usize),
         "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
         _ => Err(format!(
-            "Unsupported unit '{unit}' in memory limit '{limit}'"
+            "Unsupported unit '{unit}' in capacity limit '{limit}'. Unit must be one of: 'K', 'M', 'G'"
         )),
     }
 }
@@ -139,16 +145,25 @@ mod tests {
use super::*;
#[test]
- fn test_parse_memory_limit_all() {
+ fn test_parse_capacity_limit_all() {
// Test valid inputs
- assert_eq!(parse_memory_limit("100K").unwrap(), 102400);
- assert_eq!(parse_memory_limit("1.5M").unwrap(), 1572864);
- assert_eq!(parse_memory_limit("2G").unwrap(), 2147483648);
+ assert_eq!(parse_capacity_limit("100K").unwrap(), 102400); // 100 * 1024
+ assert_eq!(parse_capacity_limit("1.5M").unwrap(), 1572864); // 1.5 * 1024^2
+ assert_eq!(parse_capacity_limit("2G").unwrap(), 2147483648); // 2 * 1024^3
// Test invalid unit
- assert!(parse_memory_limit("500X").is_err());
+ assert!(parse_capacity_limit("500X").is_err()); // 'X' is not one of K/M/G
// Test invalid number
- assert!(parse_memory_limit("abcM").is_err());
+ assert!(parse_capacity_limit("abcM").is_err()); // "abc" does not parse as f64
+
+ // Test negative number
+ assert!(parse_capacity_limit("-1M").is_err());
+
+ // Test infinite number
+ assert!(parse_capacity_limit("infM").is_err()); // "inf" parses as f64 infinity
+
+ // Test negative infinite number
+ assert!(parse_capacity_limit("-infM").is_err());
}
}
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index b6c606ff46..cdc50167d1 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1167,20 +1167,20 @@ impl SessionContext {
let mut builder =
RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
builder = match key {
"memory_limit" => {
- let memory_limit = Self::parse_memory_limit(value)?;
+ let memory_limit = Self::parse_capacity_limit(variable,
value)?;
builder.with_memory_limit(memory_limit, 1.0)
}
"max_temp_directory_size" => {
- let directory_size = Self::parse_memory_limit(value)?;
+ let directory_size = Self::parse_capacity_limit(variable,
value)?;
builder.with_max_temp_directory_size(directory_size as u64)
}
"temp_directory" => builder.with_temp_file_path(value),
"metadata_cache_limit" => {
- let limit = Self::parse_memory_limit(value)?;
+ let limit = Self::parse_capacity_limit(variable, value)?;
builder.with_metadata_cache_limit(limit)
}
"list_files_cache_limit" => {
- let limit = Self::parse_memory_limit(value)?;
+ let limit = Self::parse_capacity_limit(variable, value)?;
builder.with_object_list_cache_limit(limit)
}
"list_files_cache_ttl" => {
@@ -1252,11 +1252,23 @@ impl SessionContext {
/// (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
/// );
/// ```
+ #[deprecated(
+ since = "53.0.0",
+ note = "please use `parse_capacity_limit` function instead."
+ )]
pub fn parse_memory_limit(limit: &str) -> Result<usize> {
+ if limit.trim().is_empty() {
+ return Err(plan_datafusion_err!("Empty limit value found!"));
+ }
let (number, unit) = limit.split_at(limit.len() - 1);
let number: f64 = number.parse().map_err(|_| {
plan_datafusion_err!("Failed to parse number from memory limit
'{limit}'")
})?;
+ if number.is_sign_negative() || number.is_infinite() {
+ return Err(plan_datafusion_err!(
+ "Limit value should be positive finite number"
+ ));
+ }
match unit {
"K" => Ok((number * 1024.0) as usize),
@@ -1266,6 +1278,51 @@ impl SessionContext {
}
}
+    /// Parse capacity limit from string to number of bytes by allowing units: K, M and G.
+    /// Supports formats like '1.5G', '100M', '512K'
+    ///
+    /// `config_name` is only used to make error messages name the offending
+    /// configuration option.
+    ///
+    /// # Errors
+    ///
+    /// Returns a planning error when `limit` is empty or blank, when its numeric
+    /// part cannot be parsed as `f64`, when that number is negative, infinite or
+    /// NaN, or when the trailing unit is not one of `K`, `M`, `G`.
+    ///
+    /// # Examples
+    /// ```
+    /// use datafusion::execution::context::SessionContext;
+    ///
+    /// assert_eq!(
+    ///     SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", "1M").unwrap(),
+    ///     1024 * 1024
+    /// );
+    /// assert_eq!(
+    ///     SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", "1.5G").unwrap(),
+    ///     (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
+    /// );
+    /// ```
+    pub fn parse_capacity_limit(config_name: &str, limit: &str) -> Result<usize> {
+        if limit.trim().is_empty() {
+            return Err(plan_datafusion_err!(
+                "Empty limit value found for '{config_name}'"
+            ));
+        }
+        // The unit is the last *character*. Splitting at `limit.len() - 1` would
+        // panic when that character is multi-byte (e.g. `SET ... = '1µ'`), so split
+        // at the byte offset where the last character starts instead. `limit` is
+        // non-empty here, so `next_back` always yields `Some`.
+        let unit_start = limit.char_indices().next_back().map_or(0, |(idx, _)| idx);
+        let (number, unit) = limit.split_at(unit_start);
+        let number: f64 = number.parse().map_err(|_| {
+            plan_datafusion_err!(
+                "Failed to parse number from '{config_name}', limit '{limit}'"
+            )
+        })?;
+        // `f64::from_str` accepts "inf" and "NaN"; `is_finite` rejects both.
+        // Without the NaN check, "NaNM" would silently become 0 bytes because
+        // `NaN as usize` is 0.
+        if number.is_sign_negative() || !number.is_finite() {
+            return Err(plan_datafusion_err!(
+                "Limit value should be positive finite number for '{config_name}'"
+            ));
+        }
+
+        match unit {
+            "K" => Ok((number * 1024.0) as usize),
+            "M" => Ok((number * 1024.0 * 1024.0) as usize),
+            "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
+            _ => plan_err!(
+                "Unsupported unit '{unit}' in '{config_name}', limit '{limit}'. \
+                Unit must be one of: 'K', 'M', 'G'"
+            ),
+        }
+    }
+
fn parse_duration(duration: &str) -> Result<Duration> {
let mut minutes = None;
let mut seconds = None;
@@ -2759,4 +2816,71 @@ mod tests {
assert!(have.is_err());
}
}
+
+ #[test]
+ fn test_parse_memory_limit() { // covers the #[deprecated] parse_memory_limit
+ // Valid memory_limit
+ for (limit, want) in [
+ ("1.5K", (1.5 * 1024.0) as usize),
+ ("2M", (2f64 * 1024.0 * 1024.0) as usize),
+ ("1G", (1f64 * 1024.0 * 1024.0 * 1024.0) as usize),
+ ] {
+ #[expect(deprecated)]
+ let have = SessionContext::parse_memory_limit(limit).unwrap();
+ assert_eq!(want, have);
+ }
+
+ // Invalid memory_limit
+ for limit in [
+ "1B", // unsupported unit
+ "1T", // unsupported unit (only K, M, G)
+ "", // empty
+ " ", // blank
+ "XYZG", // non-numeric number part
+ "-1G", // negative
+ "infG", // infinite
+ "-infG", // negative infinity
+ "G", // unit with no number
+ "1024B",
+ "invalid_size", // non-numeric
+ ] {
+ #[expect(deprecated)]
+ let have = SessionContext::parse_memory_limit(limit);
+ assert!(have.is_err());
+ }
+ }
+
+ #[test]
+ fn test_parse_capacity_limit() { // same inputs as above, via parse_capacity_limit
+ const MEMORY_LIMIT: &str = "datafusion.runtime.memory_limit";
+
+ // Valid capacity_limit
+ for (limit, want) in [
+ ("1.5K", (1.5 * 1024.0) as usize),
+ ("2M", (2f64 * 1024.0 * 1024.0) as usize),
+ ("1G", (1f64 * 1024.0 * 1024.0 * 1024.0) as usize),
+ ] {
+ let have = SessionContext::parse_capacity_limit(MEMORY_LIMIT,
limit).unwrap();
+ assert_eq!(want, have);
+ }
+
+ // Invalid capacity_limit
+ for limit in [
+ "1B",
+ "1T",
+ "",
+ " ",
+ "XYZG",
+ "-1G",
+ "infG",
+ "-infG",
+ "G",
+ "1024B",
+ "invalid_size",
+ ] {
+ let have = SessionContext::parse_capacity_limit(MEMORY_LIMIT,
limit);
+ assert!(have.is_err());
+ assert!(have.unwrap_err().to_string().contains(MEMORY_LIMIT)); // every error names the config option
+ }
+ }
}
diff --git a/datafusion/core/tests/sql/runtime_config.rs
b/datafusion/core/tests/sql/runtime_config.rs
index d85892c254..cf5237d725 100644
--- a/datafusion/core/tests/sql/runtime_config.rs
+++ b/datafusion/core/tests/sql/runtime_config.rs
@@ -145,7 +145,7 @@ async fn test_memory_limit_enforcement() {
}
#[tokio::test]
-async fn test_invalid_memory_limit() {
+async fn test_invalid_memory_limit_when_unit_is_invalid() {
let ctx = SessionContext::new();
let result = ctx
@@ -154,7 +154,26 @@ async fn test_invalid_memory_limit() {
assert!(result.is_err());
let error_message = result.unwrap_err().to_string();
- assert!(error_message.contains("Unsupported unit 'X'"));
+ assert!(
+ error_message
+ .contains("Unsupported unit 'X' in
'datafusion.runtime.memory_limit'")
+ && error_message.contains("Unit must be one of: 'K', 'M', 'G'")
+ );
+}
+
+#[tokio::test]
+async fn test_invalid_memory_limit_when_limit_is_not_numeric() { // SET with a non-numeric capacity value
+ let ctx = SessionContext::new();
+
+ let result = ctx
+ .sql("SET datafusion.runtime.memory_limit = 'invalid_memory_limit'") // last char is taken as the unit; the rest fails f64 parsing first
+ .await;
+
+ assert!(result.is_err());
+ let error_message = result.unwrap_err().to_string();
+ assert!(error_message.contains( // message names the config option and echoes the raw value
+ "Failed to parse number from 'datafusion.runtime.memory_limit', limit
'invalid_memory_limit'"
+ ));
}
#[tokio::test]
diff --git a/datafusion/sqllogictest/test_files/set_variable.slt
b/datafusion/sqllogictest/test_files/set_variable.slt
index c444128b18..7be353f057 100644
--- a/datafusion/sqllogictest/test_files/set_variable.slt
+++ b/datafusion/sqllogictest/test_files/set_variable.slt
@@ -447,3 +447,6 @@ datafusion.runtime.max_temp_directory_size
datafusion.runtime.memory_limit
datafusion.runtime.metadata_cache_limit
datafusion.runtime.temp_directory
+
+# Assigning NULL to a runtime capacity setting is rejected with a planning error
+statement error DataFusion error: Error during planning: Unsupported value Null
+SET datafusion.runtime.memory_limit = NULL
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]