This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea9465a374 [fix][enhancement](filecache) filecache query limit
feature bugfix and enhancement (#55772)
0ea9465a374 is described below
commit 0ea9465a374e56e4ceaa12b751b5b1184a0b6eba
Author: Wen Zhenghu <[email protected]>
AuthorDate: Sat Jan 24 22:37:24 2026 +0800
[fix][enhancement](filecache) filecache query limit feature bugfix and
enhancement (#55772)
related issue https://github.com/apache/doris/issues/52299
### What problem does this PR solve?
The code framework for the FileCache Query-Level Quota feature already
exists in the master branch, but it has the following issues:
1. **Missing feature entry point**, which made the feature unusable in
practice. This PR adds the usage entry point and session-level
switch parameters.
2. **Lack of independent quota control parameters**. Previously, quota
could only be configured in `file_cache_path`. This PR adds new
configuration items for independent quota control.
3. **Missing test cases**. This PR adds necessary unit test cases and
regression test cases.
---
For readers who prefer Chinese, please refer to the Chinese version of
this document.
[Apache Doris file cache query limit feature PR description (PDF, in Chinese)](https://github.com/user-attachments/files/24581992/Apache.Doris.query.limit.PR.pdf)
File Cache Query Limit: New Parameter System and Implementation Architecture
1. Configuration Parameter System
1.1 BE-Side Configuration Parameters
• enable_file_cache_query_limit: Boolean type, controls whether to enable
the file cache query limit feature. This PR also changes it to a mutable
config (DEFINE_mBool), so it can be adjusted at runtime.
◦ Function: Serves as the master switch on the BE side. Only when it is
enabled will the BE process the query limit parameters passed from the FE
(see the be.conf sketch below).
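For illustration, a minimal be.conf sketch that turns the feature on might
look like this (the file_cache_path value is the default from config.cpp;
adjust paths and sizes for a real deployment):
```
# enable the file cache itself and the per-query limit switch
enable_file_cache = true
enable_file_cache_query_limit = true
file_cache_path = [{"path":"${DORIS_HOME}/file_cache"}]
```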
1.2 FE-Side Configuration Parameters
• file_cache_query_limit_max_percent: Integer type, default value is 100.
◦ Function: Soft upper limit used to validate the session variable; a
query-level percentage cannot be set above this value (see the example
below).
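For example (the limit of 50 here mirrors the value used in the regression
test added by this PR; it is not a default):
```
-- with file_cache_query_limit_max_percent = 50 in fe.conf
SET file_cache_query_limit_percent = 30;  -- accepted: within [1, 50]
SET file_cache_query_limit_percent = 80;  -- rejected by checkFileCacheQueryLimitPercent
```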
1.3 FE-Side Session Variables
• file_cache_query_limit_percent: Integer type (1-100), sets the
percentage of the file cache a query can use.
◦ Function: Global session-level file cache query limit percentage (a
usage sketch follows below).
• policy_file_cache_query_limit_percent: Integer type (1-100),
policy-level file cache query limit percentage.
◦ Function: Policy-level file cache query limit, which can override the
global setting.
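A minimal end-to-end usage sketch (the catalog, database, and table names
are placeholders; the BE switches from section 1.1 are assumed to be
enabled):
```
-- limit this session's queries to 40% of the file cache
SET file_cache_query_limit_percent = 40;
-- reads through the file cache for this query are bounded by the 40% quota
SELECT count(*) FROM my_catalog.my_db.my_table;
```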
1.4 Interrelationships of the Three Parameters
file_cache_query_limit_max_percent (FE Config)
↓ (Validates upper limit)
file_cache_query_limit_percent (Session Variable)
↓ (Can be overridden)
policy_file_cache_query_limit_percent (Policy Session Variable)
↓ (Final effective value passed to BE)
TQueryOptions.file_cache_query_limit_percent
Relationship Description:
1. Validation Relationship: The value of file_cache_query_limit_percent
must be between 1 and file_cache_query_limit_max_percent.
2. Priority Relationship: policy_file_cache_query_limit_percent has
higher priority. When set, it overrides file_cache_query_limit_percent.
3. Effectiveness Mechanism: The FE ultimately passes the validated
percentage value to the BE for execution via the Thrift protocol.
2. Parameter Passing Mechanism
2.1 Thrift Protocol Definition
The parameter is defined within the Thrift structure used for query
options.
```
struct TQueryOptions {
    ...
    1001: optional i32 file_cache_query_limit_percent = -1
    ...
}
```
2.2 Complete FE-to-BE Passing Process
1. Session Variable Setting Phase:
SET file_cache_query_limit_percent = 50;
SET policy_file_cache_query_limit_percent = 30; -- Optional, overrides
the above setting
2. Parameter Validation Phase:
◦ The FE validates that the value is within the range [1,
file_cache_query_limit_max_percent].
◦ Validation logic: the checker method "checkFileCacheQueryLimitPercent".
3. Query Options Construction Phase:
◦ The FE selects the final effective percentage value based on priority
rules.
◦ The selected value is set into the
TQueryOptions.file_cache_query_limit_percent field.
4. Thrift Transmission Phase:
◦ The TQueryOptions structure is passed to the BE via Thrift RPC.
5. BE-Side Reception and Processing Phase:
◦ The BE reads query_options.file_cache_query_limit_percent during query
context initialization.
◦ A query-level file cache context is created based on this value.
3. Core Implementation Architecture
3.1 Query Context Management
• Initialization Condition Check:
Within the "QueryContext" constructor, initialization occurs if specific
conditions are met, including the relevant BE configuration switches and
the passed query option value .
```
bool initialize_context_holder =
        config::enable_file_cache &&                              // BE cache config switch
        config::enable_file_cache_query_limit &&                  // BE query limit switch
        query_options.__isset.enable_file_cache &&                // FE passed the cache switch
        query_options.enable_file_cache &&                        // cache feature enabled
        query_options.__isset.file_cache_query_limit_percent &&   // FE passed the limit percent
        query_options.file_cache_query_limit_percent < 100;       // percentage limit effective
```
• Context Creation:
```
if (initialize_context_holder) {
    _query_context_holders = io::FileCacheFactory::instance()->get_query_context_holders(
            _query_id, query_options.file_cache_query_limit_percent);
}
```
3.2 Cache Capacity Calculation Mechanism
• Factory Method Call Chain: FileCacheFactory::get_query_context_holders
creates or retrieves a query context holder from every cache instance,
based on the query ID and the limit percentage.
• Single Cache Instance Context Creation: Each BlockFileCache instance
creates a context holder for the query via get_query_context_holder,
using the provided percentage.
3.3 Query Cache Capacity Allocation Strategy
• Capacity Calculation Logic:
The capacity allocated to a query is the configured percentage of the
total cache capacity. If the resulting quota falls below the recommended
256MB minimum, a warning is logged suggesting a larger
file_cache_query_limit_percent.
```
size_t file_cache_query_limit_size =
        _capacity * file_cache_query_limit_percent / 100;   // calculate by percentage
if (file_cache_query_limit_size < 268435456) {
    // warn that the quota is below the 256MB recommended minimum
    LOG(WARNING) << "file cache query limit is less than the 256MB recommended minimum";
}
auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
```
• Allocation Strategy Description:
1. Percentage Calculation: Total Cache Capacity ×
file_cache_query_limit_percent ÷ 100.
2. Minimum Recommendation: If the computed quota is below 256MB
(268,435,456 bytes), a WARNING is logged; the quota itself is not raised.
3. Final Capacity: The percentage-calculated value is used directly as the
query's cache quota (see the worked example below).
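As an illustrative worked example (numbers chosen for clarity, not taken
from the patch): with a 100 GB total cache and
file_cache_query_limit_percent = 30, a query's quota is
100 GB × 30 / 100 = 30 GB; with a 1 GB cache and a 10% limit, the quota is
roughly 102 MB, which is below 256 MB and would trigger the warning above.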
3.4 Cache Reservation and Management Mechanism
• Query-Level Isolation:
◦ Each query is identified by a TUniqueId and has an independent cache
context.
◦ The query context manages the maximum cache capacity available to that
query.
◦ Lifecycle management is handled through QueryFileCacheContextHolder.
• Resource Cleanup:
◦ The cache context is automatically released when the query ends.
◦ This prevents memory leaks and resource waste.
◦ Resources are also reclaimed when a query is interrupted.
---
*This implementation provides fine-grained control over FileCache usage
at the query level, preventing cache pollution while maintaining system
stability and performance.*
---------
Co-authored-by: chenhao xu <[email protected]>
Co-authored-by: xuchenhao <[email protected]>
---
be/src/common/config.cpp | 2 +-
be/src/common/config.h | 2 +-
be/src/io/cache/block_file_cache.cpp | 19 +-
be/src/io/cache/block_file_cache.h | 7 +-
be/src/io/cache/block_file_cache_factory.cpp | 6 +-
be/src/io/cache/block_file_cache_factory.h | 2 +-
be/src/runtime/query_context.cpp | 12 +
be/src/runtime/query_context.h | 2 +
be/test/io/cache/block_file_cache_test.cpp | 17 +-
.../main/java/org/apache/doris/common/Config.java | 6 +
.../java/org/apache/doris/qe/SessionVariable.java | 25 ++
gensrc/thrift/PaloInternalService.thrift | 1 +
.../cache/test_file_cache_features.groovy | 91 +++--
.../cache/test_file_cache_query_limit.groovy | 395 +++++++++++++++++++++
.../test_file_cache_query_limit_config.groovy | 124 +++++++
.../cache/test_file_cache_statistics.groovy | 309 ++++++++--------
.../cache/test_hive_warmup_select.groovy | 2 +-
17 files changed, 819 insertions(+), 203 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e26d6c4ce2..3b9239a8b1a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1133,7 +1133,7 @@ DEFINE_String(file_cache_path,
"[{\"path\":\"${DORIS_HOME}/file_cache\"}]");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB
DEFINE_Bool(clear_file_cache, "false");
-DEFINE_Bool(enable_file_cache_query_limit, "false");
+DEFINE_mBool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "88");
DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 02e0542a579..b1cfc7cd604 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1172,7 +1172,7 @@ DECLARE_Bool(enable_file_cache);
DECLARE_String(file_cache_path);
DECLARE_Int64(file_cache_each_block_size);
DECLARE_Bool(clear_file_cache);
-DECLARE_Bool(enable_file_cache_query_limit);
+DECLARE_mBool(enable_file_cache_query_limit);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
DECLARE_mBool(enable_evict_file_cache_in_advance);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 97454a364ca..0aa4757c089 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -141,8 +141,7 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
const FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
_capacity(cache_settings.capacity),
- _max_file_block_size(cache_settings.max_file_block_size),
- _max_query_cache_size(cache_settings.max_query_cache_size) {
+ _max_file_block_size(cache_settings.max_file_block_size) {
_cur_cache_size_metrics =
std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
"file_cache_cache_size", 0);
_cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
@@ -383,7 +382,7 @@ UInt128Wrapper BlockFileCache::hash(const std::string&
path) {
}
BlockFileCache::QueryFileCacheContextHolderPtr
BlockFileCache::get_query_context_holder(
- const TUniqueId& query_id) {
+ const TUniqueId& query_id, int file_cache_query_limit_percent) {
SCOPED_CACHE_LOCK(_mutex, this);
if (!config::enable_file_cache_query_limit) {
return {};
@@ -391,7 +390,7 @@ BlockFileCache::QueryFileCacheContextHolderPtr
BlockFileCache::get_query_context
/// if enable_filesystem_query_cache_limit is true,
/// we create context query for current query.
- auto context = get_or_set_query_context(query_id, cache_lock);
+ auto context = get_or_set_query_context(query_id, cache_lock,
file_cache_query_limit_percent);
return std::make_unique<QueryFileCacheContextHolder>(query_id, this,
context);
}
@@ -411,7 +410,8 @@ void BlockFileCache::remove_query_context(const TUniqueId&
query_id) {
}
BlockFileCache::QueryFileCacheContextPtr
BlockFileCache::get_or_set_query_context(
- const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
+ const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
+ int file_cache_query_limit_percent) {
if (query_id.lo == 0 && query_id.hi == 0) {
return nullptr;
}
@@ -421,7 +421,14 @@ BlockFileCache::QueryFileCacheContextPtr
BlockFileCache::get_or_set_query_contex
return context;
}
- auto query_context =
std::make_shared<QueryFileCacheContext>(_max_query_cache_size);
+ size_t file_cache_query_limit_size = _capacity *
file_cache_query_limit_percent / 100;
+ if (file_cache_query_limit_size < 268435456) {
+ LOG(WARNING) << "The user-set file cache query limit (" <<
file_cache_query_limit_size
+ << " bytes) is less than the 256MB recommended minimum. "
+ << "Consider increasing the session variable
'file_cache_query_limit_percent'"
+ << " from its current value " <<
file_cache_query_limit_percent << "%.";
+ }
+ auto query_context =
std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
auto query_iter = _query_map.emplace(query_id, query_context).first;
return query_iter->second;
}
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 50c6fe349a9..53b82d24fd9 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -347,7 +347,8 @@ public:
void remove_query_context(const TUniqueId& query_id);
QueryFileCacheContextPtr get_or_set_query_context(const TUniqueId&
query_id,
-
std::lock_guard<std::mutex>&);
+
std::lock_guard<std::mutex>& cache_lock,
+ int
file_cache_query_limit_percent);
/// Save a query context information, and adopt different cache policies
/// for different queries through the context cache layer.
@@ -373,7 +374,8 @@ public:
QueryFileCacheContextPtr context;
};
using QueryFileCacheContextHolderPtr =
std::unique_ptr<QueryFileCacheContextHolder>;
- QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId&
query_id);
+ QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId&
query_id,
+ int
file_cache_query_limit_percent);
int64_t approximate_available_cache_size() const {
return std::max<int64_t>(
@@ -501,7 +503,6 @@ private:
std::string _cache_base_path;
size_t _capacity = 0;
size_t _max_file_block_size = 0;
- size_t _max_query_cache_size = 0;
mutable std::mutex _mutex;
bool _close {false};
diff --git a/be/src/io/cache/block_file_cache_factory.cpp
b/be/src/io/cache/block_file_cache_factory.cpp
index afbbe69afaa..da7ef80e58a 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -232,10 +232,12 @@ BlockFileCache* FileCacheFactory::get_by_path(const
std::string& cache_base_path
}
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
-FileCacheFactory::get_query_context_holders(const TUniqueId& query_id) {
+FileCacheFactory::get_query_context_holders(const TUniqueId& query_id,
+ int
file_cache_query_limit_percent) {
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> holders;
for (const auto& cache : _caches) {
- holders.push_back(cache->get_query_context_holder(query_id));
+ holders.push_back(
+ cache->get_query_context_holder(query_id,
file_cache_query_limit_percent));
}
return holders;
}
diff --git a/be/src/io/cache/block_file_cache_factory.h
b/be/src/io/cache/block_file_cache_factory.h
index 6b746f2a52d..c015f7bcf62 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -77,7 +77,7 @@ public:
BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
get_query_context_holders(
- const TUniqueId& query_id);
+ const TUniqueId& query_id, int file_cache_query_limit_percent);
/**
* Clears data of all file cache instances
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index aa5fd4b564f..a0c271efd39 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -111,6 +111,18 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv*
exec_env,
_timeout_second = query_options.execution_timeout;
+ bool initialize_context_holder =
+ config::enable_file_cache && config::enable_file_cache_query_limit
&&
+ query_options.__isset.enable_file_cache &&
query_options.enable_file_cache &&
+ query_options.__isset.file_cache_query_limit_percent &&
+ query_options.file_cache_query_limit_percent < 100;
+
+ // Initialize file cache context holders
+ if (initialize_context_holder) {
+ _query_context_holders =
io::FileCacheFactory::instance()->get_query_context_holders(
+ _query_id, query_options.file_cache_query_limit_percent);
+ }
+
bool is_query_type_valid = query_options.query_type == TQueryType::SELECT
||
query_options.query_type == TQueryType::LOAD ||
query_options.query_type ==
TQueryType::EXTERNAL;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c504dcc66aa..5c77a427f59 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -388,6 +388,8 @@ private:
std::string _load_error_url;
std::string _first_error_msg;
+ // file cache context holders
+ std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr>
_query_context_holders;
// instance id + node id -> cte scan
std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*>
_cte_scan;
std::mutex _cte_scan_lock;
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 52182726edd..98970d26b39 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -983,6 +983,7 @@ TEST_F(BlockFileCacheTest, init) {
)");
cache_paths.clear();
EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
+ config::enable_file_cache_query_limit = false;
}
TEST_F(BlockFileCacheTest, normal) {
@@ -1257,7 +1258,7 @@ TEST_F(BlockFileCacheTest,
query_limit_heap_use_after_free) {
query_id.hi = 1;
query_id.lo = 1;
context.query_id = query_id;
- auto query_context_holder = cache.get_query_context_holder(query_id);
+ auto query_context_holder = cache.get_query_context_holder(query_id, 100);
{
auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9,
9]
auto blocks = fromHolder(holder);
@@ -1344,7 +1345,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) {
query_id.hi = 1;
query_id.lo = 1;
context.query_id = query_id;
- auto query_context_holder = cache.get_query_context_holder(query_id);
+ auto query_context_holder = cache.get_query_context_holder(query_id, 100);
{
auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9,
9]
auto blocks = fromHolder(holder);
@@ -3003,7 +3004,7 @@ TEST_F(BlockFileCacheTest, test_query_limit) {
}
ASSERT_LT(i, 1000);
auto query_context_holder =
-
FileCacheFactory::instance()->get_query_context_holders(query_id);
+
FileCacheFactory::instance()->get_query_context_holders(query_id, 50);
for (int64_t offset = 0; offset < 60; offset += 5) {
auto holder = cache->get_or_set(key, offset, 5, context);
auto blocks = fromHolder(holder);
@@ -3178,7 +3179,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- EXPECT_EQ(cache.get_query_context_holder(id), nullptr);
+ EXPECT_EQ(cache.get_query_context_holder(id, 50), nullptr);
}
config::enable_file_cache_query_limit = true;
io::BlockFileCache cache(cache_base_path, settings);
@@ -3190,9 +3191,9 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
id.hi = id.lo = 0;
- EXPECT_EQ(cache.get_query_context_holder(id)->context, nullptr);
+ EXPECT_EQ(cache.get_query_context_holder(id, 50)->context, nullptr);
id.hi = id.lo = 1;
- auto query_ctx_1 = cache.get_query_context_holder(id);
+ auto query_ctx_1 = cache.get_query_context_holder(id, 50);
ASSERT_NE(query_ctx_1, nullptr);
for (int64_t offset = 0; offset < 60; offset += 5) {
auto holder = cache.get_or_set(key, offset, 5, context);
@@ -3205,7 +3206,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 4),
io::FileBlock::State::DOWNLOADED);
}
- auto query_ctx_2 = cache.get_query_context_holder(id);
+ auto query_ctx_2 = cache.get_query_context_holder(id, 50);
EXPECT_EQ(query_ctx_1->query_id, query_ctx_2->query_id);
std::lock_guard lock(cache._mutex);
EXPECT_EQ(query_ctx_1->context->get_cache_size(lock),
@@ -3248,7 +3249,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- auto query_ctx_1 = cache.get_query_context_holder(id);
+ auto query_ctx_1 = cache.get_query_context_holder(id, 50);
ASSERT_NE(query_ctx_1, nullptr);
{
auto holder = cache.get_or_set(key, 0, 5, context);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 16ab9a68c32..448c7747b46 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3771,6 +3771,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static String aws_credentials_provider_version = "v2";
+ @ConfField(mutable = true, description = {
+ "用户的单个查询能使用的 FILE_CACHE 比例的软上限(取值范围 1 到 100),100表示能够使用全量 FILE_CACHE",
+ "The soft upper limit of FILE_CACHE percent that a single query of a
user can use, (range: 1 to 100).",
+ "100 indicate that the full FILE_CACHE capacity can be used. "
+ })
+ public static int file_cache_query_limit_max_percent = 100;
@ConfField(description = {
"AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享",
"The thread pool size used by the AWS SDK to schedule asynchronous
retries, timeout tasks, "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 54ae4852c0c..f1897e325aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -476,6 +476,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String DISABLE_FILE_CACHE = "disable_file_cache";
+ public static final String FILE_CACHE_QUERY_LIMIT_PERCENT =
"file_cache_query_limit_percent";
+
public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path";
public static final String ENABLE_INVERTED_INDEX_QUERY =
"enable_inverted_index_query";
@@ -2768,6 +2770,23 @@ public class SessionVariable implements Serializable,
Writable {
"Make the READ_SLICE_SIZE variable configurable to reduce the
impact caused by read amplification."})
public int mergeReadSliceSizeBytes = 8388608;
+ @VariableMgr.VarAttr(name = FILE_CACHE_QUERY_LIMIT_PERCENT, needForward =
true,
+ checker = "checkFileCacheQueryLimitPercent",
+ description = {"限制用户的单个查询能使用的 FILE_CACHE 比例 "
+ + "(用户设置,取值范围 1 到
Config.file_cache_query_limit_max_percent)。",
+ "Limit the FILE_CACHE percent that a single query of a
user can use "
+ + "(set by user via session variables, range: 1 to
Config.file_cache_query_limit_max_percent)."})
+ public int fileCacheQueryLimitPercent = -1;
+
+ public void checkFileCacheQueryLimitPercent(String
fileCacheQueryLimitPercentStr) {
+ int fileCacheQueryLimitPct =
Integer.valueOf(fileCacheQueryLimitPercentStr);
+ if (fileCacheQueryLimitPct < 1 || fileCacheQueryLimitPct >
Config.file_cache_query_limit_max_percent) {
+ throw new InvalidParameterException(
+ String.format("file_cache_query_limit_percent should be
between 1 and %d",
+ Config.file_cache_query_limit_max_percent));
+ }
+ }
+
public void setAggPhase(int phase) {
aggPhase = phase;
}
@@ -5101,6 +5120,12 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setIvfNprobe(ivfNprobe);
tResult.setMergeReadSliceSize(mergeReadSliceSizeBytes);
tResult.setEnableExtendedRegex(enableExtendedRegex);
+ if (fileCacheQueryLimitPercent > 0) {
+
tResult.setFileCacheQueryLimitPercent(Math.min(fileCacheQueryLimitPercent,
+ Config.file_cache_query_limit_max_percent));
+ } else {
+
tResult.setFileCacheQueryLimitPercent(Config.file_cache_query_limit_max_percent);
+ }
// Set Iceberg write target file size
tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 9d7cc861a6d..133dce592e7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -429,6 +429,7 @@ struct TQueryOptions {
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
1000: optional bool disable_file_cache = false
+ 1001: optional i32 file_cache_query_limit_percent = -1
}
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
index 5951b9c79c5..aad83103023 100644
---
a/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
@@ -121,7 +121,17 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
assertTrue(false, INITIAL_VALUES_NOT_ZERO_CHECK_FAILED_MSG +
"disk_resource_limit_mode: ${initialDiskResourceLimitMode},
need_evict_cache_in_advance: ${initialNeedEvictCacheInAdvance}")
}
-
+
+ def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend
config like 'file_cache_background_monitor_interval_ms';"""
+ logger.info("file_cache_background_monitor_interval_ms configuration: " +
fileCacheBackgroundMonitorIntervalMsResult)
+ assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 ||
fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+ fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(),
"file_cache_background_monitor_interval_ms is empty or not set to true")
+
+ // brpc metrics will be updated at most 5 seconds
+ def totalWaitTime =
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toInteger() / 1000) as int
+ def interval = 1
+ def iterations = totalWaitTime / interval
+
// Set backend configuration parameters for testing
boolean diskResourceLimitModeTestPassed = true
setBeConfigTemporary([
@@ -134,26 +144,31 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
// Wait for disk_resource_limit_mode metric to change to 1
try {
- Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until {
- def updatedDiskResourceLimitModeResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
- where METRIC_NAME = 'disk_resource_limit_mode' limit 1;"""
- logger.info("Checking disk_resource_limit_mode result: " +
updatedDiskResourceLimitModeResult)
-
- if (updatedDiskResourceLimitModeResult.size() > 0) {
- double updatedDiskResourceLimitMode =
Double.valueOf(updatedDiskResourceLimitModeResult[0][0])
- logger.info("Current disk_resource_limit_mode value:
${updatedDiskResourceLimitMode}")
-
- if (updatedDiskResourceLimitMode == 1.0) {
- logger.info("Disk resource limit mode is now active
(value = 1)")
- return true
- } else {
- logger.info("Disk resource limit mode is not yet
active (value = ${updatedDiskResourceLimitMode}), waiting...")
- return false
- }
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ def updatedDiskResourceLimitModeResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
+ where METRIC_NAME = 'disk_resource_limit_mode' limit 1;"""
+ logger.info("Checking disk_resource_limit_mode result: " +
updatedDiskResourceLimitModeResult)
+
+ if (updatedDiskResourceLimitModeResult.size() > 0) {
+ double updatedDiskResourceLimitMode =
Double.valueOf(updatedDiskResourceLimitModeResult[0][0])
+ logger.info("Current disk_resource_limit_mode value:
${updatedDiskResourceLimitMode}")
+
+ if (updatedDiskResourceLimitMode == 1.0) {
+ logger.info("Disk resource limit mode is now active (value
= 1)")
+ return true
} else {
- logger.info("Failed to get disk_resource_limit_mode
metric, waiting...")
+ logger.info("Disk resource limit mode is not yet active
(value = ${updatedDiskResourceLimitMode}), waiting...")
return false
}
+ } else {
+ logger.info("Failed to get disk_resource_limit_mode metric,
waiting...")
+ return false
}
} catch (Exception e) {
logger.info(DISK_RESOURCE_LIMIT_MODE_TEST_FAILED_MSG +
e.getMessage())
@@ -182,26 +197,31 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
// Wait for need_evict_cache_in_advance metric to change to 1
try {
- Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until {
- def updatedNeedEvictCacheInAdvanceResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
- where METRIC_NAME = 'need_evict_cache_in_advance' limit
1;"""
- logger.info("Checking need_evict_cache_in_advance result: " +
updatedNeedEvictCacheInAdvanceResult)
-
- if (updatedNeedEvictCacheInAdvanceResult.size() > 0) {
- double updatedNeedEvictCacheInAdvance =
Double.valueOf(updatedNeedEvictCacheInAdvanceResult[0][0])
- logger.info("Current need_evict_cache_in_advance value:
${updatedNeedEvictCacheInAdvance}")
-
- if (updatedNeedEvictCacheInAdvance == 1.0) {
- logger.info("Need evict cache in advance mode is now
active (value = 1)")
- return true
- } else {
- logger.info("Need evict cache in advance mode is not
yet active (value = ${updatedNeedEvictCacheInAdvance}), waiting...")
- return false
- }
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ def updatedNeedEvictCacheInAdvanceResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
+ where METRIC_NAME = 'need_evict_cache_in_advance' limit 1;"""
+ logger.info("Checking need_evict_cache_in_advance result: " +
updatedNeedEvictCacheInAdvanceResult)
+
+ if (updatedNeedEvictCacheInAdvanceResult.size() > 0) {
+ double updatedNeedEvictCacheInAdvance =
Double.valueOf(updatedNeedEvictCacheInAdvanceResult[0][0])
+ logger.info("Current need_evict_cache_in_advance value:
${updatedNeedEvictCacheInAdvance}")
+
+ if (updatedNeedEvictCacheInAdvance == 1.0) {
+ logger.info("Need evict cache in advance mode is now
active (value = 1)")
+ return true
} else {
- logger.info("Failed to get need_evict_cache_in_advance
metric, waiting...")
+ logger.info("Need evict cache in advance mode is not yet
active (value = ${updatedNeedEvictCacheInAdvance}), waiting...")
return false
}
+ } else {
+ logger.info("Failed to get need_evict_cache_in_advance metric,
waiting...")
+ return false
}
} catch (Exception e) {
logger.info(NEED_EVICT_CACHE_IN_ADVANCE_TEST_FAILED_MSG +
e.getMessage())
@@ -219,4 +239,3 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
sql """set global enable_file_cache=false"""
return true
}
-
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
new file mode 100644
index 00000000000..a6ccf96c38c
--- /dev/null
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.dgmimpl.arrays.LongArrayGetAtMetaMethod
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+// Constants for backend configuration check
+final String BACKEND_CONFIG_CHECK_FAILED_PREFIX = "Backend configuration check
failed: "
+final String ENABLE_FILE_CACHE_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache is empty or not set to
true"
+final String FILE_CACHE_BACKGROUND_MONITOR_INTERVAL_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_background_monitor_interval_ms
is empty or not set to true"
+final String FILE_CACHE_PATH_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_path is empty or not
configured"
+final String WEB_SERVER_PORT_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "webserver_port is empty or not configured"
+final String BRPC_PORT_CHECK_FAILED_MSG = BACKEND_CONFIG_CHECK_FAILED_PREFIX +
"brpc_port is empty or not configured"
+final String ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_FALSE_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache_query_limit is empty or
not set to false"
+final String ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_TRUE_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache_query_limit is empty or
not set to true"
+final String FILE_CACHE_QUERY_LIMIT_BYTES_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_query_limit_bytes is empty or
not configured"
+// Constants for cache query features check
+final String FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX = "File cache features
check failed: "
+final String BASE_NORMAL_QUEUE_CURR_SIZE_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "base normal_queue_curr_size is 0"
+final String BASE_NORMAL_QUEUE_CURR_ELEMENTS_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "base normal_queue_curr_elements is 0"
+final String TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "total_read_counts did not increase
after cache operation"
+final String INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_curr_size is
not 0"
+final String INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_curr_elements
is not 0"
+final String INITIAL_NORMAL_QUEUE_MAX_SIZE_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_max_size is 0"
+final String INITIAL_NORMAL_QUEUE_MAX_ELEMENTS_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_max_elements is
0"
+final String NORMAL_QUEUE_CURR_SIZE_NOT_GREATER_THAN_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_size is not
greater than 0 after cache operation"
+final String NORMAL_QUEUE_CURR_ELEMENTS_NOT_GREATER_THAN_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_elements is not
greater than 0 after cache operation"
+final String NORMAL_QUEUE_CURR_SIZE_GREATER_THAN_QUERY_CACHE_CAPACITY_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_size is greater
than query cache capacity"
+
+suite("test_file_cache_query_limit",
"external_docker,hive,external_docker_hive,p0,external,nonConcurrent") {
+ String enableHiveTest = context.config.otherConfigs.get("enableHiveTest")
+ if (enableHiveTest == null || !enableHiveTest.equalsIgnoreCase("true")) {
+ logger.info("disable hive test.")
+ return
+ }
+
+ sql """set enable_file_cache=true"""
+
+ // Check backend configuration prerequisites
+ // Note: This test case assumes a single backend scenario. Testing with
single backend is logically equivalent
+ // to testing with multiple backends having identical configurations, but
simpler in logic.
+ def enableFileCacheResult = sql """show backend config like
'enable_file_cache';"""
+ logger.info("enable_file_cache configuration: " + enableFileCacheResult)
+ assertFalse(enableFileCacheResult.size() == 0 ||
!enableFileCacheResult[0][3].equalsIgnoreCase("true"),
+ ENABLE_FILE_CACHE_CHECK_FAILED_MSG)
+
+ def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend
config like 'file_cache_background_monitor_interval_ms';"""
+ logger.info("file_cache_background_monitor_interval_ms configuration: " +
fileCacheBackgroundMonitorIntervalMsResult)
+ assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 ||
fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+ fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(),
FILE_CACHE_BACKGROUND_MONITOR_INTERVAL_CHECK_FAILED_MSG)
+
+ String catalog_name = "test_file_cache_query_limit"
+ String ex_db_name = "tpch1_parquet"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+ int queryCacheCapacity
+
+ sql """drop catalog if exists ${catalog_name} """
+
+ sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+ 'hadoop.username' = 'hive'
+ );"""
+
+ String query_sql =
+ """select sum(l_quantity) as sum_qty,
+ sum(l_extendedprice) as sum_base_price,
+ sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+ sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as
sum_charge,
+ avg(l_quantity) as avg_qty,
+ avg(l_extendedprice) as avg_price,
+ avg(l_discount) as avg_disc,
+ count(*) as count_order
+ from ${catalog_name}.${ex_db_name}.lineitem
+ where l_shipdate <= date '1998-12-01' - interval '90' day
+ group by l_returnflag, l_linestatus
+ order by l_returnflag, l_linestatus;"""
+
+ def webserverPortResult = sql """SHOW BACKEND CONFIG LIKE
'webserver_port';"""
+ logger.info("webserver_port configuration: " + webserverPortResult)
+ assertFalse(webserverPortResult.size() == 0 || webserverPortResult[0][3]
== null || webserverPortResult[0][3].trim().isEmpty(),
+ WEB_SERVER_PORT_CHECK_FAILED_MSG)
+
+ String webserver_port = webserverPortResult[0][3]
+
+ def brpcPortResult = sql """SHOW BACKEND CONFIG LIKE 'brpc_port';"""
+ logger.info("brpcPortResult configuration: " + brpcPortResult)
+ assertFalse(brpcPortResult.size() == 0 || brpcPortResult[0][3] == null ||
brpcPortResult[0][3].trim().isEmpty(),
+ BRPC_PORT_CHECK_FAILED_MSG)
+
+ String brpc_port = brpcPortResult[0][3]
+
+ // Search file cache capacity
+ def command = ["curl", "-X", "POST", "${externalEnvIp}:${brpc_port}/vars"]
+ def stringCommand = command.collect{it.toString()}
+ def process = new ProcessBuilder(stringCommand as
String[]).redirectErrorStream(true).start()
+
+ def output = new StringBuilder()
+ def errorOutput = new StringBuilder()
+ process.inputStream.eachLine{line -> output.append(line).append("\n")}
+ process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+ // Wait for process completion and check exit status
+ def exitCode = process.waitFor()
+ def fileCacheCapacityResult = output.toString().split("\n").find {
it.contains("file_cache_capacity") }?.split(":")?.last()?.trim()
+
+ logger.info("File cache capacity: ${fileCacheCapacityResult}")
+ assertTrue(fileCacheCapacityResult != null, "Failed to find
file_cache_capacity in brpc metrics")
+ def fileCacheCapacity = Long.valueOf(fileCacheCapacityResult)
+
+ // Run file cache base test for setting the parameter
file_cache_query_limit_bytes
+ logger.info("========================= Start running file cache base test
========================")
+
+ // Clear file cache
+ command = ["curl", "-X", "POST",
"${externalEnvIp}:${webserver_port}/api/file_cache?op=clear&sync=true"]
+ stringCommand = command.collect{it.toString()}
+ process = new ProcessBuilder(stringCommand as
String[]).redirectErrorStream(true).start()
+
+ output = new StringBuilder()
+ errorOutput = new StringBuilder()
+ process.inputStream.eachLine{line -> output.append(line).append("\n")}
+ process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+ // Wait for process completion and check exit status
+ exitCode = process.waitFor()
+ logger.info("File cache clear command output: ${output.toString()}")
+ assertTrue(exitCode == 0, "File cache clear failed with exit code
${exitCode}. Error: ${errorOutput.toString()}")
+
+ // brpc metrics will be updated at most 5 seconds
+ def totalWaitTime =
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toLong() / 1000) as int
+ def interval = 1
+ def iterations = totalWaitTime / interval
+
+ // Waiting for file cache clearing
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache clearing ${elapsedSeconds} seconds,
${remainingSeconds} seconds remaining")
+ }
+
+ def initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
initialNormalQueueCurrSizeResult)
+ assertFalse(initialNormalQueueCurrSizeResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)
+
+ // Check normal queue current elements
+ def initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
initialNormalQueueCurrElementsResult)
+ assertFalse(initialNormalQueueCurrElementsResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrElementsResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG)
+
+ double initialNormalQueueCurrSize =
Double.valueOf(initialNormalQueueCurrSizeResult[0][0])
+ double initialNormalQueueCurrElements =
Double.valueOf(initialNormalQueueCurrElementsResult[0][0])
+
+ logger.info("Initial normal queue curr size and elements - size:
${initialNormalQueueCurrSize} , " +
+ "elements: ${initialNormalQueueCurrElements}")
+
+ setBeConfigTemporary([
+ "enable_file_cache_query_limit": "false"
+ ]) {
+ // Execute test logic with modified configuration for
file_cache_query_limit
+ logger.info("Backend configuration set -
enable_file_cache_query_limit: false")
+
+ // Waiting for backend configuration update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Check if the configuration is modified
+ def enableFileCacheQueryLimitResult = sql """SHOW BACKEND CONFIG LIKE
'enable_file_cache_query_limit';"""
+ logger.info("enable_file_cache_query_limit configuration: " +
enableFileCacheQueryLimitResult)
+ assertFalse(enableFileCacheQueryLimitResult.size() == 0 ||
enableFileCacheQueryLimitResult[0][3] == null ||
enableFileCacheQueryLimitResult[0][3] != "false",
+ ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_FALSE_FAILED_MSG)
+
+ sql """switch ${catalog_name}"""
+ // load the table into file cache
+ sql query_sql
+
+ // Waiting for file cache statistics update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ def baseNormalQueueCurrElementsResult = sql """select METRIC_VALUE
from information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
baseNormalQueueCurrElementsResult)
+ assertFalse(baseNormalQueueCurrElementsResult.size() == 0 ||
Double.valueOf(baseNormalQueueCurrElementsResult[0][0]) == 0.0,
+ BASE_NORMAL_QUEUE_CURR_ELEMENTS_IS_ZERO_MSG)
+
+ def baseNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
baseNormalQueueCurrSizeResult)
+ assertFalse(baseNormalQueueCurrSizeResult.size() == 0 ||
Double.valueOf(baseNormalQueueCurrSizeResult[0][0]) == 0.0,
+ BASE_NORMAL_QUEUE_CURR_SIZE_IS_ZERO_MSG)
+
+ int baseNormalQueueCurrElements =
Double.valueOf(baseNormalQueueCurrElementsResult[0][0]) as Long
+ queryCacheCapacity =
Double.valueOf(baseNormalQueueCurrSizeResult[0][0]) as Long
+ }
+
+ // The parameter file_cache_query_limit_percent must be set smaller than
the cache capacity required by the query
+ def fileCacheQueryLimitPercent = (queryCacheCapacity / fileCacheCapacity)
* 100
+ logger.info("file_cache_query_limit_percent: " +
fileCacheQueryLimitPercent)
+
+ logger.info("========================== End running file cache base test
=========================")
+
+ logger.info("==================== Start running file cache query limit
test 1 ====================")
+
+ def fileCacheQueryLimitPercentTest1 = (fileCacheQueryLimitPercent / 2) as
Long
+ logger.info("file_cache_query_limit_percent_test1: " +
fileCacheQueryLimitPercentTest1)
+
+ // Clear file cache
+ process = new ProcessBuilder(stringCommand as
String[]).redirectErrorStream(true).start()
+
+ output = new StringBuilder()
+ errorOutput = new StringBuilder()
+ process.inputStream.eachLine{line -> output.append(line).append("\n")}
+ process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+ // Wait for process completion and check exit status
+ exitCode = process.waitFor()
+ logger.info("File cache clear command output: ${output.toString()}")
+ assertTrue(exitCode == 0, "File cache clear failed with exit code
${exitCode}. Error: ${errorOutput.toString()}")
+
+ // Waiting for file cache clearing
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache clearing ${elapsedSeconds} seconds,
${remainingSeconds} seconds remaining")
+ }
+
+ // ===== Normal Queue Metrics Check =====
+ // Check normal queue current size
+ initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
initialNormalQueueCurrSizeResult)
+ assertFalse(initialNormalQueueCurrSizeResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)
+
+ // Check normal queue current elements
+ initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
initialNormalQueueCurrElementsResult)
+ assertFalse(initialNormalQueueCurrElementsResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrElementsResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG)
+
+ // Check normal queue max size
+ def initialNormalQueueMaxSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
+ logger.info("normal_queue_max_size result: " +
initialNormalQueueMaxSizeResult)
+ assertFalse(initialNormalQueueMaxSizeResult.size() == 0 ||
Double.valueOf(initialNormalQueueMaxSizeResult[0][0]) == 0.0,
+ INITIAL_NORMAL_QUEUE_MAX_SIZE_IS_ZERO_MSG)
+
+ // Check normal queue max elements
+ def initialNormalQueueMaxElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
+ logger.info("normal_queue_max_elements result: " +
initialNormalQueueMaxElementsResult)
+ assertFalse(initialNormalQueueMaxElementsResult.size() == 0 ||
Double.valueOf(initialNormalQueueMaxElementsResult[0][0]) == 0.0,
+ INITIAL_NORMAL_QUEUE_MAX_ELEMENTS_IS_ZERO_MSG)
+
+ initialNormalQueueCurrSize =
Double.valueOf(initialNormalQueueCurrSizeResult[0][0])
+ initialNormalQueueCurrElements =
Double.valueOf(initialNormalQueueCurrElementsResult[0][0])
+ double initialNormalQueueMaxSize =
Double.valueOf(initialNormalQueueMaxSizeResult[0][0])
+ double initialNormalQueueMaxElements =
Double.valueOf(initialNormalQueueMaxElementsResult[0][0])
+
+ logger.info("Initial normal queue curr size and elements - size:
${initialNormalQueueCurrSize} , " +
+ "elements: ${initialNormalQueueCurrElements}")
+
+ logger.info("Initial normal queue max size and elements - size:
${initialNormalQueueMaxSize} , " +
+ "elements: ${initialNormalQueueMaxElements}")
+
+ // ===== Hit And Read Counts Metrics Check =====
+ // Get initial values for hit and read counts
+ def initialTotalHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Initial total_hit_counts result: " +
initialTotalHitCountsResult)
+
+ def initialTotalReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Initial total_read_counts result: " +
initialTotalReadCountsResult)
+
+ // Store initial values
+ double initialTotalHitCounts =
Double.valueOf(initialTotalHitCountsResult[0][0])
+ double initialTotalReadCounts =
Double.valueOf(initialTotalReadCountsResult[0][0])
+
+ // Set backend configuration parameters for file_cache_query_limit test 1
+ setBeConfigTemporary([
+ "enable_file_cache_query_limit": "true"
+ ]) {
+ // Execute test logic with modified configuration for
file_cache_query_limit
+ logger.info("Backend configuration set -
enable_file_cache_query_limit: true")
+
+ sql """set file_cache_query_limit_percent =
${fileCacheQueryLimitPercentTest1}"""
+
+ // Waiting for backend configuration update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Check if the configuration is modified
+ def enableFileCacheQueryLimitResult = sql """SHOW BACKEND CONFIG LIKE
'enable_file_cache_query_limit';"""
+ logger.info("enable_file_cache_query_limit configuration: " +
enableFileCacheQueryLimitResult)
+ assertFalse(enableFileCacheQueryLimitResult.size() == 0 ||
enableFileCacheQueryLimitResult[0][3] == null ||
enableFileCacheQueryLimitResult[0][3] != "true",
+ ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_TRUE_FAILED_MSG)
+
+ sql """switch ${catalog_name}"""
+
+ // load the table into file cache
+ sql query_sql
+
+ // Waiting for file cache statistics update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Get updated value of normal queue current elements and max elements
after cache operations
+ def updatedNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
updatedNormalQueueCurrSizeResult)
+
+ def updatedNormalQueueCurrElementsResult = sql """select METRIC_VALUE
from information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
updatedNormalQueueCurrElementsResult)
+
+ // Check if updated values are greater than initial values
+ double updatedNormalQueueCurrSize =
Double.valueOf(updatedNormalQueueCurrSizeResult[0][0])
+ double updatedNormalQueueCurrElements =
Double.valueOf(updatedNormalQueueCurrElementsResult[0][0])
+
+ logger.info("Updated normal queue curr size and elements - size:
${updatedNormalQueueCurrSize} , " +
+ "elements: ${updatedNormalQueueCurrElements}")
+
+ assertTrue(updatedNormalQueueCurrSize > 0.0,
NORMAL_QUEUE_CURR_SIZE_NOT_GREATER_THAN_ZERO_MSG)
+ assertTrue(updatedNormalQueueCurrElements > 0.0,
NORMAL_QUEUE_CURR_ELEMENTS_NOT_GREATER_THAN_ZERO_MSG)
+
+ logger.info("Normal queue curr size and query cache capacity
comparison - normal queue curr size: ${updatedNormalQueueCurrSize as Long} , " +
+ "query cache capacity: ${fileCacheCapacity}")
+
+ assertTrue((updatedNormalQueueCurrSize as Long) <= queryCacheCapacity,
+ NORMAL_QUEUE_CURR_SIZE_GREATER_THAN_QUERY_CACHE_CAPACITY_MSG)
+
+ // Get updated values for hit and read counts after cache operations
+ def updatedTotalHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Updated total_hit_counts result: " +
updatedTotalHitCountsResult)
+
+ def updatedTotalReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Updated total_read_counts result: " +
updatedTotalReadCountsResult)
+
+ // Check if updated values are greater than initial values
+ double updatedTotalHitCounts =
Double.valueOf(updatedTotalHitCountsResult[0][0])
+ double updatedTotalReadCounts =
Double.valueOf(updatedTotalReadCountsResult[0][0])
+
+ logger.info("Total hit and read counts comparison - hit counts:
${initialTotalHitCounts} -> " +
+ "${updatedTotalHitCounts} , read counts:
${initialTotalReadCounts} -> ${updatedTotalReadCounts}")
+
+ assertTrue(updatedTotalReadCounts > initialTotalReadCounts,
TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+ }
+
+ logger.info("===================== End running file cache query limit test
1 =====================")
+
+ return true;
+}
\ No newline at end of file
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
new file mode 100644
index 00000000000..f2e5d4c2f68
--- /dev/null
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.dgmimpl.arrays.LongArrayGetAtMetaMethod
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+final String ERROR_SQL_SUCCEED_MSG = "SQL should have failed but succeeded"
+final String SET_SESSION_VARIABLE_FAILED_MSG = "SQL set session variable
failed"
+
+suite("test_file_cache_query_limit_config",
"external_docker,hive,external_docker_hive,p0,external,nonConcurrent") {
+
+ sql """set file_cache_query_limit_percent = 1"""
+ def fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 1.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ sql """set file_cache_query_limit_percent = 20"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 20.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ sql """set file_cache_query_limit_percent = 51"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 51.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+
+ sql """set file_cache_query_limit_percent = 100"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 100.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ try {
+ sql """set file_cache_query_limit_percent = -1"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 0"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 101"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 1000000"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ // Set frontend configuration parameters for
file_cache_query_limit_max_percent
+ setFeConfigTemporary([
+ "file_cache_query_limit_max_percent": "50"
+ ]) {
+ // Execute test logic with modified configuration for
file_cache_query_limit_max_percent
+ logger.info("Backend configuration set -
file_cache_query_limit_max_percent: 50")
+
+ sql """set file_cache_query_limit_percent = 1"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 1.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ sql """set file_cache_query_limit_percent = 20"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 20.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ try {
+ sql """set file_cache_query_limit_percent = -1"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 0"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 51"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+ }
+
+ return true;
+}
+
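Editor's note: the positive and negative checks above repeat one set-then-verify pattern. The sketch below condenses that pattern, assuming the suite-provided `sql`, `logger`, `assertTrue`, and `assertFalse` helpers; the closure names are illustrative only and are not part of this commit.
```
// Hypothetical helpers mirroring the checks in test_file_cache_query_limit_config.groovy.
def assertSessionVarEquals = { String name, double expected ->
    def rows = sql """show variables like '${name}';"""
    assertFalse(rows.size() == 0 || Double.valueOf(rows[0][1]) != expected,
            "session variable ${name} is not ${expected}")
}

def assertSetRejected = { String stmt ->
    try {
        sql stmt
        assertTrue(false, "SQL should have failed but succeeded: ${stmt}")
    } catch (Exception e) {
        logger.info("SQL failed as expected: ${e.message}")
    }
}

// In-range values are accepted and reflected by SHOW VARIABLES ...
sql """set file_cache_query_limit_percent = 20"""
assertSessionVarEquals("file_cache_query_limit_percent", 20.0)

// ... while out-of-range values must be rejected.
assertSetRejected("set file_cache_query_limit_percent = 0")
assertSetRejected("set file_cache_query_limit_percent = 101")
```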
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
index b0984445e5f..965dfdbcaa9 100644
---
a/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
@@ -89,150 +89,171 @@ suite("test_file_cache_statistics",
"external_docker,hive,external_docker_hive,p
// do it twice to make sure the table block could hit the cache
order_qt_1 """select * from
${catalog_name}.${ex_db_name}.parquet_partition_table where l_orderkey=1 and
l_partkey=1534 limit 1;"""
- // brpc metrics will be updated at most 20 seconds
- Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until{
- // ===== Hit Ratio Metrics Check =====
- // Check overall hit ratio hits_ratio
- def hitsRatioResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio' limit
1;"""
- logger.info("hits_ratio result: " + hitsRatioResult)
-
- // Check 1-hour hit ratio hits_ratio_1h
- def hitsRatio1hResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_1h'
limit 1;"""
- logger.info("hits_ratio_1h result: " + hitsRatio1hResult)
-
- // Check 5-minute hit ratio hits_ratio_5m
- def hitsRatio5mResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_5m'
limit 1;"""
- logger.info("hits_ratio_5m result: " + hitsRatio5mResult)
-
- // Check if all three metrics exist and are greater than 0
- boolean hasHitsRatio = hitsRatioResult.size() > 0 &&
Double.valueOf(hitsRatioResult[0][0]) > 0
- boolean hasHitsRatio1h = hitsRatio1hResult.size() > 0 &&
Double.valueOf(hitsRatio1hResult[0][0]) > 0
- boolean hasHitsRatio5m = hitsRatio5mResult.size() > 0 &&
Double.valueOf(hitsRatio5mResult[0][0]) > 0
-
- logger.info("Hit ratio metrics check result - hits_ratio:
${hasHitsRatio}, hits_ratio_1h: ${hasHitsRatio1h}, hits_ratio_5m:
${hasHitsRatio5m}")
-
- // Return false if any metric is false, otherwise return true
- if (!hasHitsRatio) {
- logger.info(HIT_RATIO_METRIC_FALSE_MSG)
- assertTrue(false, HIT_RATIO_METRIC_FALSE_MSG)
- }
- if (!hasHitsRatio1h) {
- logger.info(HIT_RATIO_1H_METRIC_FALSE_MSG)
- assertTrue(false, HIT_RATIO_1H_METRIC_FALSE_MSG)
- }
- if (!hasHitsRatio5m) {
- logger.info(HIT_RATIO_5M_METRIC_FALSE_MSG)
- assertTrue(false, HIT_RATIO_5M_METRIC_FALSE_MSG)
- }
- // ===== End Hit Ratio Metrics Check =====
-
- // ===== Normal Queue Metrics Check =====
- // Check normal queue current size and max size
- def normalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
- logger.info("normal_queue_curr_size result: " +
normalQueueCurrSizeResult)
-
- def normalQueueMaxSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
- logger.info("normal_queue_max_size result: " +
normalQueueMaxSizeResult)
-
- // Check normal queue current elements and max elements
- def normalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
- logger.info("normal_queue_curr_elements result: " +
normalQueueCurrElementsResult)
-
- def normalQueueMaxElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
- logger.info("normal_queue_max_elements result: " +
normalQueueMaxElementsResult)
-
- // Check normal queue size metrics
- boolean hasNormalQueueCurrSize = normalQueueCurrSizeResult.size() > 0
&&
- Double.valueOf(normalQueueCurrSizeResult[0][0]) > 0
- boolean hasNormalQueueMaxSize = normalQueueMaxSizeResult.size() > 0 &&
- Double.valueOf(normalQueueMaxSizeResult[0][0]) > 0
- boolean hasNormalQueueCurrElements =
normalQueueCurrElementsResult.size() > 0 &&
- Double.valueOf(normalQueueCurrElementsResult[0][0]) > 0
- boolean hasNormalQueueMaxElements =
normalQueueMaxElementsResult.size() > 0 &&
- Double.valueOf(normalQueueMaxElementsResult[0][0]) > 0
-
- // Check if current size is less than max size and current elements is
less than max elements
- boolean normalQueueSizeValid = hasNormalQueueCurrSize &&
hasNormalQueueMaxSize &&
- Double.valueOf(normalQueueCurrSizeResult[0][0]) <
Double.valueOf(normalQueueMaxSizeResult[0][0])
- boolean normalQueueElementsValid = hasNormalQueueCurrElements &&
hasNormalQueueMaxElements &&
- Double.valueOf(normalQueueCurrElementsResult[0][0]) <
Double.valueOf(normalQueueMaxElementsResult[0][0])
-
- logger.info("Normal queue metrics check result - size valid:
${normalQueueSizeValid}, " +
- "elements valid: ${normalQueueElementsValid}")
-
- if (!normalQueueSizeValid) {
- logger.info(NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
- assertTrue(false, NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
- }
- if (!normalQueueElementsValid) {
- logger.info(NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
- assertTrue(false, NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
- }
- // ===== End Normal Queue Metrics Check =====
-
- // ===== Hit and Read Counts Metrics Check =====
- // Get initial values for hit and read counts
- def initialHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_hit_counts' limit 1;"""
- logger.info("Initial total_hit_counts result: " +
initialHitCountsResult)
-
- def initialReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_read_counts' limit 1;"""
- logger.info("Initial total_read_counts result: " +
initialReadCountsResult)
-
- // Check if initial values exist and are greater than 0
- if (initialHitCountsResult.size() == 0 ||
Double.valueOf(initialHitCountsResult[0][0]) <= 0) {
- logger.info(INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
- assertTrue(false, INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
- }
- if (initialReadCountsResult.size() == 0 ||
Double.valueOf(initialReadCountsResult[0][0]) <= 0) {
- logger.info(INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
- assertTrue(false, INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
- }
-
- // Store initial values
- double initialHitCounts = Double.valueOf(initialHitCountsResult[0][0])
- double initialReadCounts =
Double.valueOf(initialReadCountsResult[0][0])
-
- // Execute the same query to trigger cache operations
- order_qt_2 """select * from
${catalog_name}.${ex_db_name}.parquet_partition_table
- where l_orderkey=1 and l_partkey=1534 limit 1;"""
-
- // Get updated values after cache operations
- def updatedHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_hit_counts' limit 1;"""
- logger.info("Updated total_hit_counts result: " +
updatedHitCountsResult)
-
- def updatedReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_read_counts' limit 1;"""
- logger.info("Updated total_read_counts result: " +
updatedReadCountsResult)
-
- // Check if updated values are greater than initial values
- double updatedHitCounts = Double.valueOf(updatedHitCountsResult[0][0])
- double updatedReadCounts =
Double.valueOf(updatedReadCountsResult[0][0])
-
- boolean hitCountsIncreased = updatedHitCounts > initialHitCounts
- boolean readCountsIncreased = updatedReadCounts > initialReadCounts
-
- logger.info("Hit and read counts comparison - hit_counts:
${initialHitCounts} -> " +
- "${updatedHitCounts} (increased: ${hitCountsIncreased}),
read_counts: ${initialReadCounts} -> " +
- "${updatedReadCounts} (increased: ${readCountsIncreased})")
-
- if (!hitCountsIncreased) {
- logger.info(TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
- assertTrue(false, TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
- }
- if (!readCountsIncreased) {
- logger.info(TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
- assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
- }
- // ===== End Hit and Read Counts Metrics Check =====
- sql """set global enable_file_cache=false"""
- return true
+ def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend
config like 'file_cache_background_monitor_interval_ms';"""
+ logger.info("file_cache_background_monitor_interval_ms configuration: " +
fileCacheBackgroundMonitorIntervalMsResult)
+ assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 || fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+ fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(), "file_cache_background_monitor_interval_ms is empty or not set")
+
+ // brpc metrics are refreshed once per file_cache_background_monitor_interval_ms, so wait one full interval
+ def totalWaitTime =
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toInteger() / 1000) as int
+ def interval = 1
+ def iterations = totalWaitTime / interval
+
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update ${elapsedSeconds}
seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // ===== Hit Ratio Metrics Check =====
+ // Check overall hit ratio hits_ratio
+ def hitsRatioResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio' limit
1;"""
+ logger.info("hits_ratio result: " + hitsRatioResult)
+
+ // Check 1-hour hit ratio hits_ratio_1h
+ def hitsRatio1hResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_1h'
limit 1;"""
+ logger.info("hits_ratio_1h result: " + hitsRatio1hResult)
+
+ // Check 5-minute hit ratio hits_ratio_5m
+ def hitsRatio5mResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_5m'
limit 1;"""
+ logger.info("hits_ratio_5m result: " + hitsRatio5mResult)
+
+ // Check if all three metrics exist and are greater than 0
+ boolean hasHitsRatio = hitsRatioResult.size() > 0 &&
Double.valueOf(hitsRatioResult[0][0]) > 0
+ boolean hasHitsRatio1h = hitsRatio1hResult.size() > 0 &&
Double.valueOf(hitsRatio1hResult[0][0]) > 0
+ boolean hasHitsRatio5m = hitsRatio5mResult.size() > 0 &&
Double.valueOf(hitsRatio5mResult[0][0]) > 0
+
+ logger.info("Hit ratio metrics check result - hits_ratio: ${hasHitsRatio},
hits_ratio_1h: ${hasHitsRatio1h}, hits_ratio_5m: ${hasHitsRatio5m}")
+
+ // Fail the test if any hit-ratio metric is missing or not greater than 0
+ if (!hasHitsRatio) {
+ logger.info(HIT_RATIO_METRIC_FALSE_MSG)
+ assertTrue(false, HIT_RATIO_METRIC_FALSE_MSG)
+ }
+ if (!hasHitsRatio1h) {
+ logger.info(HIT_RATIO_1H_METRIC_FALSE_MSG)
+ assertTrue(false, HIT_RATIO_1H_METRIC_FALSE_MSG)
+ }
+ if (!hasHitsRatio5m) {
+ logger.info(HIT_RATIO_5M_METRIC_FALSE_MSG)
+ assertTrue(false, HIT_RATIO_5M_METRIC_FALSE_MSG)
+ }
+ // ===== End Hit Ratio Metrics Check =====
+
+ // ===== Normal Queue Metrics Check =====
+ // Check normal queue current size and max size
+ def normalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " + normalQueueCurrSizeResult)
+
+ def normalQueueMaxSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
+ logger.info("normal_queue_max_size result: " + normalQueueMaxSizeResult)
+
+ // Check normal queue current elements and max elements
+ def normalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
normalQueueCurrElementsResult)
+
+ def normalQueueMaxElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
+ logger.info("normal_queue_max_elements result: " +
normalQueueMaxElementsResult)
+
+ // Check normal queue size metrics
+ boolean hasNormalQueueCurrSize = normalQueueCurrSizeResult.size() > 0 &&
+ Double.valueOf(normalQueueCurrSizeResult[0][0]) > 0
+ boolean hasNormalQueueMaxSize = normalQueueMaxSizeResult.size() > 0 &&
+ Double.valueOf(normalQueueMaxSizeResult[0][0]) > 0
+ boolean hasNormalQueueCurrElements = normalQueueCurrElementsResult.size()
> 0 &&
+ Double.valueOf(normalQueueCurrElementsResult[0][0]) > 0
+ boolean hasNormalQueueMaxElements = normalQueueMaxElementsResult.size() >
0 &&
+ Double.valueOf(normalQueueMaxElementsResult[0][0]) > 0
+
+ // Check if current size is less than max size and current elements is
less than max elements
+ boolean normalQueueSizeValid = hasNormalQueueCurrSize &&
hasNormalQueueMaxSize &&
+ Double.valueOf(normalQueueCurrSizeResult[0][0]) <
Double.valueOf(normalQueueMaxSizeResult[0][0])
+ boolean normalQueueElementsValid = hasNormalQueueCurrElements &&
hasNormalQueueMaxElements &&
+ Double.valueOf(normalQueueCurrElementsResult[0][0]) <
Double.valueOf(normalQueueMaxElementsResult[0][0])
+
+ logger.info("Normal queue metrics check result - size valid:
${normalQueueSizeValid}, " +
+ "elements valid: ${normalQueueElementsValid}")
+
+ if (!normalQueueSizeValid) {
+ logger.info(NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
+ assertTrue(false, NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
+ }
+ if (!normalQueueElementsValid) {
+ logger.info(NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
+ assertTrue(false, NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
+ }
+ // ===== End Normal Queue Metrics Check =====
+
+ // ===== Hit and Read Counts Metrics Check =====
+ // Get initial values for hit and read counts
+ def initialHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Initial total_hit_counts result: " + initialHitCountsResult)
+
+ def initialReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Initial total_read_counts result: " + initialReadCountsResult)
+
+ // Check if initial values exist and are greater than 0
+ if (initialHitCountsResult.size() == 0 ||
Double.valueOf(initialHitCountsResult[0][0]) <= 0) {
+ logger.info(INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
+ assertTrue(false, INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
+ }
+ if (initialReadCountsResult.size() == 0 ||
Double.valueOf(initialReadCountsResult[0][0]) <= 0) {
+ logger.info(INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
+ assertTrue(false, INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
+ }
+
+ // Store initial values
+ double initialHitCounts = Double.valueOf(initialHitCountsResult[0][0])
+ double initialReadCounts = Double.valueOf(initialReadCountsResult[0][0])
+
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update ${elapsedSeconds}
seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Execute the same query to trigger cache operations
+ order_qt_2 """select * from
${catalog_name}.${ex_db_name}.parquet_partition_table
+ where l_orderkey=1 and l_partkey=1534 limit 1;"""
+
+ // Get updated values after cache operations
+ def updatedHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Updated total_hit_counts result: " + updatedHitCountsResult)
+
+ def updatedReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Updated total_read_counts result: " + updatedReadCountsResult)
+
+ // Check if updated values are greater than initial values
+ double updatedHitCounts = Double.valueOf(updatedHitCountsResult[0][0])
+ double updatedReadCounts = Double.valueOf(updatedReadCountsResult[0][0])
+
+ boolean hitCountsIncreased = updatedHitCounts > initialHitCounts
+ boolean readCountsIncreased = updatedReadCounts > initialReadCounts
+
+ logger.info("Hit and read counts comparison - hit_counts:
${initialHitCounts} -> " +
+ "${updatedHitCounts} (increased: ${hitCountsIncreased}), read_counts:
${initialReadCounts} -> " +
+ "${updatedReadCounts} (increased: ${readCountsIncreased})")
+
+ if (!hitCountsIncreased) {
+ logger.info(TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
+ assertTrue(false, TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
+ }
+ if (!readCountsIncreased) {
+ logger.info(TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+ assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
}
+ // ===== End Hit and Read Counts Metrics Check =====
+ sql """set global enable_file_cache=false"""
+ return true
}
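Editor's note: the reworked suite above no longer polls with Awaitility; it waits for one BE refresh cycle (driven by file_cache_background_monitor_interval_ms) and then reads information_schema.file_cache_statistics directly. A minimal sketch of that pattern follows; the `metric` closure is an illustrative helper, not suite code.
```
// Wait one refresh cycle of the BE file cache monitor before sampling the metrics.
def intervalRow = sql """show backend config like 'file_cache_background_monitor_interval_ms';"""
def waitSeconds = (intervalRow[0][3].toInteger() / 1000) as int
Thread.sleep(waitSeconds * 1000)

// Illustrative helper: read a single metric value from the statistics table.
def metric = { String name ->
    def rows = sql """select METRIC_VALUE from information_schema.file_cache_statistics
            where METRIC_NAME = '${name}' limit 1;"""
    rows.size() > 0 ? Double.valueOf(rows[0][0]) : null
}

logger.info("hits_ratio=${metric('hits_ratio')}, total_hit_counts=${metric('total_hit_counts')}, " +
        "total_read_counts=${metric('total_read_counts')}")
```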
diff --git
a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
index 7d270ffe3b8..d1adc2b6b84 100644
---
a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
+++
b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_hive_warmup_select",
"p0,external,hive,external_docker,external_docker_hive") {
+suite("test_hive_warmup_select",
"p0,external,hive,external_docker,external_docker_hive,nonConcurrent") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable Hive test.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]