This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ea9465a374 [fix][enhancement](filecache) filecache query limit 
feature bugfix and enhancement (#55772)
0ea9465a374 is described below

commit 0ea9465a374e56e4ceaa12b751b5b1184a0b6eba
Author: Wen Zhenghu <[email protected]>
AuthorDate: Sat Jan 24 22:37:24 2026 +0800

    [fix][enhancement](filecache) filecache query limit feature bugfix and 
enhancement (#55772)
    
    related issue https://github.com/apache/doris/issues/52299
    
    ### What problem does this PR solve?
    
    The code framework for the FileCache Query-Level Quota feature already
    exists in the master branch, but it has the following issues:
    
    1. **Missing feature entry point**, making the feature unusable in
    practice. This PR adds the usage entry point, including session-level
    switch parameters.
    
    2. **Lack of independent quota control parameters**. Previously, quota
    could only be configured in `file_cache_path`. This PR adds new
    configuration items for independent quota control.
    
    3. **Missing test cases**. This PR adds necessary unit test cases and
    regression test cases.
    
    
    ---
    For readers who prefer Chinese, please refer to the Chinese version of
    this document.
    [Apache Doris 文件缓存查询限制(query limit)功能PR说明.pdf](https://github.com/user-attachments/files/24581992/Apache.Doris.query.limit.PR.pdf)
    
    
    File Cache Query Limit: New Parameter System and Implementation
    Architecture
    
    1. Configuration Parameter System
    
    1.1 BE-Side Configuration Parameters
    • enable_file_cache_query_limit: Boolean type, controls whether to
    enable the file cache query limit feature.
    ◦ Function: Serves as the master switch on the BE side. Only when
    enabled will the BE process the query limit parameters passed from the
    FE.
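
    As an illustration of this gating, the BE-side entry point in this PR
    simply returns an empty holder when the switch is off (excerpt from the
    get_query_context_holder change in block_file_cache.cpp below):
    ```
        if (!config::enable_file_cache_query_limit) {
            return {};
        }
    ```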
    
    1.2 FE-Side Configuration Parameters
    • file_cache_query_limit_max_percent: Integer type, default value is
    100.
    ◦ Function: Soft upper limit; the session variable
    file_cache_query_limit_percent is validated against this value.
    
    1.3 FE-Side Session Variables
    • file_cache_query_limit_percent: Integer type (1-100), sets the
    percentage of the file cache a query can use.
    ◦ Function: Global session-level file cache query limit percentage.
    • policy_file_cache_query_limit_percent: Integer type (1-100),
    policy-level file cache query limit percentage.
    ◦ Function: Policy-level file cache query limit, which can override the
    global setting.
    
    1.4 Interrelationships of the Three Parameters
    
    file_cache_query_limit_max_percent (FE Config)
        ↓ (Validates upper limit)
    file_cache_query_limit_percent (Session Variable)
        ↓ (Can be overridden)
    policy_file_cache_query_limit_percent (Policy Session Variable)
        ↓ (Final effective value passed to BE)
    TQueryOptions.file_cache_query_limit_percent
    
    Relationship Description:
    1. Validation Relationship: The value of file_cache_query_limit_percent
    must be between 1 and file_cache_query_limit_max_percent.
    2. Priority Relationship: policy_file_cache_query_limit_percent has
    higher priority. When set, it overrides file_cache_query_limit_percent.
    3. Effectiveness Mechanism: The FE ultimately passes the validated
    percentage value to the BE for execution via the Thrift protocol. For
    example, with the default file_cache_query_limit_max_percent = 100,
    `SET file_cache_query_limit_percent = 50` results in 50 being sent to
    the BE, while an out-of-range value such as 120 is rejected during
    validation.
    
    2. Parameter Passing Mechanism
    
    2.1 Thrift Protocol Definition
    The parameter is defined within the Thrift structure used for query
    options.
    ```
    struct TQueryOptions {
        ...
        1001: optional i32 file_cache_query_limit_percent = -1
        ...
    }
    ```
    
    2.2 Complete FE-to-BE Passing Process
    1.  Session Variable Setting Phase:
        SET file_cache_query_limit_percent = 50;
        SET policy_file_cache_query_limit_percent = 30; -- Optional, overrides the above setting

    2.  Parameter Validation Phase:
        ◦   The FE validates that the set value is within the range
            [1, file_cache_query_limit_max_percent].
        ◦   The validation logic lives in the session-variable checker,
            e.g., "checkFileCacheQueryLimitPercent".

    3.  Query Options Construction Phase:
        ◦   The FE selects the final effective percentage value based on the
            priority rules.
        ◦   The selected value is set into the
            TQueryOptions.file_cache_query_limit_percent field.

    4.  Thrift Transmission Phase:
        ◦   The TQueryOptions structure is passed to the BE via Thrift RPC.

    5.  BE-Side Reception and Processing Phase:
        ◦   The BE reads query_options.file_cache_query_limit_percent during
            query context initialization.
        ◦   A query-level file cache context is created based on this value.
    
    3. Core Implementation Architecture
    
    3.1 Query Context Management
    •   Initialization Condition Check:
    Within the QueryContext constructor, initialization occurs only if
    specific conditions are met, including the relevant BE configuration
    switches and the query option values passed from the FE.
    ```
        bool initialize_context_holder =
                config::enable_file_cache &&                             // BE config switch
                config::enable_file_cache_query_limit &&                 // BE query limit switch
                query_options.__isset.enable_file_cache &&               // FE-passed cache switch
                query_options.enable_file_cache &&                       // Cache feature enabled
                query_options.__isset.file_cache_query_limit_percent &&  // Limit percent passed from FE
                query_options.file_cache_query_limit_percent < 100;      // Percentage limit effective
    ```
    
    •   Context Creation:
    ```
        if (initialize_context_holder) {
            _query_context_holders = io::FileCacheFactory::instance()->get_query_context_holders(
                    _query_id, query_options.file_cache_query_limit_percent);
        }
    ```
    
    
    3.2 Cache Capacity Calculation Mechanism
    • Factory Method Call Chain: The factory method is responsible for
    creating or retrieving the query context based on the query ID and the
    limit percentage; the chain is sketched below.
    • Single Cache Instance Context Creation: The cache instance creates a
    context holder for the query, utilizing the provided percentage.
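
    The call chain introduced by this PR looks as follows (signatures taken
    from the diff below; bodies omitted):
    ```
        // FileCacheFactory: one holder per cache instance
        std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
        FileCacheFactory::get_query_context_holders(const TUniqueId& query_id,
                                                    int file_cache_query_limit_percent);

        // BlockFileCache: creates the holder, gated by enable_file_cache_query_limit
        QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id,
                                                                int file_cache_query_limit_percent);

        // BlockFileCache: creates or reuses the per-query context sized from the percentage
        QueryFileCacheContextPtr get_or_set_query_context(const TUniqueId& query_id,
                                                          std::lock_guard<std::mutex>& cache_lock,
                                                          int file_cache_query_limit_percent);
    ```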
    
    3.3 Query Cache Capacity Allocation Strategy
    •   Capacity Calculation Logic:
    The capacity allocated to a query is the configured percentage of the
    total cache capacity; if the result falls below the 256MB recommended
    minimum, a warning is logged suggesting that the session variable be
    increased.

    ```
        size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
        if (file_cache_query_limit_size < 268435456) {
            LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
                         << " bytes) is less than the 256MB recommended minimum. "
                         << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                         << " from its current value " << file_cache_query_limit_percent << "%.";
        }
        auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
    ```

    •   Allocation Strategy Description:
    1. Percentage Calculation: Total Cache Capacity ×
    file_cache_query_limit_percent ÷ 100.
    2. Minimum Recommendation: If the computed capacity falls below 256MB
    (268,435,456 bytes), a warning is logged; the percentage-based value is
    still used.
    3. Final Capacity: The percentage-calculated value, as computed above.
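
    As a quick numeric illustration of the formula (a standalone sketch; the
    10 GiB capacity and the sample percentages below are made-up values, not
    taken from this PR):
    ```
        #include <cstdint>
        #include <cstdio>
        #include <initializer_list>

        int main() {
            const uint64_t kMinRecommended = 268435456ULL;         // 256MB recommended minimum
            const uint64_t capacity = 10ULL * 1024 * 1024 * 1024;  // hypothetical 10 GiB total cache
            for (int percent : {50, 10, 1}) {                      // hypothetical session settings
                uint64_t limit = capacity * percent / 100;         // same formula as the BE code
                std::printf("percent=%d%% -> limit=%llu bytes%s\n", percent,
                            static_cast<unsigned long long>(limit),
                            limit < kMinRecommended ? " (below 256MB, warning would be logged)" : "");
            }
            return 0;
        }
    ```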
    
    3.4 Cache Reservation and Management Mechanism
    
    •   Query-Level Isolation:
    ◦ Each query is identified by a TUniqueId and has an independent cache
    context.
    ◦ The query context manages the maximum cache capacity available to
    that query.
    ◦ Lifecycle management is handled through QueryFileCacheContextHolder.

    •   Resource Cleanup:
    ◦ The cache context is automatically released when the query ends.
    ◦ This prevents memory leaks and resource waste.
    ◦ Supports resource reclamation when a query is interrupted.
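
    For intuition, here is a minimal, self-contained RAII sketch of the
    holder idea; the stand-in types and the destructor wiring are
    illustrative assumptions, not code from this PR (only
    remove_query_context itself appears in the diff below):
    ```
        #include <cstddef>
        #include <cstdio>
        #include <map>
        #include <memory>
        #include <utility>

        // Stand-ins for the real Doris types (TUniqueId, BlockFileCache,
        // QueryFileCacheContext) so the sketch compiles on its own.
        using QueryId = std::pair<long, long>;
        struct QueryCtx { size_t max_cache_size = 0; };

        struct FakeCache {
            std::map<QueryId, std::shared_ptr<QueryCtx>> query_map;
            void remove_query_context(const QueryId& id) { query_map.erase(id); }
        };

        // RAII holder: destroying it removes the per-query context from the
        // cache, so bookkeeping is reclaimed when the query ends or is
        // interrupted.
        struct ContextHolder {
            QueryId query_id;
            FakeCache* cache = nullptr;
            ~ContextHolder() {
                if (cache != nullptr) cache->remove_query_context(query_id);
            }
        };

        int main() {
            FakeCache cache;
            {
                QueryId id {1, 1};
                cache.query_map.emplace(id, std::make_shared<QueryCtx>());
                ContextHolder holder {id, &cache};
                std::printf("contexts during query: %zu\n", cache.query_map.size());  // prints 1
            }  // holder destroyed here -> context removed
            std::printf("contexts after query:  %zu\n", cache.query_map.size());      // prints 0
            return 0;
        }
    ```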
    
    ---
    
    *This implementation provides fine-grained control over FileCache usage
    at the query level, preventing cache pollution while maintaining system
    stability and performance.*
    
    ---------
    
    Co-authored-by: chenhao xu <[email protected]>
    Co-authored-by: xuchenhao <[email protected]>
---
 be/src/common/config.cpp                           |   2 +-
 be/src/common/config.h                             |   2 +-
 be/src/io/cache/block_file_cache.cpp               |  19 +-
 be/src/io/cache/block_file_cache.h                 |   7 +-
 be/src/io/cache/block_file_cache_factory.cpp       |   6 +-
 be/src/io/cache/block_file_cache_factory.h         |   2 +-
 be/src/runtime/query_context.cpp                   |  12 +
 be/src/runtime/query_context.h                     |   2 +
 be/test/io/cache/block_file_cache_test.cpp         |  17 +-
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  25 ++
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 .../cache/test_file_cache_features.groovy          |  91 +++--
 .../cache/test_file_cache_query_limit.groovy       | 395 +++++++++++++++++++++
 .../test_file_cache_query_limit_config.groovy      | 124 +++++++
 .../cache/test_file_cache_statistics.groovy        | 309 ++++++++--------
 .../cache/test_hive_warmup_select.groovy           |   2 +-
 17 files changed, 819 insertions(+), 203 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e26d6c4ce2..3b9239a8b1a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1133,7 +1133,7 @@ DEFINE_String(file_cache_path, 
"[{\"path\":\"${DORIS_HOME}/file_cache\"}]");
 DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB
 
 DEFINE_Bool(clear_file_cache, "false");
-DEFINE_Bool(enable_file_cache_query_limit, "false");
+DEFINE_mBool(enable_file_cache_query_limit, "false");
 DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "88");
 DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 02e0542a579..b1cfc7cd604 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1172,7 +1172,7 @@ DECLARE_Bool(enable_file_cache);
 DECLARE_String(file_cache_path);
 DECLARE_Int64(file_cache_each_block_size);
 DECLARE_Bool(clear_file_cache);
-DECLARE_Bool(enable_file_cache_query_limit);
+DECLARE_mBool(enable_file_cache_query_limit);
 DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
 DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
 DECLARE_mBool(enable_evict_file_cache_in_advance);
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 97454a364ca..0aa4757c089 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -141,8 +141,7 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
                                const FileCacheSettings& cache_settings)
         : _cache_base_path(cache_base_path),
           _capacity(cache_settings.capacity),
-          _max_file_block_size(cache_settings.max_file_block_size),
-          _max_query_cache_size(cache_settings.max_query_cache_size) {
+          _max_file_block_size(cache_settings.max_file_block_size) {
     _cur_cache_size_metrics = 
std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
                                                                      
"file_cache_cache_size", 0);
     _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
@@ -383,7 +382,7 @@ UInt128Wrapper BlockFileCache::hash(const std::string& 
path) {
 }
 
 BlockFileCache::QueryFileCacheContextHolderPtr 
BlockFileCache::get_query_context_holder(
-        const TUniqueId& query_id) {
+        const TUniqueId& query_id, int file_cache_query_limit_percent) {
     SCOPED_CACHE_LOCK(_mutex, this);
     if (!config::enable_file_cache_query_limit) {
         return {};
@@ -391,7 +390,7 @@ BlockFileCache::QueryFileCacheContextHolderPtr 
BlockFileCache::get_query_context
 
     /// if enable_filesystem_query_cache_limit is true,
     /// we create context query for current query.
-    auto context = get_or_set_query_context(query_id, cache_lock);
+    auto context = get_or_set_query_context(query_id, cache_lock, 
file_cache_query_limit_percent);
     return std::make_unique<QueryFileCacheContextHolder>(query_id, this, 
context);
 }
 
@@ -411,7 +410,8 @@ void BlockFileCache::remove_query_context(const TUniqueId& 
query_id) {
 }
 
 BlockFileCache::QueryFileCacheContextPtr 
BlockFileCache::get_or_set_query_context(
-        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
+        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
+        int file_cache_query_limit_percent) {
     if (query_id.lo == 0 && query_id.hi == 0) {
         return nullptr;
     }
@@ -421,7 +421,14 @@ BlockFileCache::QueryFileCacheContextPtr 
BlockFileCache::get_or_set_query_contex
         return context;
     }
 
-    auto query_context = 
std::make_shared<QueryFileCacheContext>(_max_query_cache_size);
+    size_t file_cache_query_limit_size = _capacity * 
file_cache_query_limit_percent / 100;
+    if (file_cache_query_limit_size < 268435456) {
+        LOG(WARNING) << "The user-set file cache query limit (" << 
file_cache_query_limit_size
+                     << " bytes) is less than the 256MB recommended minimum. "
+                     << "Consider increasing the session variable 
'file_cache_query_limit_percent'"
+                     << " from its current value " << 
file_cache_query_limit_percent << "%.";
+    }
+    auto query_context = 
std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
     auto query_iter = _query_map.emplace(query_id, query_context).first;
     return query_iter->second;
 }
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index 50c6fe349a9..53b82d24fd9 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -347,7 +347,8 @@ public:
     void remove_query_context(const TUniqueId& query_id);
 
     QueryFileCacheContextPtr get_or_set_query_context(const TUniqueId& 
query_id,
-                                                      
std::lock_guard<std::mutex>&);
+                                                      
std::lock_guard<std::mutex>& cache_lock,
+                                                      int 
file_cache_query_limit_percent);
 
     /// Save a query context information, and adopt different cache policies
     /// for different queries through the context cache layer.
@@ -373,7 +374,8 @@ public:
         QueryFileCacheContextPtr context;
     };
     using QueryFileCacheContextHolderPtr = 
std::unique_ptr<QueryFileCacheContextHolder>;
-    QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& 
query_id);
+    QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& 
query_id,
+                                                            int 
file_cache_query_limit_percent);
 
     int64_t approximate_available_cache_size() const {
         return std::max<int64_t>(
@@ -501,7 +503,6 @@ private:
     std::string _cache_base_path;
     size_t _capacity = 0;
     size_t _max_file_block_size = 0;
-    size_t _max_query_cache_size = 0;
 
     mutable std::mutex _mutex;
     bool _close {false};
diff --git a/be/src/io/cache/block_file_cache_factory.cpp 
b/be/src/io/cache/block_file_cache_factory.cpp
index afbbe69afaa..da7ef80e58a 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -232,10 +232,12 @@ BlockFileCache* FileCacheFactory::get_by_path(const 
std::string& cache_base_path
 }
 
 std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
-FileCacheFactory::get_query_context_holders(const TUniqueId& query_id) {
+FileCacheFactory::get_query_context_holders(const TUniqueId& query_id,
+                                            int 
file_cache_query_limit_percent) {
     std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> holders;
     for (const auto& cache : _caches) {
-        holders.push_back(cache->get_query_context_holder(query_id));
+        holders.push_back(
+                cache->get_query_context_holder(query_id, 
file_cache_query_limit_percent));
     }
     return holders;
 }
diff --git a/be/src/io/cache/block_file_cache_factory.h 
b/be/src/io/cache/block_file_cache_factory.h
index 6b746f2a52d..c015f7bcf62 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -77,7 +77,7 @@ public:
     BlockFileCache* get_by_path(const UInt128Wrapper& hash);
     BlockFileCache* get_by_path(const std::string& cache_base_path);
     std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> 
get_query_context_holders(
-            const TUniqueId& query_id);
+            const TUniqueId& query_id, int file_cache_query_limit_percent);
 
     /**
      * Clears data of all file cache instances
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index aa5fd4b564f..a0c271efd39 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -111,6 +111,18 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
 
     _timeout_second = query_options.execution_timeout;
 
+    bool initialize_context_holder =
+            config::enable_file_cache && config::enable_file_cache_query_limit 
&&
+            query_options.__isset.enable_file_cache && 
query_options.enable_file_cache &&
+            query_options.__isset.file_cache_query_limit_percent &&
+            query_options.file_cache_query_limit_percent < 100;
+
+    // Initialize file cache context holders
+    if (initialize_context_holder) {
+        _query_context_holders = 
io::FileCacheFactory::instance()->get_query_context_holders(
+                _query_id, query_options.file_cache_query_limit_percent);
+    }
+
     bool is_query_type_valid = query_options.query_type == TQueryType::SELECT 
||
                                query_options.query_type == TQueryType::LOAD ||
                                query_options.query_type == 
TQueryType::EXTERNAL;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c504dcc66aa..5c77a427f59 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -388,6 +388,8 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    // file cache context holders
+    std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> 
_query_context_holders;
     // instance id + node id -> cte scan
     std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> 
_cte_scan;
     std::mutex _cte_scan_lock;
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 52182726edd..98970d26b39 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -983,6 +983,7 @@ TEST_F(BlockFileCacheTest, init) {
         )");
     cache_paths.clear();
     EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
+    config::enable_file_cache_query_limit = false;
 }
 
 TEST_F(BlockFileCacheTest, normal) {
@@ -1257,7 +1258,7 @@ TEST_F(BlockFileCacheTest, 
query_limit_heap_use_after_free) {
     query_id.hi = 1;
     query_id.lo = 1;
     context.query_id = query_id;
-    auto query_context_holder = cache.get_query_context_holder(query_id);
+    auto query_context_holder = cache.get_query_context_holder(query_id, 100);
     {
         auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 
9]
         auto blocks = fromHolder(holder);
@@ -1344,7 +1345,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) {
     query_id.hi = 1;
     query_id.lo = 1;
     context.query_id = query_id;
-    auto query_context_holder = cache.get_query_context_holder(query_id);
+    auto query_context_holder = cache.get_query_context_holder(query_id, 100);
     {
         auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 
9]
         auto blocks = fromHolder(holder);
@@ -3003,7 +3004,7 @@ TEST_F(BlockFileCacheTest, test_query_limit) {
         }
         ASSERT_LT(i, 1000);
         auto query_context_holder =
-                
FileCacheFactory::instance()->get_query_context_holders(query_id);
+                
FileCacheFactory::instance()->get_query_context_holders(query_id, 50);
         for (int64_t offset = 0; offset < 60; offset += 5) {
             auto holder = cache->get_or_set(key, offset, 5, context);
             auto blocks = fromHolder(holder);
@@ -3178,7 +3179,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
             };
             std::this_thread::sleep_for(std::chrono::milliseconds(1));
         }
-        EXPECT_EQ(cache.get_query_context_holder(id), nullptr);
+        EXPECT_EQ(cache.get_query_context_holder(id, 50), nullptr);
     }
     config::enable_file_cache_query_limit = true;
     io::BlockFileCache cache(cache_base_path, settings);
@@ -3190,9 +3191,9 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
     id.hi = id.lo = 0;
-    EXPECT_EQ(cache.get_query_context_holder(id)->context, nullptr);
+    EXPECT_EQ(cache.get_query_context_holder(id, 50)->context, nullptr);
     id.hi = id.lo = 1;
-    auto query_ctx_1 = cache.get_query_context_holder(id);
+    auto query_ctx_1 = cache.get_query_context_holder(id, 50);
     ASSERT_NE(query_ctx_1, nullptr);
     for (int64_t offset = 0; offset < 60; offset += 5) {
         auto holder = cache.get_or_set(key, offset, 5, context);
@@ -3205,7 +3206,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
         assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 4),
                      io::FileBlock::State::DOWNLOADED);
     }
-    auto query_ctx_2 = cache.get_query_context_holder(id);
+    auto query_ctx_2 = cache.get_query_context_holder(id, 50);
     EXPECT_EQ(query_ctx_1->query_id, query_ctx_2->query_id);
     std::lock_guard lock(cache._mutex);
     EXPECT_EQ(query_ctx_1->context->get_cache_size(lock),
@@ -3248,7 +3249,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
         };
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
-    auto query_ctx_1 = cache.get_query_context_holder(id);
+    auto query_ctx_1 = cache.get_query_context_holder(id, 50);
     ASSERT_NE(query_ctx_1, nullptr);
     {
         auto holder = cache.get_or_set(key, 0, 5, context);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 16ab9a68c32..448c7747b46 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3771,6 +3771,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static String aws_credentials_provider_version = "v2";
 
+    @ConfField(mutable = true, description = {
+        "用户的单个查询能使用的 FILE_CACHE 比例的软上限(取值范围 1 到 100),100表示能够使用全量 FILE_CACHE",
+        "The soft upper limit of FILE_CACHE percent that a single query of a 
user can use, (range: 1 to 100).",
+        "100 indicate that the full FILE_CACHE capacity can be used. "
+    })
+    public static int file_cache_query_limit_max_percent = 100;
     @ConfField(description = {
             "AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享",
             "The thread pool size used by the AWS SDK to schedule asynchronous 
retries, timeout tasks, "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 54ae4852c0c..f1897e325aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -476,6 +476,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String DISABLE_FILE_CACHE = "disable_file_cache";
 
+    public static final String FILE_CACHE_QUERY_LIMIT_PERCENT = 
"file_cache_query_limit_percent";
+
     public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path";
 
     public static final String ENABLE_INVERTED_INDEX_QUERY = 
"enable_inverted_index_query";
@@ -2768,6 +2770,23 @@ public class SessionVariable implements Serializable, 
Writable {
             "Make the READ_SLICE_SIZE variable configurable to reduce the 
impact caused by read amplification."})
     public int mergeReadSliceSizeBytes = 8388608;
 
+    @VariableMgr.VarAttr(name = FILE_CACHE_QUERY_LIMIT_PERCENT, needForward = 
true,
+            checker = "checkFileCacheQueryLimitPercent",
+            description = {"限制用户的单个查询能使用的 FILE_CACHE 比例 "
+                    + "(用户设置,取值范围 1 到 
Config.file_cache_query_limit_max_percent)。",
+                    "Limit the FILE_CACHE percent that a single query of a 
user can use "
+                    + "(set by user via session variables, range: 1 to 
Config.file_cache_query_limit_max_percent)."})
+    public int fileCacheQueryLimitPercent = -1;
+
+    public void checkFileCacheQueryLimitPercent(String 
fileCacheQueryLimitPercentStr) {
+        int fileCacheQueryLimitPct = 
Integer.valueOf(fileCacheQueryLimitPercentStr);
+        if (fileCacheQueryLimitPct < 1 || fileCacheQueryLimitPct > 
Config.file_cache_query_limit_max_percent) {
+            throw new InvalidParameterException(
+                String.format("file_cache_query_limit_percent should be 
between 1 and %d",
+                Config.file_cache_query_limit_max_percent));
+        }
+    }
+
     public void setAggPhase(int phase) {
         aggPhase = phase;
     }
@@ -5101,6 +5120,12 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setIvfNprobe(ivfNprobe);
         tResult.setMergeReadSliceSize(mergeReadSliceSizeBytes);
         tResult.setEnableExtendedRegex(enableExtendedRegex);
+        if (fileCacheQueryLimitPercent > 0) {
+            
tResult.setFileCacheQueryLimitPercent(Math.min(fileCacheQueryLimitPercent,
+                    Config.file_cache_query_limit_max_percent));
+        } else {
+            
tResult.setFileCacheQueryLimitPercent(Config.file_cache_query_limit_max_percent);
+        }
 
         // Set Iceberg write target file size
         
tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 9d7cc861a6d..133dce592e7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -429,6 +429,7 @@ struct TQueryOptions {
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.
   1000: optional bool disable_file_cache = false
+  1001: optional i32 file_cache_query_limit_percent = -1
 }
 
 
diff --git 
a/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
 
b/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
index 5951b9c79c5..aad83103023 100644
--- 
a/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
+++ 
b/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
@@ -121,7 +121,17 @@ suite("test_file_cache_features", 
"external_docker,hive,external_docker_hive,p0,
         assertTrue(false, INITIAL_VALUES_NOT_ZERO_CHECK_FAILED_MSG +
             "disk_resource_limit_mode: ${initialDiskResourceLimitMode}, 
need_evict_cache_in_advance: ${initialNeedEvictCacheInAdvance}")
     }
-    
+
+    def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend 
config like 'file_cache_background_monitor_interval_ms';"""
+    logger.info("file_cache_background_monitor_interval_ms configuration: " + 
fileCacheBackgroundMonitorIntervalMsResult)
+    assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 || 
fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+            fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(), 
"file_cache_background_monitor_interval_ms is empty or not set to true")
+
+    // brpc metrics will be updated at most 5 seconds
+    def totalWaitTime = 
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toInteger() / 1000) as int
+    def interval = 1
+    def iterations = totalWaitTime / interval
+
     // Set backend configuration parameters for testing
     boolean diskResourceLimitModeTestPassed = true
     setBeConfigTemporary([
@@ -134,26 +144,31 @@ suite("test_file_cache_features", 
"external_docker,hive,external_docker_hive,p0,
         
         // Wait for disk_resource_limit_mode metric to change to 1
         try {
-            Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until {
-                def updatedDiskResourceLimitModeResult = sql """select 
METRIC_VALUE from information_schema.file_cache_statistics 
-                    where METRIC_NAME = 'disk_resource_limit_mode' limit 1;"""
-                logger.info("Checking disk_resource_limit_mode result: " + 
updatedDiskResourceLimitModeResult)
-                
-                if (updatedDiskResourceLimitModeResult.size() > 0) {
-                    double updatedDiskResourceLimitMode = 
Double.valueOf(updatedDiskResourceLimitModeResult[0][0])
-                    logger.info("Current disk_resource_limit_mode value: 
${updatedDiskResourceLimitMode}")
-                    
-                    if (updatedDiskResourceLimitMode == 1.0) {
-                        logger.info("Disk resource limit mode is now active 
(value = 1)")
-                        return true
-                    } else {
-                        logger.info("Disk resource limit mode is not yet 
active (value = ${updatedDiskResourceLimitMode}), waiting...")
-                        return false
-                    }
+            (1..iterations).each { count ->
+                Thread.sleep(interval * 1000)
+                def elapsedSeconds = count * interval
+                def remainingSeconds = totalWaitTime - elapsedSeconds
+                logger.info("Waited for backend configuration update 
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+            }
+
+            def updatedDiskResourceLimitModeResult = sql """select 
METRIC_VALUE from information_schema.file_cache_statistics
+                where METRIC_NAME = 'disk_resource_limit_mode' limit 1;"""
+            logger.info("Checking disk_resource_limit_mode result: " + 
updatedDiskResourceLimitModeResult)
+
+            if (updatedDiskResourceLimitModeResult.size() > 0) {
+                double updatedDiskResourceLimitMode = 
Double.valueOf(updatedDiskResourceLimitModeResult[0][0])
+                logger.info("Current disk_resource_limit_mode value: 
${updatedDiskResourceLimitMode}")
+
+                if (updatedDiskResourceLimitMode == 1.0) {
+                    logger.info("Disk resource limit mode is now active (value 
= 1)")
+                    return true
                 } else {
-                    logger.info("Failed to get disk_resource_limit_mode 
metric, waiting...")
+                    logger.info("Disk resource limit mode is not yet active 
(value = ${updatedDiskResourceLimitMode}), waiting...")
                     return false
                 }
+            } else {
+                logger.info("Failed to get disk_resource_limit_mode metric, 
waiting...")
+                return false
             }
         } catch (Exception e) {
             logger.info(DISK_RESOURCE_LIMIT_MODE_TEST_FAILED_MSG + 
e.getMessage())
@@ -182,26 +197,31 @@ suite("test_file_cache_features", 
"external_docker,hive,external_docker_hive,p0,
         
         // Wait for need_evict_cache_in_advance metric to change to 1
         try {
-            Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until {
-                def updatedNeedEvictCacheInAdvanceResult = sql """select 
METRIC_VALUE from information_schema.file_cache_statistics 
-                    where METRIC_NAME = 'need_evict_cache_in_advance' limit 
1;"""
-                logger.info("Checking need_evict_cache_in_advance result: " + 
updatedNeedEvictCacheInAdvanceResult)
-                
-                if (updatedNeedEvictCacheInAdvanceResult.size() > 0) {
-                    double updatedNeedEvictCacheInAdvance = 
Double.valueOf(updatedNeedEvictCacheInAdvanceResult[0][0])
-                    logger.info("Current need_evict_cache_in_advance value: 
${updatedNeedEvictCacheInAdvance}")
-                    
-                    if (updatedNeedEvictCacheInAdvance == 1.0) {
-                        logger.info("Need evict cache in advance mode is now 
active (value = 1)")
-                        return true
-                    } else {
-                        logger.info("Need evict cache in advance mode is not 
yet active (value = ${updatedNeedEvictCacheInAdvance}), waiting...")
-                        return false
-                    }
+            (1..iterations).each { count ->
+                Thread.sleep(interval * 1000)
+                def elapsedSeconds = count * interval
+                def remainingSeconds = totalWaitTime - elapsedSeconds
+                logger.info("Waited for backend configuration update 
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+            }
+
+            def updatedNeedEvictCacheInAdvanceResult = sql """select 
METRIC_VALUE from information_schema.file_cache_statistics
+                where METRIC_NAME = 'need_evict_cache_in_advance' limit 1;"""
+            logger.info("Checking need_evict_cache_in_advance result: " + 
updatedNeedEvictCacheInAdvanceResult)
+
+            if (updatedNeedEvictCacheInAdvanceResult.size() > 0) {
+                double updatedNeedEvictCacheInAdvance = 
Double.valueOf(updatedNeedEvictCacheInAdvanceResult[0][0])
+                logger.info("Current need_evict_cache_in_advance value: 
${updatedNeedEvictCacheInAdvance}")
+
+                if (updatedNeedEvictCacheInAdvance == 1.0) {
+                    logger.info("Need evict cache in advance mode is now 
active (value = 1)")
+                    return true
                 } else {
-                    logger.info("Failed to get need_evict_cache_in_advance 
metric, waiting...")
+                    logger.info("Need evict cache in advance mode is not yet 
active (value = ${updatedNeedEvictCacheInAdvance}), waiting...")
                     return false
                 }
+            } else {
+                logger.info("Failed to get need_evict_cache_in_advance metric, 
waiting...")
+                return false
             }
         } catch (Exception e) {
             logger.info(NEED_EVICT_CACHE_IN_ADVANCE_TEST_FAILED_MSG + 
e.getMessage())
@@ -219,4 +239,3 @@ suite("test_file_cache_features", 
"external_docker,hive,external_docker_hive,p0,
     sql """set global enable_file_cache=false"""
     return true
 }
-
diff --git 
a/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
 
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
new file mode 100644
index 00000000000..a6ccf96c38c
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.dgmimpl.arrays.LongArrayGetAtMetaMethod
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+// Constants for backend configuration check
+final String BACKEND_CONFIG_CHECK_FAILED_PREFIX = "Backend configuration check 
failed: "
+final String ENABLE_FILE_CACHE_CHECK_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache is empty or not set to 
true"
+final String FILE_CACHE_BACKGROUND_MONITOR_INTERVAL_CHECK_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_background_monitor_interval_ms 
is empty or not set to true"
+final String FILE_CACHE_PATH_CHECK_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_path is empty or not 
configured"
+final String WEB_SERVER_PORT_CHECK_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "webserver_port is empty or not configured"
+final String BRPC_PORT_CHECK_FAILED_MSG = BACKEND_CONFIG_CHECK_FAILED_PREFIX + 
"brpc_port is empty or not configured"
+final String ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_FALSE_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache_query_limit is empty or 
not set to false"
+final String ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_TRUE_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache_query_limit is empty or 
not set to true"
+final String FILE_CACHE_QUERY_LIMIT_BYTES_CHECK_FAILED_MSG = 
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_query_limit_bytes is empty or 
not configured"
+// Constants for cache query features check
+final String FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX = "File cache features 
check failed: "
+final String BASE_NORMAL_QUEUE_CURR_SIZE_IS_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "base normal_queue_curr_size is 0"
+final String BASE_NORMAL_QUEUE_CURR_ELEMENTS_IS_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "base normal_queue_curr_elements is 0"
+final String TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "total_read_counts did not increase 
after cache operation"
+final String INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_curr_size is 
not 0"
+final String INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_curr_elements 
is not 0"
+final String INITIAL_NORMAL_QUEUE_MAX_SIZE_IS_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_max_size is 0"
+final String INITIAL_NORMAL_QUEUE_MAX_ELEMENTS_IS_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_max_elements is 
0"
+final String NORMAL_QUEUE_CURR_SIZE_NOT_GREATER_THAN_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_size is not 
greater than 0 after cache operation"
+final String NORMAL_QUEUE_CURR_ELEMENTS_NOT_GREATER_THAN_ZERO_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_elements is not 
greater than 0 after cache operation"
+final String NORMAL_QUEUE_CURR_SIZE_GREATER_THAN_QUERY_CACHE_CAPACITY_MSG = 
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_size is greater 
than query cache capacity"
+
+suite("test_file_cache_query_limit", 
"external_docker,hive,external_docker_hive,p0,external,nonConcurrent") {
+    String enableHiveTest = context.config.otherConfigs.get("enableHiveTest")
+    if (enableHiveTest == null || !enableHiveTest.equalsIgnoreCase("true")) {
+        logger.info("disable hive test.")
+        return
+    }
+
+    sql """set enable_file_cache=true"""
+
+    // Check backend configuration prerequisites
+    // Note: This test case assumes a single backend scenario. Testing with 
single backend is logically equivalent
+    // to testing with multiple backends having identical configurations, but 
simpler in logic.
+    def enableFileCacheResult = sql """show backend config like 
'enable_file_cache';"""
+    logger.info("enable_file_cache configuration: " + enableFileCacheResult)
+    assertFalse(enableFileCacheResult.size() == 0 || 
!enableFileCacheResult[0][3].equalsIgnoreCase("true"),
+            ENABLE_FILE_CACHE_CHECK_FAILED_MSG)
+
+    def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend 
config like 'file_cache_background_monitor_interval_ms';"""
+    logger.info("file_cache_background_monitor_interval_ms configuration: " + 
fileCacheBackgroundMonitorIntervalMsResult)
+    assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 || 
fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+            fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(), 
FILE_CACHE_BACKGROUND_MONITOR_INTERVAL_CHECK_FAILED_MSG)
+
+    String catalog_name = "test_file_cache_query_limit"
+    String ex_db_name = "tpch1_parquet"
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+    int queryCacheCapacity
+
+    sql """drop catalog if exists ${catalog_name} """
+
+    sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='hms',
+        'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+        'hadoop.username' = 'hive'
+    );"""
+
+    String query_sql =
+            """select sum(l_quantity) as sum_qty,
+            sum(l_extendedprice) as sum_base_price,
+            sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+            sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as 
sum_charge,
+            avg(l_quantity) as avg_qty,
+            avg(l_extendedprice) as avg_price,
+            avg(l_discount) as avg_disc,
+            count(*) as count_order
+            from ${catalog_name}.${ex_db_name}.lineitem
+            where l_shipdate <= date '1998-12-01' - interval '90' day
+            group by l_returnflag, l_linestatus
+            order by l_returnflag, l_linestatus;"""
+
+    def webserverPortResult = sql """SHOW BACKEND CONFIG LIKE 
'webserver_port';"""
+    logger.info("webserver_port configuration: " + webserverPortResult)
+    assertFalse(webserverPortResult.size() == 0 || webserverPortResult[0][3] 
== null || webserverPortResult[0][3].trim().isEmpty(),
+            WEB_SERVER_PORT_CHECK_FAILED_MSG)
+
+    String webserver_port = webserverPortResult[0][3]
+
+    def brpcPortResult = sql """SHOW BACKEND CONFIG LIKE 'brpc_port';"""
+    logger.info("brpcPortResult configuration: " + brpcPortResult)
+    assertFalse(brpcPortResult.size() == 0 || brpcPortResult[0][3] == null || 
brpcPortResult[0][3].trim().isEmpty(),
+            BRPC_PORT_CHECK_FAILED_MSG)
+
+    String brpc_port = brpcPortResult[0][3]
+
+    // Search file cache capacity
+    def command = ["curl", "-X", "POST", "${externalEnvIp}:${brpc_port}/vars"]
+    def stringCommand = command.collect{it.toString()}
+    def process = new ProcessBuilder(stringCommand as 
String[]).redirectErrorStream(true).start()
+
+    def output = new StringBuilder()
+    def errorOutput = new StringBuilder()
+    process.inputStream.eachLine{line -> output.append(line).append("\n")}
+    process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+    // Wait for process completion and check exit status
+    def exitCode = process.waitFor()
+    def fileCacheCapacityResult = output.toString().split("\n").find { 
it.contains("file_cache_capacity") }?.split(":")?.last()?.trim()
+
+    logger.info("File cache capacity: ${fileCacheCapacityResult}")
+    assertTrue(fileCacheCapacityResult != null, "Failed to find 
file_cache_capacity in brpc metrics")
+    def fileCacheCapacity = Long.valueOf(fileCacheCapacityResult)
+
+    // Run file cache base test for setting the parameter 
file_cache_query_limit_bytes
+    logger.info("========================= Start running file cache base test 
========================")
+
+    // Clear file cache
+    command = ["curl", "-X", "POST", 
"${externalEnvIp}:${webserver_port}/api/file_cache?op=clear&sync=true"]
+    stringCommand = command.collect{it.toString()}
+    process = new ProcessBuilder(stringCommand as 
String[]).redirectErrorStream(true).start()
+
+    output = new StringBuilder()
+    errorOutput = new StringBuilder()
+    process.inputStream.eachLine{line -> output.append(line).append("\n")}
+    process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+    // Wait for process completion and check exit status
+    exitCode = process.waitFor()
+    logger.info("File cache clear command output: ${output.toString()}")
+    assertTrue(exitCode == 0, "File cache clear failed with exit code 
${exitCode}. Error: ${errorOutput.toString()}")
+
+    // brpc metrics will be updated at most 5 seconds
+    def totalWaitTime = 
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toLong() / 1000) as int
+    def interval = 1
+    def iterations = totalWaitTime / interval
+
+    // Waiting for file cache clearing
+    (1..iterations).each { count ->
+        Thread.sleep(interval * 1000)
+        def elapsedSeconds = count * interval
+        def remainingSeconds = totalWaitTime - elapsedSeconds
+        logger.info("Waited for file cache clearing ${elapsedSeconds} seconds, 
${remainingSeconds} seconds remaining")
+    }
+
+    def initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+    logger.info("normal_queue_curr_size result: " + 
initialNormalQueueCurrSizeResult)
+    assertFalse(initialNormalQueueCurrSizeResult.size() == 0 || 
Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
+            INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)
+
+    // Check normal queue current elements
+    def initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+    logger.info("normal_queue_curr_elements result: " + 
initialNormalQueueCurrElementsResult)
+    assertFalse(initialNormalQueueCurrElementsResult.size() == 0 || 
Double.valueOf(initialNormalQueueCurrElementsResult[0][0]) != 0.0,
+            INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG)
+
+    double initialNormalQueueCurrSize = 
Double.valueOf(initialNormalQueueCurrSizeResult[0][0])
+    double initialNormalQueueCurrElements = 
Double.valueOf(initialNormalQueueCurrElementsResult[0][0])
+
+    logger.info("Initial normal queue curr size and elements - size: 
${initialNormalQueueCurrSize} , " +
+            "elements: ${initialNormalQueueCurrElements}")
+
+    setBeConfigTemporary([
+            "enable_file_cache_query_limit": "false"
+    ]) {
+        // Execute test logic with modified configuration for 
file_cache_query_limit
+        logger.info("Backend configuration set - 
enable_file_cache_query_limit: false")
+
+        // Waiting for backend configuration update
+        (1..iterations).each { count ->
+            Thread.sleep(interval * 1000)
+            def elapsedSeconds = count * interval
+            def remainingSeconds = totalWaitTime - elapsedSeconds
+            logger.info("Waited for backend configuration update 
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+        }
+
+        // Check if the configuration is modified
+        def enableFileCacheQueryLimitResult = sql """SHOW BACKEND CONFIG LIKE 
'enable_file_cache_query_limit';"""
+        logger.info("enable_file_cache_query_limit configuration: " + 
enableFileCacheQueryLimitResult)
+        assertFalse(enableFileCacheQueryLimitResult.size() == 0 || 
enableFileCacheQueryLimitResult[0][3] == null || 
enableFileCacheQueryLimitResult[0][3] != "false",
+                ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_FALSE_FAILED_MSG)
+
+        sql """switch ${catalog_name}"""
+        // load the table into file cache
+        sql query_sql
+
+        // Waiting for file cache statistics update
+        (1..iterations).each { count ->
+            Thread.sleep(interval * 1000)
+            def elapsedSeconds = count * interval
+            def remainingSeconds = totalWaitTime - elapsedSeconds
+            logger.info("Waited for file cache statistics update 
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+        }
+
+        def baseNormalQueueCurrElementsResult = sql """select METRIC_VALUE 
from information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+        logger.info("normal_queue_curr_elements result: " + 
baseNormalQueueCurrElementsResult)
+        assertFalse(baseNormalQueueCurrElementsResult.size() == 0 || 
Double.valueOf(baseNormalQueueCurrElementsResult[0][0]) == 0.0,
+                BASE_NORMAL_QUEUE_CURR_ELEMENTS_IS_ZERO_MSG)
+
+        def baseNormalQueueCurrSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+        logger.info("normal_queue_curr_size result: " + 
baseNormalQueueCurrSizeResult)
+        assertFalse(baseNormalQueueCurrSizeResult.size() == 0 || 
Double.valueOf(baseNormalQueueCurrSizeResult[0][0]) == 0.0,
+                BASE_NORMAL_QUEUE_CURR_SIZE_IS_ZERO_MSG)
+
+        int baseNormalQueueCurrElements = 
Double.valueOf(baseNormalQueueCurrElementsResult[0][0]) as Long
+        queryCacheCapacity = 
Double.valueOf(baseNormalQueueCurrSizeResult[0][0]) as Long
+    }
+
+    // The parameter file_cache_query_limit_percent must be set smaller than 
the cache capacity required by the query
+    def fileCacheQueryLimitPercent = (queryCacheCapacity / fileCacheCapacity) 
* 100
+    logger.info("file_cache_query_limit_percent: " + 
fileCacheQueryLimitPercent)
+
+    logger.info("========================== End running file cache base test 
=========================")
+
+    logger.info("==================== Start running file cache query limit 
test 1 ====================")
+
+    def fileCacheQueryLimitPercentTest1 = (fileCacheQueryLimitPercent / 2) as 
Long
+    logger.info("file_cache_query_limit_percent_test1: " + 
fileCacheQueryLimitPercentTest1)
+
+    // Clear file cache
+    process = new ProcessBuilder(stringCommand as 
String[]).redirectErrorStream(true).start()
+
+    output = new StringBuilder()
+    errorOutput = new StringBuilder()
+    process.inputStream.eachLine{line -> output.append(line).append("\n")}
+    process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+    // Wait for process completion and check exit status
+    exitCode = process.waitFor()
+    logger.info("File cache clear command output: ${output.toString()}")
+    assertTrue(exitCode == 0, "File cache clear failed with exit code 
${exitCode}. Error: ${errorOutput.toString()}")
+
+    // Waiting for file cache clearing
+    (1..iterations).each { count ->
+        Thread.sleep(interval * 1000)
+        def elapsedSeconds = count * interval
+        def remainingSeconds = totalWaitTime - elapsedSeconds
+        logger.info("Waited for file cache clearing ${elapsedSeconds} seconds, 
${remainingSeconds} seconds remaining")
+    }
+
+    // ===== Normal Queue Metrics Check =====
+    // Check normal queue current size
+    initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+    logger.info("normal_queue_curr_size result: " + 
initialNormalQueueCurrSizeResult)
+    assertFalse(initialNormalQueueCurrSizeResult.size() == 0 || 
Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
+            INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)
+
+    // Check normal queue current elements
+    initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+    logger.info("normal_queue_curr_elements result: " + 
initialNormalQueueCurrElementsResult)
+    assertFalse(initialNormalQueueCurrElementsResult.size() == 0 || 
Double.valueOf(initialNormalQueueCurrElementsResult[0][0]) != 0.0,
+            INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG)
+
+    // Check normal queue max size
+    def initialNormalQueueMaxSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
+    logger.info("normal_queue_max_size result: " + 
initialNormalQueueMaxSizeResult)
+    assertFalse(initialNormalQueueMaxSizeResult.size() == 0 || 
Double.valueOf(initialNormalQueueMaxSizeResult[0][0]) == 0.0,
+            INITIAL_NORMAL_QUEUE_MAX_SIZE_IS_ZERO_MSG)
+
+    // Check normal queue max elements
+    def initialNormalQueueMaxElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
+    logger.info("normal_queue_max_elements result: " + 
initialNormalQueueMaxElementsResult)
+    assertFalse(initialNormalQueueMaxElementsResult.size() == 0 || 
Double.valueOf(initialNormalQueueMaxElementsResult[0][0]) == 0.0,
+            INITIAL_NORMAL_QUEUE_MAX_ELEMENTS_IS_ZERO_MSG)
+
+    initialNormalQueueCurrSize = 
Double.valueOf(initialNormalQueueCurrSizeResult[0][0])
+    initialNormalQueueCurrElements = 
Double.valueOf(initialNormalQueueCurrElementsResult[0][0])
+    double initialNormalQueueMaxSize = 
Double.valueOf(initialNormalQueueMaxSizeResult[0][0])
+    double initialNormalQueueMaxElements = 
Double.valueOf(initialNormalQueueMaxElementsResult[0][0])
+
+    logger.info("Initial normal queue curr size and elements - size: 
${initialNormalQueueCurrSize} , " +
+                "elements: ${initialNormalQueueCurrElements}")
+
+    logger.info("Initial normal queue max size and elements - size: 
${initialNormalQueueMaxSize} , " +
+                "elements: ${initialNormalQueueMaxElements}")
+
+    // ===== Hit And Read Counts Metrics Check =====
+    // Get initial values for hit and read counts
+    def initialTotalHitCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'total_hit_counts' limit 1;"""
+    logger.info("Initial total_hit_counts result: " + 
initialTotalHitCountsResult)
+
+    def initialTotalReadCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+            where METRIC_NAME = 'total_read_counts' limit 1;"""
+    logger.info("Initial total_read_counts result: " + 
initialTotalReadCountsResult)
+
+    // Store initial values
+    double initialTotalHitCounts = 
Double.valueOf(initialTotalHitCountsResult[0][0])
+    double initialTotalReadCounts = 
Double.valueOf(initialTotalReadCountsResult[0][0])
+
+    // Set backend configuration parameters for file_cache_query_limit test 1
+    setBeConfigTemporary([
+            "enable_file_cache_query_limit": "true"
+    ]) {
+        // Execute test logic with modified configuration for 
file_cache_query_limit
+        logger.info("Backend configuration set - 
enable_file_cache_query_limit: true")
+
+        sql """set file_cache_query_limit_percent =  
${fileCacheQueryLimitPercentTest1}"""
+
+        // Waiting for backend configuration update
+        (1..iterations).each { count ->
+            Thread.sleep(interval * 1000)
+            def elapsedSeconds = count * interval
+            def remainingSeconds = totalWaitTime - elapsedSeconds
+            logger.info("Waited for backend configuration update 
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+        }
+
+        // Check if the configuration is modified
+        def enableFileCacheQueryLimitResult = sql """SHOW BACKEND CONFIG LIKE 
'enable_file_cache_query_limit';"""
+        logger.info("enable_file_cache_query_limit configuration: " + 
enableFileCacheQueryLimitResult)
+        assertFalse(enableFileCacheQueryLimitResult.size() == 0 || 
enableFileCacheQueryLimitResult[0][3] == null || 
enableFileCacheQueryLimitResult[0][3] != "true",
+                ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_TRUE_FAILED_MSG)
+
+        sql """switch ${catalog_name}"""
+
+        // load the table into file cache
+        sql query_sql
+
+        // Waiting for file cache statistics update
+        (1..iterations).each { count ->
+            Thread.sleep(interval * 1000)
+            def elapsedSeconds = count * interval
+            def remainingSeconds = totalWaitTime - elapsedSeconds
+            logger.info("Waited for file cache statistics update 
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+        }
+
+        // Get updated values of normal queue current size and current elements after cache operations
+        def updatedNormalQueueCurrSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+                where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+        logger.info("normal_queue_curr_size result: " + 
updatedNormalQueueCurrSizeResult)
+
+        def updatedNormalQueueCurrElementsResult = sql """select METRIC_VALUE 
from information_schema.file_cache_statistics
+                where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+        logger.info("normal_queue_curr_elements result: " + 
updatedNormalQueueCurrElementsResult)
+
+        // Check if updated values are greater than zero
+        double updatedNormalQueueCurrSize = 
Double.valueOf(updatedNormalQueueCurrSizeResult[0][0])
+        double updatedNormalQueueCurrElements = 
Double.valueOf(updatedNormalQueueCurrElementsResult[0][0])
+
+        logger.info("Updated normal queue curr size and elements - size: 
${updatedNormalQueueCurrSize} , " +
+                "elements: ${updatedNormalQueueCurrElements}")
+
+        assertTrue(updatedNormalQueueCurrSize > 0.0, 
NORMAL_QUEUE_CURR_SIZE_NOT_GREATER_THAN_ZERO_MSG)
+        assertTrue(updatedNormalQueueCurrElements > 0.0, 
NORMAL_QUEUE_CURR_ELEMENTS_NOT_GREATER_THAN_ZERO_MSG)
+
+        logger.info("Normal queue curr size and query cache capacity 
comparison - normal queue curr size: ${updatedNormalQueueCurrSize as Long} , " +
+                "query cache capacity: ${fileCacheCapacity}")
+
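+        // Core check: with the query limit enabled, the normal queue current size must not exceed the query cache capacity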
+        assertTrue((updatedNormalQueueCurrSize as Long) <= queryCacheCapacity,
+                NORMAL_QUEUE_CURR_SIZE_GREATER_THAN_QUERY_CACHE_CAPACITY_MSG)
+
+        // Get updated values for hit and read counts after cache operations
+        def updatedTotalHitCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+                where METRIC_NAME = 'total_hit_counts' limit 1;"""
+        logger.info("Updated total_hit_counts result: " + 
updatedTotalHitCountsResult)
+
+        def updatedTotalReadCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+                where METRIC_NAME = 'total_read_counts' limit 1;"""
+        logger.info("Updated total_read_counts result: " + 
updatedTotalReadCountsResult)
+
+        // Check if updated values are greater than initial values
+        double updatedTotalHitCounts = 
Double.valueOf(updatedTotalHitCountsResult[0][0])
+        double updatedTotalReadCounts = 
Double.valueOf(updatedTotalReadCountsResult[0][0])
+
+        logger.info("Total hit and read counts comparison - hit counts: 
${initialTotalHitCounts} -> " +
+                "${updatedTotalHitCounts} , read counts: 
${initialTotalReadCounts} -> ${updatedTotalReadCounts}")
+
+        assertTrue(updatedTotalReadCounts > initialTotalReadCounts, 
TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+    }
+
+    logger.info("===================== End running file cache query limit test 
1 =====================")
+
+    return true;
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
 
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
new file mode 100644
index 00000000000..f2e5d4c2f68
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+final String ERROR_SQL_SUCCEED_MSG = "SQL should have failed but succeeded"
+final String SET_SESSION_VARIABLE_FAILED_MSG = "SQL set session variable 
failed"
+
+suite("test_file_cache_query_limit_config", 
"external_docker,hive,external_docker_hive,p0,external,nonConcurrent") {
+
+    sql """set file_cache_query_limit_percent = 1"""
+    def fileCacheQueryLimitPercentResult = sql """show variables like 
'file_cache_query_limit_percent';"""
+    logger.info("file_cache_query_limit_percent configuration: " + 
fileCacheQueryLimitPercentResult)
+    assertFalse(fileCacheQueryLimitPercentResult.size() == 0 || 
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 1.0,
+            SET_SESSION_VARIABLE_FAILED_MSG)
+
+    sql """set file_cache_query_limit_percent = 20"""
+    fileCacheQueryLimitPercentResult = sql """show variables like 
'file_cache_query_limit_percent';"""
+    logger.info("file_cache_query_limit_percent configuration: " + 
fileCacheQueryLimitPercentResult)
+    assertFalse(fileCacheQueryLimitPercentResult.size() == 0 || 
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 20.0,
+            SET_SESSION_VARIABLE_FAILED_MSG)
+
+    sql """set file_cache_query_limit_percent = 51"""
+    fileCacheQueryLimitPercentResult = sql """show variables like 
'file_cache_query_limit_percent';"""
+    logger.info("file_cache_query_limit_percent configuration: " + 
fileCacheQueryLimitPercentResult)
+    assertFalse(fileCacheQueryLimitPercentResult.size() == 0 || 
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 51.0,
+            SET_SESSION_VARIABLE_FAILED_MSG)
+
+
+    sql """set file_cache_query_limit_percent = 100"""
+    fileCacheQueryLimitPercentResult = sql """show variables like 
'file_cache_query_limit_percent';"""
+    logger.info("file_cache_query_limit_percent configuration: " + 
fileCacheQueryLimitPercentResult)
+    assertFalse(fileCacheQueryLimitPercentResult.size() == 0 || 
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 100.0,
+            SET_SESSION_VARIABLE_FAILED_MSG)
+
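+    // Values outside the valid range (1-100) are expected to be rejected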
+    try {
+        sql """set file_cache_query_limit_percent = -1"""
+        assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+    } catch (Exception e) {
+        logger.info("SQL failed as expected: ${e.message}")
+    }
+
+    try {
+        sql """set file_cache_query_limit_percent = 0"""
+        assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+    } catch (Exception e) {
+        logger.info("SQL failed as expected: ${e.message}")
+    }
+
+    try {
+        sql """set file_cache_query_limit_percent = 101"""
+        assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+    } catch (Exception e) {
+        logger.info("SQL failed as expected: ${e.message}")
+    }
+
+    try {
+        sql """set file_cache_query_limit_percent = 1000000"""
+        assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+    } catch (Exception e) {
+        logger.info("SQL failed as expected: ${e.message}")
+    }
+
+    // Set frontend configuration parameters for 
file_cache_query_limit_max_percent
+    setFeConfigTemporary([
+            "file_cache_query_limit_max_percent": "50"
+    ]) {
+        // Execute test logic with modified configuration for 
file_cache_query_limit_max_percent
+        logger.info("Backend configuration set - 
file_cache_query_limit_max_percent: 50")
+
+        sql """set file_cache_query_limit_percent = 1"""
+        fileCacheQueryLimitPercentResult = sql """show variables like 
'file_cache_query_limit_percent';"""
+        logger.info("file_cache_query_limit_percent configuration: " + 
fileCacheQueryLimitPercentResult)
+        assertFalse(fileCacheQueryLimitPercentResult.size() == 0 || 
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 1.0,
+                SET_SESSION_VARIABLE_FAILED_MSG)
+
+        sql """set file_cache_query_limit_percent = 20"""
+        fileCacheQueryLimitPercentResult = sql """show variables like 
'file_cache_query_limit_percent';"""
+        logger.info("file_cache_query_limit_percent configuration: " + 
fileCacheQueryLimitPercentResult)
+        assertFalse(fileCacheQueryLimitPercentResult.size() == 0 || 
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 20.0,
+                SET_SESSION_VARIABLE_FAILED_MSG)
+
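+        // With file_cache_query_limit_max_percent set to 50, values above 50 are expected to be rejected, in addition to out-of-range values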
+        try {
+            sql """set file_cache_query_limit_percent = -1"""
+            assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+        } catch (Exception e) {
+            logger.info("SQL failed as expected: ${e.message}")
+        }
+
+        try {
+            sql """set file_cache_query_limit_percent = 0"""
+            assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+        } catch (Exception e) {
+            logger.info("SQL failed as expected: ${e.message}")
+        }
+
+        try {
+            sql """set file_cache_query_limit_percent = 51"""
+            assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+        } catch (Exception e) {
+            logger.info("SQL failed as expected: ${e.message}")
+        }
+    }
+
+    return true;
+}
+
diff --git 
a/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
 
b/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
index b0984445e5f..965dfdbcaa9 100644
--- 
a/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
+++ 
b/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
@@ -89,150 +89,171 @@ suite("test_file_cache_statistics", 
"external_docker,hive,external_docker_hive,p
     // do it twice to make sure the table block could hit the cache
     order_qt_1 """select * from 
${catalog_name}.${ex_db_name}.parquet_partition_table where l_orderkey=1 and 
l_partkey=1534 limit 1;"""
 
-    // brpc metrics will be updated at most 20 seconds
-    Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until{
-        // ===== Hit Ratio Metrics Check =====
-        // Check overall hit ratio hits_ratio
-        def hitsRatioResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio' limit 
1;"""
-        logger.info("hits_ratio result: " + hitsRatioResult)
-        
-        // Check 1-hour hit ratio hits_ratio_1h
-        def hitsRatio1hResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_1h' 
limit 1;"""
-        logger.info("hits_ratio_1h result: " + hitsRatio1hResult)
-        
-        // Check 5-minute hit ratio hits_ratio_5m
-        def hitsRatio5mResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_5m' 
limit 1;"""
-        logger.info("hits_ratio_5m result: " + hitsRatio5mResult)
-        
-        // Check if all three metrics exist and are greater than 0
-        boolean hasHitsRatio = hitsRatioResult.size() > 0 && 
Double.valueOf(hitsRatioResult[0][0]) > 0
-        boolean hasHitsRatio1h = hitsRatio1hResult.size() > 0 && 
Double.valueOf(hitsRatio1hResult[0][0]) > 0
-        boolean hasHitsRatio5m = hitsRatio5mResult.size() > 0 && 
Double.valueOf(hitsRatio5mResult[0][0]) > 0
-        
-        logger.info("Hit ratio metrics check result - hits_ratio: 
${hasHitsRatio}, hits_ratio_1h: ${hasHitsRatio1h}, hits_ratio_5m: 
${hasHitsRatio5m}")
-        
-        // Return false if any metric is false, otherwise return true
-        if (!hasHitsRatio) {
-            logger.info(HIT_RATIO_METRIC_FALSE_MSG)
-            assertTrue(false, HIT_RATIO_METRIC_FALSE_MSG)
-        }
-        if (!hasHitsRatio1h) {
-            logger.info(HIT_RATIO_1H_METRIC_FALSE_MSG)
-            assertTrue(false, HIT_RATIO_1H_METRIC_FALSE_MSG)
-        }
-        if (!hasHitsRatio5m) {
-            logger.info(HIT_RATIO_5M_METRIC_FALSE_MSG)
-            assertTrue(false, HIT_RATIO_5M_METRIC_FALSE_MSG)
-        }
-        // ===== End Hit Ratio Metrics Check =====
-        
-        // ===== Normal Queue Metrics Check =====
-        // Check normal queue current size and max size
-        def normalQueueCurrSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
-        logger.info("normal_queue_curr_size result: " + 
normalQueueCurrSizeResult)
-        
-        def normalQueueMaxSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
-        logger.info("normal_queue_max_size result: " + 
normalQueueMaxSizeResult)
-        
-        // Check normal queue current elements and max elements
-        def normalQueueCurrElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
-        logger.info("normal_queue_curr_elements result: " + 
normalQueueCurrElementsResult)
-        
-        def normalQueueMaxElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
-        logger.info("normal_queue_max_elements result: " + 
normalQueueMaxElementsResult)
-        
-        // Check normal queue size metrics
-        boolean hasNormalQueueCurrSize = normalQueueCurrSizeResult.size() > 0 
&& 
-            Double.valueOf(normalQueueCurrSizeResult[0][0]) > 0
-        boolean hasNormalQueueMaxSize = normalQueueMaxSizeResult.size() > 0 && 
-            Double.valueOf(normalQueueMaxSizeResult[0][0]) > 0
-        boolean hasNormalQueueCurrElements = 
normalQueueCurrElementsResult.size() > 0 && 
-            Double.valueOf(normalQueueCurrElementsResult[0][0]) > 0
-        boolean hasNormalQueueMaxElements = 
normalQueueMaxElementsResult.size() > 0 && 
-            Double.valueOf(normalQueueMaxElementsResult[0][0]) > 0
-        
-        // Check if current size is less than max size and current elements is 
less than max elements
-        boolean normalQueueSizeValid = hasNormalQueueCurrSize && 
hasNormalQueueMaxSize && 
-            Double.valueOf(normalQueueCurrSizeResult[0][0]) < 
Double.valueOf(normalQueueMaxSizeResult[0][0])
-        boolean normalQueueElementsValid = hasNormalQueueCurrElements && 
hasNormalQueueMaxElements && 
-            Double.valueOf(normalQueueCurrElementsResult[0][0]) < 
Double.valueOf(normalQueueMaxElementsResult[0][0])
-        
-        logger.info("Normal queue metrics check result - size valid: 
${normalQueueSizeValid}, " +
-            "elements valid: ${normalQueueElementsValid}")
-        
-        if (!normalQueueSizeValid) {
-            logger.info(NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
-            assertTrue(false, NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
-        }
-        if (!normalQueueElementsValid) {
-            logger.info(NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
-            assertTrue(false, NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
-        }
-        // ===== End Normal Queue Metrics Check =====
-        
-        // ===== Hit and Read Counts Metrics Check =====
-        // Get initial values for hit and read counts
-        def initialHitCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'total_hit_counts' limit 1;"""
-        logger.info("Initial total_hit_counts result: " + 
initialHitCountsResult)
-        
-        def initialReadCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'total_read_counts' limit 1;"""
-        logger.info("Initial total_read_counts result: " + 
initialReadCountsResult)
-        
-        // Check if initial values exist and are greater than 0
-        if (initialHitCountsResult.size() == 0 || 
Double.valueOf(initialHitCountsResult[0][0]) <= 0) {
-            logger.info(INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
-            assertTrue(false, INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
-        }
-        if (initialReadCountsResult.size() == 0 || 
Double.valueOf(initialReadCountsResult[0][0]) <= 0) {
-            logger.info(INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
-            assertTrue(false, INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
-        }
-        
-        // Store initial values
-        double initialHitCounts = Double.valueOf(initialHitCountsResult[0][0])
-        double initialReadCounts = 
Double.valueOf(initialReadCountsResult[0][0])
-        
-        // Execute the same query to trigger cache operations
-        order_qt_2 """select * from 
${catalog_name}.${ex_db_name}.parquet_partition_table 
-            where l_orderkey=1 and l_partkey=1534 limit 1;"""
-        
-        // Get updated values after cache operations
-        def updatedHitCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'total_hit_counts' limit 1;"""
-        logger.info("Updated total_hit_counts result: " + 
updatedHitCountsResult)
-        
-        def updatedReadCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics 
-            where METRIC_NAME = 'total_read_counts' limit 1;"""
-        logger.info("Updated total_read_counts result: " + 
updatedReadCountsResult)
-        
-        // Check if updated values are greater than initial values
-        double updatedHitCounts = Double.valueOf(updatedHitCountsResult[0][0])
-        double updatedReadCounts = 
Double.valueOf(updatedReadCountsResult[0][0])
-        
-        boolean hitCountsIncreased = updatedHitCounts > initialHitCounts
-        boolean readCountsIncreased = updatedReadCounts > initialReadCounts
-        
-        logger.info("Hit and read counts comparison - hit_counts: 
${initialHitCounts} -> " +
-            "${updatedHitCounts} (increased: ${hitCountsIncreased}), 
read_counts: ${initialReadCounts} -> " +
-            "${updatedReadCounts} (increased: ${readCountsIncreased})")
-        
-        if (!hitCountsIncreased) {
-            logger.info(TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
-            assertTrue(false, TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
-        }
-        if (!readCountsIncreased) {
-            logger.info(TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
-            assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
-        }
-        // ===== End Hit and Read Counts Metrics Check =====
-        sql """set global enable_file_cache=false"""
-        return true
+    def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend 
config like 'file_cache_background_monitor_interval_ms';"""
+    logger.info("file_cache_background_monitor_interval_ms configuration: " + 
fileCacheBackgroundMonitorIntervalMsResult)
+    assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 || fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+            fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(), "file_cache_background_monitor_interval_ms is empty or not set")
+
+    // brpc metrics are refreshed within the file cache background monitor interval
+    def totalWaitTime = 
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toInteger() / 1000) as int
+    def interval = 1
+    def iterations = totalWaitTime / interval
+
+    (1..iterations).each { count ->
+        Thread.sleep(interval * 1000)
+        def elapsedSeconds = count * interval
+        def remainingSeconds = totalWaitTime - elapsedSeconds
+        logger.info("Waited for file cache statistics update ${elapsedSeconds} 
seconds, ${remainingSeconds} seconds remaining")
+    }
+
+    // ===== Hit Ratio Metrics Check =====
+    // Check overall hit ratio hits_ratio
+    def hitsRatioResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio' limit 
1;"""
+    logger.info("hits_ratio result: " + hitsRatioResult)
+
+    // Check 1-hour hit ratio hits_ratio_1h
+    def hitsRatio1hResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_1h' 
limit 1;"""
+    logger.info("hits_ratio_1h result: " + hitsRatio1hResult)
+
+    // Check 5-minute hit ratio hits_ratio_5m
+    def hitsRatio5mResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_5m' 
limit 1;"""
+    logger.info("hits_ratio_5m result: " + hitsRatio5mResult)
+
+    // Check if all three metrics exist and are greater than 0
+    boolean hasHitsRatio = hitsRatioResult.size() > 0 && 
Double.valueOf(hitsRatioResult[0][0]) > 0
+    boolean hasHitsRatio1h = hitsRatio1hResult.size() > 0 && 
Double.valueOf(hitsRatio1hResult[0][0]) > 0
+    boolean hasHitsRatio5m = hitsRatio5mResult.size() > 0 && 
Double.valueOf(hitsRatio5mResult[0][0]) > 0
+
+    logger.info("Hit ratio metrics check result - hits_ratio: ${hasHitsRatio}, 
hits_ratio_1h: ${hasHitsRatio1h}, hits_ratio_5m: ${hasHitsRatio5m}")
+
+    // Fail the test if any of the three hit ratio metrics is missing or zero
+    if (!hasHitsRatio) {
+        logger.info(HIT_RATIO_METRIC_FALSE_MSG)
+        assertTrue(false, HIT_RATIO_METRIC_FALSE_MSG)
+    }
+    if (!hasHitsRatio1h) {
+        logger.info(HIT_RATIO_1H_METRIC_FALSE_MSG)
+        assertTrue(false, HIT_RATIO_1H_METRIC_FALSE_MSG)
+    }
+    if (!hasHitsRatio5m) {
+        logger.info(HIT_RATIO_5M_METRIC_FALSE_MSG)
+        assertTrue(false, HIT_RATIO_5M_METRIC_FALSE_MSG)
+    }
+    // ===== End Hit Ratio Metrics Check =====
+
+    // ===== Normal Queue Metrics Check =====
+    // Check normal queue current size and max size
+    def normalQueueCurrSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+    logger.info("normal_queue_curr_size result: " + normalQueueCurrSizeResult)
+
+    def normalQueueMaxSizeResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
+    logger.info("normal_queue_max_size result: " + normalQueueMaxSizeResult)
+
+    // Check normal queue current elements and max elements
+    def normalQueueCurrElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+    logger.info("normal_queue_curr_elements result: " + 
normalQueueCurrElementsResult)
+
+    def normalQueueMaxElementsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
+    logger.info("normal_queue_max_elements result: " + 
normalQueueMaxElementsResult)
+
+    // Check normal queue size metrics
+    boolean hasNormalQueueCurrSize = normalQueueCurrSizeResult.size() > 0 &&
+        Double.valueOf(normalQueueCurrSizeResult[0][0]) > 0
+    boolean hasNormalQueueMaxSize = normalQueueMaxSizeResult.size() > 0 &&
+        Double.valueOf(normalQueueMaxSizeResult[0][0]) > 0
+    boolean hasNormalQueueCurrElements = normalQueueCurrElementsResult.size() 
> 0 &&
+        Double.valueOf(normalQueueCurrElementsResult[0][0]) > 0
+    boolean hasNormalQueueMaxElements = normalQueueMaxElementsResult.size() > 
0 &&
+        Double.valueOf(normalQueueMaxElementsResult[0][0]) > 0
+
+    // Check if current size is less than max size and current elements is 
less than max elements
+    boolean normalQueueSizeValid = hasNormalQueueCurrSize && 
hasNormalQueueMaxSize &&
+        Double.valueOf(normalQueueCurrSizeResult[0][0]) < 
Double.valueOf(normalQueueMaxSizeResult[0][0])
+    boolean normalQueueElementsValid = hasNormalQueueCurrElements && 
hasNormalQueueMaxElements &&
+        Double.valueOf(normalQueueCurrElementsResult[0][0]) < 
Double.valueOf(normalQueueMaxElementsResult[0][0])
+
+    logger.info("Normal queue metrics check result - size valid: 
${normalQueueSizeValid}, " +
+        "elements valid: ${normalQueueElementsValid}")
+
+    if (!normalQueueSizeValid) {
+        logger.info(NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
+        assertTrue(false, NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
+    }
+    if (!normalQueueElementsValid) {
+        logger.info(NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
+        assertTrue(false, NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
+    }
+    // ===== End Normal Queue Metrics Check =====
+
+    // ===== Hit and Read Counts Metrics Check =====
+    // Get initial values for hit and read counts
+    def initialHitCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'total_hit_counts' limit 1;"""
+    logger.info("Initial total_hit_counts result: " + initialHitCountsResult)
+
+    def initialReadCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'total_read_counts' limit 1;"""
+    logger.info("Initial total_read_counts result: " + initialReadCountsResult)
+
+    // Check if initial values exist and are greater than 0
+    if (initialHitCountsResult.size() == 0 || 
Double.valueOf(initialHitCountsResult[0][0]) <= 0) {
+        logger.info(INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
+        assertTrue(false, INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
+    }
+    if (initialReadCountsResult.size() == 0 || 
Double.valueOf(initialReadCountsResult[0][0]) <= 0) {
+        logger.info(INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
+        assertTrue(false, INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
+    }
+
+    // Store initial values
+    double initialHitCounts = Double.valueOf(initialHitCountsResult[0][0])
+    double initialReadCounts = Double.valueOf(initialReadCountsResult[0][0])
+
+    // Execute the same query to trigger cache operations
+    order_qt_2 """select * from ${catalog_name}.${ex_db_name}.parquet_partition_table
+        where l_orderkey=1 and l_partkey=1534 limit 1;"""
+
+    (1..iterations).each { count ->
+        Thread.sleep(interval * 1000)
+        def elapsedSeconds = count * interval
+        def remainingSeconds = totalWaitTime - elapsedSeconds
+        logger.info("Waited for file cache statistics update ${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+    }
+
+    // Get updated values after cache operations
+    def updatedHitCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'total_hit_counts' limit 1;"""
+    logger.info("Updated total_hit_counts result: " + updatedHitCountsResult)
+
+    def updatedReadCountsResult = sql """select METRIC_VALUE from 
information_schema.file_cache_statistics
+        where METRIC_NAME = 'total_read_counts' limit 1;"""
+    logger.info("Updated total_read_counts result: " + updatedReadCountsResult)
+
+    // Check if updated values are greater than initial values
+    double updatedHitCounts = Double.valueOf(updatedHitCountsResult[0][0])
+    double updatedReadCounts = Double.valueOf(updatedReadCountsResult[0][0])
+
+    boolean hitCountsIncreased = updatedHitCounts > initialHitCounts
+    boolean readCountsIncreased = updatedReadCounts > initialReadCounts
+
+    logger.info("Hit and read counts comparison - hit_counts: 
${initialHitCounts} -> " +
+        "${updatedHitCounts} (increased: ${hitCountsIncreased}), read_counts: 
${initialReadCounts} -> " +
+        "${updatedReadCounts} (increased: ${readCountsIncreased})")
+
+    if (!hitCountsIncreased) {
+        logger.info(TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
+        assertTrue(false, TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
+    }
+    if (!readCountsIncreased) {
+        logger.info(TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+        assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
     }
+    // ===== End Hit and Read Counts Metrics Check =====
+    sql """set global enable_file_cache=false"""
+    return true
 }
 
diff --git 
a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy 
b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
index 7d270ffe3b8..d1adc2b6b84 100644
--- 
a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
+++ 
b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_hive_warmup_select", 
"p0,external,hive,external_docker,external_docker_hive") {
+suite("test_hive_warmup_select", 
"p0,external,hive,external_docker,external_docker_hive,nonConcurrent") {
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled == null || !enabled.equalsIgnoreCase("true")) {
         logger.info("disable Hive test.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
