This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 2f2bf3215d [branch-53] fix: Provide more generic API for the capacity limit parsing (#20372) (#20893)
2f2bf3215d is described below

commit 2f2bf3215db2c8d24e5b11ca0fae92a672a9f3c9
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:56:43 2026 -0400

    [branch-53] fix: Provide more generic API for the capacity limit parsing (#20372) (#20893)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/20371 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20372 from
    @erenavsarogullari to the branch-53 line
    
    Co-authored-by: Eren Avsarogullari <[email protected]>
    Co-authored-by: Martin Grigorov <[email protected]>
---
 benchmarks/src/util/options.rs                     |  41 +++++--
 datafusion/core/src/execution/context/mod.rs       | 132 ++++++++++++++++++++-
 datafusion/core/tests/sql/runtime_config.rs        |  23 +++-
 .../sqllogictest/test_files/set_variable.slt       |   3 +
 4 files changed, 180 insertions(+), 19 deletions(-)

diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index 6f7267eabb..add8ff17fb 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -50,12 +50,12 @@ pub struct CommonOpt {
 
     /// Memory limit (e.g. '100M', '1.5G'). If not specified, run all 
pre-defined memory limits for given query
     /// if there's any, otherwise run with no memory limit.
-    #[arg(long = "memory-limit", value_parser = parse_memory_limit)]
+    #[arg(long = "memory-limit", value_parser = parse_capacity_limit)]
     pub memory_limit: Option<usize>,
 
     /// The amount of memory to reserve for sort spill operations. 
DataFusion's default value will be used
     /// if not specified.
-    #[arg(long = "sort-spill-reservation-bytes", value_parser = 
parse_memory_limit)]
+    #[arg(long = "sort-spill-reservation-bytes", value_parser = 
parse_capacity_limit)]
     pub sort_spill_reservation_bytes: Option<usize>,
 
     /// Activate debug mode to see more details
@@ -116,20 +116,26 @@ impl CommonOpt {
     }
 }
 
-/// Parse memory limit from string to number of bytes
-/// e.g. '1.5G', '100M' -> 1572864
-fn parse_memory_limit(limit: &str) -> Result<usize, String> {
+/// Parse capacity limit from string to number of bytes by allowing units: K, 
M and G.
+/// Supports formats like '1.5G' -> 1610612736, '100M' -> 104857600
+fn parse_capacity_limit(limit: &str) -> Result<usize, String> {
+    if limit.trim().is_empty() {
+        return Err("Capacity limit cannot be empty".to_string());
+    }
     let (number, unit) = limit.split_at(limit.len() - 1);
     let number: f64 = number
         .parse()
-        .map_err(|_| format!("Failed to parse number from memory limit 
'{limit}'"))?;
+        .map_err(|_| format!("Failed to parse number from capacity limit 
'{limit}'"))?;
+    if number.is_sign_negative() || number.is_infinite() {
+        return Err("Limit value should be positive finite number".to_string());
+    }
 
     match unit {
         "K" => Ok((number * 1024.0) as usize),
         "M" => Ok((number * 1024.0 * 1024.0) as usize),
         "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
         _ => Err(format!(
-            "Unsupported unit '{unit}' in memory limit '{limit}'"
+            "Unsupported unit '{unit}' in capacity limit '{limit}'. Unit must 
be one of: 'K', 'M', 'G'"
         )),
     }
 }
@@ -139,16 +145,25 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_parse_memory_limit_all() {
+    fn test_parse_capacity_limit_all() {
         // Test valid inputs
-        assert_eq!(parse_memory_limit("100K").unwrap(), 102400);
-        assert_eq!(parse_memory_limit("1.5M").unwrap(), 1572864);
-        assert_eq!(parse_memory_limit("2G").unwrap(), 2147483648);
+        assert_eq!(parse_capacity_limit("100K").unwrap(), 102400);
+        assert_eq!(parse_capacity_limit("1.5M").unwrap(), 1572864);
+        assert_eq!(parse_capacity_limit("2G").unwrap(), 2147483648);
 
         // Test invalid unit
-        assert!(parse_memory_limit("500X").is_err());
+        assert!(parse_capacity_limit("500X").is_err());
 
         // Test invalid number
-        assert!(parse_memory_limit("abcM").is_err());
+        assert!(parse_capacity_limit("abcM").is_err());
+
+        // Test negative number
+        assert!(parse_capacity_limit("-1M").is_err());
+
+        // Test infinite number
+        assert!(parse_capacity_limit("infM").is_err());
+
+        // Test negative infinite number
+        assert!(parse_capacity_limit("-infM").is_err());
     }
 }
diff --git a/datafusion/core/src/execution/context/mod.rs 
b/datafusion/core/src/execution/context/mod.rs
index b6c606ff46..cdc50167d1 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1167,20 +1167,20 @@ impl SessionContext {
         let mut builder = 
RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
         builder = match key {
             "memory_limit" => {
-                let memory_limit = Self::parse_memory_limit(value)?;
+                let memory_limit = Self::parse_capacity_limit(variable, 
value)?;
                 builder.with_memory_limit(memory_limit, 1.0)
             }
             "max_temp_directory_size" => {
-                let directory_size = Self::parse_memory_limit(value)?;
+                let directory_size = Self::parse_capacity_limit(variable, 
value)?;
                 builder.with_max_temp_directory_size(directory_size as u64)
             }
             "temp_directory" => builder.with_temp_file_path(value),
             "metadata_cache_limit" => {
-                let limit = Self::parse_memory_limit(value)?;
+                let limit = Self::parse_capacity_limit(variable, value)?;
                 builder.with_metadata_cache_limit(limit)
             }
             "list_files_cache_limit" => {
-                let limit = Self::parse_memory_limit(value)?;
+                let limit = Self::parse_capacity_limit(variable, value)?;
                 builder.with_object_list_cache_limit(limit)
             }
             "list_files_cache_ttl" => {
@@ -1252,11 +1252,23 @@ impl SessionContext {
     ///     (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
     /// );
     /// ```
+    #[deprecated(
+        since = "53.0.0",
+        note = "please use `parse_capacity_limit` function instead."
+    )]
     pub fn parse_memory_limit(limit: &str) -> Result<usize> {
+        if limit.trim().is_empty() {
+            return Err(plan_datafusion_err!("Empty limit value found!"));
+        }
         let (number, unit) = limit.split_at(limit.len() - 1);
         let number: f64 = number.parse().map_err(|_| {
             plan_datafusion_err!("Failed to parse number from memory limit 
'{limit}'")
         })?;
+        if number.is_sign_negative() || number.is_infinite() {
+            return Err(plan_datafusion_err!(
+                "Limit value should be positive finite number"
+            ));
+        }
 
         match unit {
             "K" => Ok((number * 1024.0) as usize),
@@ -1266,6 +1278,51 @@ impl SessionContext {
         }
     }
 
+    /// Parse capacity limit from string to number of bytes by allowing units: 
K, M and G.
+    /// Supports formats like '1.5G', '100M', '512K'
+    ///
+    /// # Examples
+    /// ```
+    /// use datafusion::execution::context::SessionContext;
+    ///
+    /// assert_eq!(
+    ///     
SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", 
"1M").unwrap(),
+    ///     1024 * 1024
+    /// );
+    /// assert_eq!(
+    ///     
SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", 
"1.5G").unwrap(),
+    ///     (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
+    /// );
+    /// ```
+    pub fn parse_capacity_limit(config_name: &str, limit: &str) -> 
Result<usize> {
+        if limit.trim().is_empty() {
+            return Err(plan_datafusion_err!(
+                "Empty limit value found for '{config_name}'"
+            ));
+        }
+        let (number, unit) = limit.split_at(limit.len() - 1);
+        let number: f64 = number.parse().map_err(|_| {
+            plan_datafusion_err!(
+                "Failed to parse number from '{config_name}', limit '{limit}'"
+            )
+        })?;
+        if number.is_sign_negative() || number.is_infinite() {
+            return Err(plan_datafusion_err!(
+                "Limit value should be positive finite number for 
'{config_name}'"
+            ));
+        }
+
+        match unit {
+            "K" => Ok((number * 1024.0) as usize),
+            "M" => Ok((number * 1024.0 * 1024.0) as usize),
+            "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
+            _ => plan_err!(
+                "Unsupported unit '{unit}' in '{config_name}', limit 
'{limit}'. \
+            Unit must be one of: 'K', 'M', 'G'"
+            ),
+        }
+    }
+
     fn parse_duration(duration: &str) -> Result<Duration> {
         let mut minutes = None;
         let mut seconds = None;
@@ -2759,4 +2816,71 @@ mod tests {
             assert!(have.is_err());
         }
     }
+
+    #[test]
+    fn test_parse_memory_limit() {
+        // Valid memory_limit
+        for (limit, want) in [
+            ("1.5K", (1.5 * 1024.0) as usize),
+            ("2M", (2f64 * 1024.0 * 1024.0) as usize),
+            ("1G", (1f64 * 1024.0 * 1024.0 * 1024.0) as usize),
+        ] {
+            #[expect(deprecated)]
+            let have = SessionContext::parse_memory_limit(limit).unwrap();
+            assert_eq!(want, have);
+        }
+
+        // Invalid memory_limit
+        for limit in [
+            "1B",
+            "1T",
+            "",
+            " ",
+            "XYZG",
+            "-1G",
+            "infG",
+            "-infG",
+            "G",
+            "1024B",
+            "invalid_size",
+        ] {
+            #[expect(deprecated)]
+            let have = SessionContext::parse_memory_limit(limit);
+            assert!(have.is_err());
+        }
+    }
+
+    #[test]
+    fn test_parse_capacity_limit() {
+        const MEMORY_LIMIT: &str = "datafusion.runtime.memory_limit";
+
+        // Valid capacity_limit
+        for (limit, want) in [
+            ("1.5K", (1.5 * 1024.0) as usize),
+            ("2M", (2f64 * 1024.0 * 1024.0) as usize),
+            ("1G", (1f64 * 1024.0 * 1024.0 * 1024.0) as usize),
+        ] {
+            let have = SessionContext::parse_capacity_limit(MEMORY_LIMIT, 
limit).unwrap();
+            assert_eq!(want, have);
+        }
+
+        // Invalid capacity_limit
+        for limit in [
+            "1B",
+            "1T",
+            "",
+            " ",
+            "XYZG",
+            "-1G",
+            "infG",
+            "-infG",
+            "G",
+            "1024B",
+            "invalid_size",
+        ] {
+            let have = SessionContext::parse_capacity_limit(MEMORY_LIMIT, 
limit);
+            assert!(have.is_err());
+            assert!(have.unwrap_err().to_string().contains(MEMORY_LIMIT));
+        }
+    }
 }
diff --git a/datafusion/core/tests/sql/runtime_config.rs 
b/datafusion/core/tests/sql/runtime_config.rs
index d85892c254..cf5237d725 100644
--- a/datafusion/core/tests/sql/runtime_config.rs
+++ b/datafusion/core/tests/sql/runtime_config.rs
@@ -145,7 +145,7 @@ async fn test_memory_limit_enforcement() {
 }
 
 #[tokio::test]
-async fn test_invalid_memory_limit() {
+async fn test_invalid_memory_limit_when_unit_is_invalid() {
     let ctx = SessionContext::new();
 
     let result = ctx
@@ -154,7 +154,26 @@ async fn test_invalid_memory_limit() {
 
     assert!(result.is_err());
     let error_message = result.unwrap_err().to_string();
-    assert!(error_message.contains("Unsupported unit 'X'"));
+    assert!(
+        error_message
+            .contains("Unsupported unit 'X' in 
'datafusion.runtime.memory_limit'")
+            && error_message.contains("Unit must be one of: 'K', 'M', 'G'")
+    );
+}
+
+#[tokio::test]
+async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
+    let ctx = SessionContext::new();
+
+    let result = ctx
+        .sql("SET datafusion.runtime.memory_limit = 'invalid_memory_limit'")
+        .await;
+
+    assert!(result.is_err());
+    let error_message = result.unwrap_err().to_string();
+    assert!(error_message.contains(
+        "Failed to parse number from 'datafusion.runtime.memory_limit', limit 
'invalid_memory_limit'"
+    ));
 }
 
 #[tokio::test]
diff --git a/datafusion/sqllogictest/test_files/set_variable.slt 
b/datafusion/sqllogictest/test_files/set_variable.slt
index c444128b18..7be353f057 100644
--- a/datafusion/sqllogictest/test_files/set_variable.slt
+++ b/datafusion/sqllogictest/test_files/set_variable.slt
@@ -447,3 +447,6 @@ datafusion.runtime.max_temp_directory_size
 datafusion.runtime.memory_limit
 datafusion.runtime.metadata_cache_limit
 datafusion.runtime.temp_directory
+
+statement error DataFusion error: Error during planning: Unsupported value Null
+SET datafusion.runtime.memory_limit = NULL


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to