This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 63d936f32fa branch-4.0: [enhancement](spilldisk)Cancel query fast when
reserver memory failed and could not find revocable tasks #59330 (#59440)
63d936f32fa is described below
commit 63d936f32fa6adaaa0d98d222b17d17138266233
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 30 14:09:37 2025 +0800
branch-4.0: [enhancement](spilldisk)Cancel query fast when reserver memory
failed and could not find revocable tasks #59330 (#59440)
Cherry-picked from #59330
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_task.cpp | 34 +-
be/src/runtime/memory/mem_tracker_limiter.h | 3 +-
.../workload_group/workload_group_manager.cpp | 388 +++++++++------------
.../workload_management/query_task_controller.cpp | 9 +
.../workload_management/query_task_controller.h | 1 +
.../runtime/workload_management/task_controller.h | 2 +-
be/test/pipeline/pipeline_task_test.cpp | 200 ++++++++++-
be/test/pipeline/thrift_builder.h | 5 +
.../workload_group/workload_group_manager_test.cpp | 4 +-
gensrc/thrift/PaloInternalService.thrift | 2 +-
10 files changed, 405 insertions(+), 243 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index bf787f05105..dbe52cbabc0 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -643,6 +643,14 @@ Status PipelineTask::do_revoke_memory(const
std::shared_ptr<SpillContext>& spill
bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size,
OperatorBase* op) {
auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
+ // If reserve memory failed and the query is not enable spill, just
disable reserve memory(this will enable
+ // memory hard limit check, and will cancel the query if allocate memory
failed) and let it run.
+ if (!st.ok() && !_state->enable_spill()) {
+ LOG(INFO) << print_id(_query_id) << " reserve memory failed due to "
<< st
+ << ", and it is not enable spill, disable reserve memory and
let it run";
+
_state->get_query_ctx()->resource_ctx()->task_controller()->disable_reserve_memory();
+ return true;
+ }
COUNTER_UPDATE(_memory_reserve_times, 1);
auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
if (st.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
@@ -659,13 +667,30 @@ bool PipelineTask::_try_to_reserve_memory(const size_t
reserve_size, OperatorBas
op->node_id(), _state->task_id(),
PrettyPrinter::print_bytes(op->revocable_mem_size(_state)),
PrettyPrinter::print_bytes(sink_revocable_mem_size),
st.to_string());
- // PROCESS_MEMORY_EXCEEDED error msg alread contains
process_mem_log_str
+ // PROCESS_MEMORY_EXCEEDED error msg already contains
process_mem_log_str
if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
debug_msg +=
fmt::format(", debug info: {}",
GlobalMemoryArbitrator::process_mem_log_str());
}
- LOG_EVERY_N(INFO, 100) << debug_msg;
// If sink has enough revocable memory, trigger revoke memory
+ LOG(INFO) << fmt::format(
+ "Query: {} sink: {}, node id: {}, task id: "
+ "{}, revocable mem size: {}",
+ print_id(_query_id), _sink->get_name(), _sink->node_id(),
_state->task_id(),
+ PrettyPrinter::print_bytes(sink_revocable_mem_size));
+ ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+ _state->get_query_ctx()->resource_ctx()->shared_from_this(),
reserve_size, st);
+ _spilling = true;
+ return false;
+ // !!! Attention:
+ // In the past, if reserve failed, not add this query to paused list,
because it is very small, will not
+ // consume a lot of memory. But need set low memory mode to indicate
that the system should
+ // not use too much memory.
+ // But if we only set _state->get_query_ctx()->set_low_memory_mode()
here, and return true, the query will
+ // continue to run and not blocked, and this reserve maybe the last
block of join sink opertorator, and it will
+ // build hash table directly and will consume a lot of memory. So that
should return false directly.
+ // TODO: we should using a global system buffer management logic to
deal with low memory mode.
+ /**
if (sink_revocable_mem_size >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
LOG(INFO) << fmt::format(
"Query: {} sink: {}, node id: {}, task id: "
@@ -677,11 +702,8 @@ bool PipelineTask::_try_to_reserve_memory(const size_t
reserve_size, OperatorBas
_spilling = true;
return false;
} else {
- // If reserve failed, not add this query to paused list, because
it is very small, will not
- // consume a lot of memory. But need set low memory mode to
indicate that the system should
- // not use too much memory.
_state->get_query_ctx()->set_low_memory_mode();
- }
+ } */
}
return true;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index da9285e5255..fd0418f4826 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -126,7 +126,6 @@ public:
int64_t group_num() const { return _group_num; }
int64_t limit() const { return _limit; }
void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
- bool enable_check_limit() const { return _enable_check_limit; }
void set_enable_check_limit(bool enable_check_limit) {
_enable_check_limit = enable_check_limit;
}
@@ -298,7 +297,7 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes)
{
}
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
- if (bytes <= 0 || !enable_check_limit() || _limit <= 0) {
+ if (bytes <= 0 || !_enable_check_limit || _limit <= 0) {
return Status::OK();
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 5b2fd2efd6c..f671b7a6e14 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -346,14 +346,33 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
- bool has_query_exceed_process_memlimit = false;
+ // In previous loop, some query is cancelled, and now there is no query in
cancel list. Resume all paused queries.
+ if (revoking_memory_from_other_query_) {
+ for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end(); ++it) {
+ auto& queries_list = it->second;
+ for (auto query_it = queries_list.begin(); query_it !=
queries_list.end(); ++query_it) {
+ auto resource_ctx = query_it->resource_ctx_.lock();
+ // The query is finished during in paused list.
+ if (resource_ctx == nullptr) {
+ LOG(INFO) << "Query: " << query_it->query_id() << " is
nullptr, erase it.";
+ continue;
+ }
+ LOG(INFO) << "Query " <<
print_id(resource_ctx->task_controller()->task_id())
+ << " is blocked due to process memory not enough,
but already "
+ "cancelled some queries, resumt it now.";
+ resource_ctx->task_controller()->set_memory_sufficient(true);
+ }
+ }
+ revoking_memory_from_other_query_ = false;
+ }
+
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
auto query_count = queries_list.size();
const auto& wg = it->first;
if (query_count != 0) {
- LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " <<
wg->name() << ": "
+ LOG_EVERY_T(INFO, 1) << "Paused queries count of workload group "
<< wg->name() << ": "
<< query_count;
}
@@ -388,6 +407,11 @@ void WorkloadGroupMgr::handle_paused_queries() {
VLOG_DEBUG << "Query: " <<
print_id(resource_ctx->task_controller()->task_id())
<< " remove from paused list";
query_it = queries_list.erase(query_it);
+ // The query is cancelled, just break. And wait for the
query to release the memory. Other query maybe not need spill.
+ if (resource_ctx->task_controller()->is_cancelled()) {
+ revoking_memory_from_other_query_ = true;
+ return;
+ }
continue;
}
} else if (resource_ctx->task_controller()
@@ -469,6 +493,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
bool spill_res = handle_single_query_(
resource_ctx, query_it->reserve_size_,
query_it->elapsed_time(),
resource_ctx->task_controller()->paused_reason());
+
if (!spill_res) {
++query_it;
continue;
@@ -477,6 +502,11 @@ void WorkloadGroupMgr::handle_paused_queries() {
<< "Query: " <<
print_id(resource_ctx->task_controller()->task_id())
<< " remove from paused list";
query_it = queries_list.erase(query_it);
+ // The query is cancelled, just break. And wait for
the query to release the memory. Other query maybe not need spill.
+ if (resource_ctx->task_controller()->is_cancelled()) {
+ revoking_memory_from_other_query_ = true;
+ return;
+ }
continue;
}
} else {
@@ -499,105 +529,41 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
} else {
- if (revoking_memory_from_other_query_) {
- // Previously, we have revoked memory from other query,
and the cancel stage finished.
- // So, resume all queries now.
-
resource_ctx->task_controller()->set_memory_sufficient(true);
- VLOG_DEBUG << "Query " <<
print_id(resource_ctx->task_controller()->task_id())
- << " is blocked due to process memory not
enough, but already "
- "cancelled some queries, resumt it now.";
- query_it = queries_list.erase(query_it);
- continue;
- }
- has_query_exceed_process_memlimit = true;
- // If wg's memlimit not exceed, but process memory exceed, it
means cache or other metadata
- // used too much memory. Should clean all cache here.
- // Clear all cache not part of cache, because the cache thread
already try to release cache step
- // by step. And it is not useful.
- //
- // here query is paused because of PROCESS_MEMORY_EXCEEDED,
- // normally, before process memory exceeds, daemon thread
`refresh_cache_capacity` will
- // adjust the cache capacity to 0.
- // but at this time, process may not actually exceed the limit,
- // just (process memory + current query expected reserve
memory > process memory limit)
- // so the behavior at this time is the same as the process
memory limit exceed, clear all cache.
- if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
- 0.05 &&
- doris::GlobalMemoryArbitrator::
-
last_memory_exceeded_cache_capacity_adjust_weighted > 0.05) {
- doris::GlobalMemoryArbitrator::
-
last_memory_exceeded_cache_capacity_adjust_weighted = 0.04;
-
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
- LOG(INFO) << "There are some queries need process memory,
so that set cache "
- "capacity to 0 now";
- }
-
- // `cache_ratio_ < 0.05` means that the cache has been cleared
- // before the query enters the paused state.
- // but the query is still paused because of process memory
exceed,
- // so here we will try to continue to release other memory.
- //
- // need to check config::disable_memory_gc here, if not, when
config::disable_memory_gc == true,
- // cache is not adjusted, query_it->cache_ratio_ will always
be 1, and this if branch will nenver
- // execute, this query will never be resumed, and will
deadlock here.
- if (query_it->cache_ratio_ < 0.05 ||
config::disable_memory_gc) {
- // If workload group's memory usage > min memory, then it
means the workload group use too much memory
- // in memory contention state. Should just spill
- if (wg->total_mem_used() > wg->min_memory_limit()) {
- auto revocable_tasks =
-
resource_ctx->task_controller()->get_revocable_tasks();
- if (revocable_tasks.empty()) {
- Status status = Status::MemoryLimitExceeded(
- "Workload group memory usage {} > min
memory {}, but no "
- "revocable tasks",
- wg->total_mem_used(),
wg->min_memory_limit());
-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-
resource_ctx->task_controller()->task_id(), status);
+ // If workload group's memory usage > min memory, then it
means the workload group use too much memory
+ // in memory contention state. Should just spill
+ if (wg->total_mem_used() > wg->min_memory_limit()) {
+ bool spill_res = handle_single_query_(
+ resource_ctx, query_it->reserve_size_,
query_it->elapsed_time(),
+ resource_ctx->task_controller()->paused_reason());
+ if (!spill_res) {
+ ++query_it;
+ continue;
+ } else {
+ VLOG_DEBUG
+ << "Query: " <<
print_id(resource_ctx->task_controller()->task_id())
+ << " remove from paused list";
+ query_it = queries_list.erase(query_it);
+ // The query is cancelled, just break. And wait for
the query to release the memory. Other query maybe not need spill.
+ if (resource_ctx->task_controller()->is_cancelled()) {
revoking_memory_from_other_query_ = true;
- // If any query is cancelled, then skip others
because it will release many memory and
- // other query may not need release memory.
- return;
- } else {
- SCOPED_ATTACH_TASK(resource_ctx);
- auto status =
resource_ctx->task_controller()->revoke_memory();
- if (!status.ok()) {
-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-
resource_ctx->task_controller()->task_id(), status);
- revoking_memory_from_other_query_ = true;
- return;
- }
- query_it = queries_list.erase(query_it);
- continue;
}
- }
-
- // Other workload groups many use a lot of memory, should
revoke memory from other workload groups
- // by cancelling their queries.
- int64_t revoked_size = revoke_memory_from_other_groups_();
- if (revoked_size > 0) {
- // Revoke memory from other workload groups will
cancel some queries, wait them cancel finished
- // and then check it again.
- revoking_memory_from_other_query_ = true;
+ // If any query is cancelled or spilled to disk, we
need to stop and not revoke memory from other queries.
return;
}
-
- // TODO revoke from memtable
}
- // `cache_ratio_ > 0.05` means that the cache has not been
cleared
- // when the query enters the paused state.
- // `last_affected_cache_capacity_adjust_weighted < 0.05` means
that
- // the cache has been cleared at this time.
- // this means that the cache has been cleaned after the query
enters the paused state.
- // assuming that some memory has been released, wake up the
query to continue execution.
- if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
- 0.05 &&
- query_it->cache_ratio_ > 0.05) {
- LOG(INFO) << "Query: " <<
print_id(resource_ctx->task_controller()->task_id())
- << " will be resume after cache adjust.";
-
resource_ctx->task_controller()->set_memory_sufficient(true);
- query_it = queries_list.erase(query_it);
- continue;
+
+ // Other workload groups many use a lot of memory, should
revoke memory from other workload groups
+ // by cancelling their queries.
+ int64_t revoked_size = revoke_memory_from_other_groups_();
+ if (revoked_size > 0) {
+ // Revoke memory from other workload groups will cancel
some queries, wait them cancel finished
+ // and then check it again.
+ revoking_memory_from_other_query_ = true;
+ return;
}
+
+ // TODO revoke from memtable
+
++query_it;
}
}
@@ -612,21 +578,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
++it;
}
}
- // Attention: has to be here. It means, no query is at cancelling state
and all query blocked by process
- // not enough has been resumed.
- revoking_memory_from_other_query_ = false;
-
- if (!has_query_exceed_process_memlimit &&
-
doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted
< 0.05) {
- // No query paused due to process exceed limit, so that enable cache
now.
-
doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted
=
- doris::GlobalMemoryArbitrator::
-
last_periodic_refreshed_cache_capacity_adjust_weighted.load(
- std::memory_order_relaxed);
- doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
- LOG(INFO) << "No query was paused due to insufficient process memory,
so that set cache "
- "capacity to
last_periodic_refreshed_cache_capacity_adjust_weighted now";
- }
}
// Find the workload group that could revoke lot of memory:
@@ -688,6 +639,9 @@ int64_t
WorkloadGroupMgr::revoke_memory_from_other_groups_() {
// If the query could release some memory, for example, spill disk, then the
return value is true.
// If the query could not release memory, then cancel the query, the return
value is true.
// If the query is not ready to do these tasks, it means just wait, then
return value is false.
+// Return value:
+// true: the query is spilled or be cancelled. The manager should remove it
from paused queries list.
+// false: the query is not ready to do these tasks. The manager should
continue to wait.
bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<ResourceContext>& requestor,
size_t size_to_reserve, int64_t
time_in_queue,
Status paused_reason) {
@@ -705,125 +659,111 @@ bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<ResourceContex
const auto wg = requestor->workload_group();
auto revocable_tasks = requestor->task_controller()->get_revocable_tasks();
- if (revocable_tasks.empty()) {
- const auto limit = requestor->memory_context()->mem_limit();
- const auto reserved_size =
requestor->memory_context()->reserved_consumption();
- if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- // During waiting time, another operator in the query may finished
and release
- // many memory and we could run.
- if ((memory_usage + size_to_reserve) < limit) {
- LOG(INFO) << "Query: " << query_id << ", usage("
- << PrettyPrinter::print_bytes(memory_usage) << " + "
<< size_to_reserve
- << ") less than limit(" <<
PrettyPrinter::print_bytes(limit)
- << "), resume it.";
- requestor->task_controller()->set_memory_sufficient(true);
- return true;
- } else if (time_in_queue >=
config::spill_in_paused_queue_timeout_ms) {
- // if cannot find any memory to release, then let the query
continue to run as far as possible.
- // after `disable_reserve_memory`, the query will not enter
the paused state again,
- // if the memory is really insufficient, Allocator will throw
an exception
- // of query memory limit exceed and the query will be canceled,
- // or it will be canceled by memory gc when the process memory
exceeds the limit.
- auto log_str = fmt::format(
- "Query {} memory limit is exceeded, but could "
- "not find memory that could release or spill to disk,
disable reserve "
- "memory and resume it. Query memory usage: "
- "{}, limit: {}, reserved "
- "size: {}, try to reserve: {}, wg info: {}. {}",
- query_id, PrettyPrinter::print_bytes(memory_usage),
- PrettyPrinter::print_bytes(limit),
- PrettyPrinter::print_bytes(reserved_size),
- PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string(),
- doris::ProcessProfile::instance()
- ->memory_profile()
- ->process_memory_detail_str());
- LOG_LONG_STRING(INFO, log_str);
- // Disable reserve memory will enable query level memory
check, if the query
- // need a lot of memory than the memory limit, it will be
killed.
- // Do not need set memlimit = ajusted_mem_limit because
workload group refresher thread
- // will update automatically.
- requestor->task_controller()->disable_reserve_memory();
- requestor->task_controller()->set_memory_sufficient(true);
- return true;
- } else {
- return false;
- }
- } else if
(paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
- if (!wg->exceed_limit()) {
- LOG(INFO) << "Query: " << query_id
- << " paused caused by
WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
- requestor->task_controller()->set_memory_sufficient(true);
- return true;
- } else if (time_in_queue >
config::spill_in_paused_queue_timeout_ms) {
- // if cannot find any memory to release, then let the query
continue to run as far as possible
- // or cancelled by gc if memory is really not enough.
- auto log_str = fmt::format(
- "Query {} workload group memory is exceeded"
- ", and there is no cache now. And could not find task
to spill, disable "
- "reserve memory and resume it. "
- "Query memory usage: {}, limit: {}, reserved "
- "size: {}, try to reserve: {}, wg info: {}."
- " Maybe you should set the workload group's limit to a
lower value. {}",
- query_id, PrettyPrinter::print_bytes(memory_usage),
- PrettyPrinter::print_bytes(limit),
- PrettyPrinter::print_bytes(reserved_size),
- PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string(),
- doris::ProcessProfile::instance()
- ->memory_profile()
- ->process_memory_detail_str());
- LOG_LONG_STRING(INFO, log_str);
- requestor->task_controller()->disable_reserve_memory();
- requestor->task_controller()->set_memory_sufficient(true);
- return true;
- } else {
- return false;
- }
- } else {
- // Should not consider about process memory. For example, the
query's limit is 100g, workload
- // group's memlimit is 10g, process memory is 20g. The query
reserve will always failed in wg
- // limit, and process is always have memory, so that it will
resume and failed reserve again.
- const size_t test_memory_size = std::max<size_t>(size_to_reserve,
32L * 1024 * 1024);
- if
(!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(test_memory_size)) {
- LOG(INFO) << "Query: " << query_id
- << ", process limit not exceeded now, resume this
query"
- << ", process memory info: "
- <<
GlobalMemoryArbitrator::process_memory_used_details_str()
- << ", wg info: " << wg->debug_string();
- requestor->task_controller()->set_memory_sufficient(true);
- return true;
- } else if (time_in_queue >
config::spill_in_paused_queue_timeout_ms) {
- // if cannot find any memory to release, then let the query
continue to run as far as possible
- // or cancelled by gc if memory is really not enough.
- auto log_str = fmt::format(
- "Query {} process memory is exceeded"
- ", and there is no cache now. And could not find task
to spill, disable "
- "reserve memory and resume it. "
- "Query memory usage: {}, limit: {}, reserved "
- "size: {}, try to reserve: {}, wg info: {}."
- " Maybe you should set the workload group's limit to a
lower value. {}",
- query_id, PrettyPrinter::print_bytes(memory_usage),
- PrettyPrinter::print_bytes(limit),
- PrettyPrinter::print_bytes(reserved_size),
- PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string(),
- doris::ProcessProfile::instance()
- ->memory_profile()
- ->process_memory_detail_str());
- LOG_LONG_STRING(INFO, log_str);
- requestor->task_controller()->disable_reserve_memory();
- requestor->task_controller()->set_memory_sufficient(true);
- } else {
- return false;
- }
- }
- } else {
+ if (!revocable_tasks.empty()) {
SCOPED_ATTACH_TASK(requestor);
auto status = requestor->task_controller()->revoke_memory();
if (!status.ok()) {
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- requestor->task_controller()->task_id(), status);
+ requestor->task_controller()->cancel(status);
+ }
+ return true;
+ }
+ const auto limit = requestor->memory_context()->mem_limit();
+ const auto reserved_size =
requestor->memory_context()->reserved_consumption();
+ if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+ // During waiting time, another operator in the query may finished and
release
+ // many memory and we could run.
+ if ((memory_usage + size_to_reserve) < limit) {
+ LOG(INFO) << "Query: " << query_id << ", usage("
+ << PrettyPrinter::print_bytes(memory_usage) << " + " <<
size_to_reserve
+ << ") less than limit(" <<
PrettyPrinter::print_bytes(limit)
+ << "), resume it.";
+ requestor->task_controller()->set_memory_sufficient(true);
+ return true;
+ } else {
+ // if cannot find any memory to release, then let the query
continue to run as far as possible.
+ // after `disable_reserve_memory`, the query will not enter the
paused state again,
+ // if the memory is really insufficient, Allocator will throw an
exception
+ // of query memory limit exceed and the query will be canceled,
+ // or it will be canceled by memory gc when the process memory
exceeds the limit.
+ auto log_str = fmt::format(
+ "Query {} memory limit is exceeded, but could "
+ "not find memory that could release or spill to disk,
disable reserve "
+ "memory and resume it. Query memory usage: "
+ "{}, limit: {}, reserved "
+ "size: {}, try to reserve: {}, wg info: {}. {}",
+ query_id, PrettyPrinter::print_bytes(memory_usage),
+ PrettyPrinter::print_bytes(limit),
PrettyPrinter::print_bytes(reserved_size),
+ PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string(),
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+ ->process_memory_detail_str());
+ LOG_LONG_STRING(INFO, log_str);
+ // Disable reserve memory will enable query level memory check, if
the query
+ // need a lot of memory than the memory limit, it will be killed.
+ // Do not need set memlimit = ajusted_mem_limit because workload
group refresher thread
+ // will update automatically.
+ requestor->task_controller()->disable_reserve_memory();
+ requestor->task_controller()->set_memory_sufficient(true);
+ return true;
+ }
+ } else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+ if (!wg->exceed_limit()) {
+ LOG(INFO) << "Query: " << query_id
+ << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED,
now resume it.";
+ requestor->task_controller()->set_memory_sufficient(true);
+ return true;
+ } else {
+ Status error_status = Status::MemoryLimitExceeded(
+ "Query {} workload group memory is exceeded"
+ ", and there is no cache now. And could not find task to
spill, "
+ "try to cancel query. "
+ "Query memory usage: {}, limit: {}, reserved "
+ "size: {}, try to reserve: {}, wg info: {}."
+ " Maybe you should set the workload group's limit to a
lower value. {}",
+ query_id, PrettyPrinter::print_bytes(memory_usage),
+ PrettyPrinter::print_bytes(limit),
PrettyPrinter::print_bytes(reserved_size),
+ PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string(),
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+ ->process_memory_detail_str());
+ LOG_LONG_STRING(INFO, error_status.to_string());
+ requestor->task_controller()->cancel(error_status);
+ return true;
+ }
+ } else {
+ // Should not consider about process memory. For example, the query's
limit is 100g, workload
+ // group's memlimit is 10g, process memory is 20g. The query reserve
will always failed in wg
+ // limit, and process is always have memory, so that it will resume
and failed reserve again.
+ const size_t test_memory_size = std::max<size_t>(size_to_reserve, 32L
* 1024 * 1024);
+ if
(!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(test_memory_size)) {
+ LOG(INFO) << "Query: " << query_id
+ << ", process limit not exceeded now, resume this query"
+ << ", process memory info: "
+ <<
GlobalMemoryArbitrator::process_memory_used_details_str()
+ << ", wg info: " << wg->debug_string();
+ requestor->task_controller()->set_memory_sufficient(true);
+ return true;
+ } else {
+ // if cannot find any memory to release, then let the query
continue to run as far as possible
+ // or cancelled by gc if memory is really not enough.
+ Status error_status = Status::MemoryLimitExceeded(
+ "Query {} process memory is exceeded"
+ ", and there is no cache now. And could not find task to
spill, disable "
+ "reserve memory and resume it. "
+ "Query memory usage: {}, limit: {}, reserved "
+ "size: {}, try to reserve: {}, wg info: {}."
+ " Maybe you should set the workload group's limit to a
lower value. {}",
+ query_id, PrettyPrinter::print_bytes(memory_usage),
+ PrettyPrinter::print_bytes(limit),
PrettyPrinter::print_bytes(reserved_size),
+ PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string(),
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+ ->process_memory_detail_str());
+ LOG_LONG_STRING(INFO, error_status.to_string());
+ requestor->task_controller()->cancel(error_status);
+ return true;
}
}
- return true;
}
void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool
enable_hard_limit) {
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp
b/be/src/runtime/workload_management/query_task_controller.cpp
index 43ef520b794..02385c2e4af 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -120,6 +120,15 @@ size_t QueryTaskController::get_revocable_size() {
return revocable_size;
}
+void QueryTaskController::disable_reserve_memory() {
+ TaskController::disable_reserve_memory();
+ auto query_ctx = query_ctx_.lock();
+ if (query_ctx == nullptr) {
+ return;
+ }
+ query_ctx->query_mem_tracker()->set_enable_check_limit(true);
+}
+
Status QueryTaskController::revoke_memory() {
auto query_ctx = query_ctx_.lock();
if (query_ctx == nullptr) {
diff --git a/be/src/runtime/workload_management/query_task_controller.h
b/be/src/runtime/workload_management/query_task_controller.h
index 64681177f92..c217572138e 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -37,6 +37,7 @@ public:
bool cancel_impl(const Status& reason) override { return
cancel_impl(reason, -1); }
bool is_pure_load_task() const override;
int32_t get_slot_count() const override;
+ void disable_reserve_memory() override;
bool is_enable_reserve_memory() const override;
void set_memory_sufficient(bool sufficient) override;
int64_t memory_sufficient_time() override;
diff --git a/be/src/runtime/workload_management/task_controller.h
b/be/src/runtime/workload_management/task_controller.h
index de3a3db6fef..c1d9fad4587 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -96,7 +96,7 @@ public:
virtual bool is_pure_load_task() const { return false; }
void set_low_memory_mode(bool low_memory_mode) { low_memory_mode_ =
low_memory_mode; }
bool low_memory_mode() { return low_memory_mode_; }
- void disable_reserve_memory() { enable_reserve_memory_ = false; }
+ virtual void disable_reserve_memory() { enable_reserve_memory_ = false; }
virtual bool is_enable_reserve_memory() const { return
enable_reserve_memory_; }
virtual void set_memory_sufficient(bool sufficient) {};
virtual int64_t memory_sufficient_time() { return 0; };
diff --git a/be/test/pipeline/pipeline_task_test.cpp
b/be/test/pipeline/pipeline_task_test.cpp
index 987faaa13ec..8a8e16e3c4d 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -750,9 +750,11 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
-
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
-
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
- EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+ // Not check low memory mode here, because we temporary not use this
feature, the
+ // system buffer should be checked globally.
+ //
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+ //
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+ //
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
@@ -767,9 +769,9 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
-
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
-
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
- EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+ //
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+ //
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+ //
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
@@ -780,6 +782,9 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
}
}
+// Test for reserve memory fail for non-spillable task. It will not affect
anything, the query
+// will continue to run. And will disable reserve memory, so that the query
will failed when allocated
+// memory > limit.
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
{
_query_options = TQueryOptionsBuilder()
@@ -787,6 +792,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.set_enable_reserve_memory(true)
+ .set_enable_spill(false)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
@@ -882,8 +888,184 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
}
+ {
+ // Reserve failed but spill to disk is not enabled, so the query will continue to run.
+ read_dep->set_ready();
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+ EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ EXPECT_FALSE(task->_spilling);
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+ EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+ EXPECT_FALSE(task->_eos);
+ // Spill to disk is not enabled, so the task will not be paused.
+ EXPECT_FALSE(task->_spilling);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ EXPECT_TRUE(source_finish_dep->ready());
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
+ EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
+ EXPECT_FALSE(
+
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
+ }
+ {
+ // Reserve failed.
+ task->_operators.front()->cast<DummyOperator>()._disable_reserve_mem =
true;
+ task->_spilling = false;
+ task->_operators.front()->cast<DummyOperator>()._eos = true;
+
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused
= false;
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+ EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(task->_spilling);
+ EXPECT_TRUE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ EXPECT_TRUE(source_finish_dep->ready());
+ EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ EXPECT_FALSE(
+
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
+ }
{
// Reserve failed and paused.
+
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused
= false;
+ task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+ EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(task->_spilling);
+ EXPECT_TRUE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ EXPECT_TRUE(source_finish_dep->ready());
+
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
+ EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ EXPECT_FALSE(
+
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
+ }
+ delete ExecEnv::GetInstance()->_workload_group_manager;
+}
+
+// Test reserve memory failure for a spillable pipeline task
+TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL_SPILLABLE) {
+ {
+ _query_options = TQueryOptionsBuilder()
+ .set_enable_local_exchange(true)
+ .set_enable_local_shuffle(true)
+ .set_runtime_filter_max_in_num(15)
+ .set_enable_reserve_memory(true)
+ .set_enable_spill(true)
+ .build();
+ auto fe_address = TNetworkAddress();
+ fe_address.hostname = LOCALHOST;
+ fe_address.port = DUMMY_PORT;
+ _query_ctx =
+ QueryContext::create(_query_id, ExecEnv::GetInstance(),
_query_options, fe_address,
+ true, fe_address,
QuerySource::INTERNAL_FRONTEND);
+ _task_scheduler = std::make_unique<MockTaskScheduler>();
+ _query_ctx->_task_scheduler = _task_scheduler.get();
+ _build_fragment_context();
+
+ TWorkloadGroupInfo twg_info;
+ twg_info.__set_id(0);
+ twg_info.__set_name("_dummpy_workload_group");
+ twg_info.__set_version(0);
+
+ WorkloadGroupInfo workload_group_info =
WorkloadGroupInfo::parse_topic_info(twg_info);
+
+ ((MockRuntimeState*)_runtime_state.get())->_workload_group =
+ std::make_shared<WorkloadGroup>(workload_group_info);
+
((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
+ ->_test_low_memory = true;
+
+ ExecEnv::GetInstance()->_workload_group_manager = new
MockWorkloadGroupMgr();
+ EXPECT_TRUE(_runtime_state->enable_spill());
+ }
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ Dependency* read_dep;
+ Dependency* write_dep;
+ Dependency* source_finish_dep;
+ {
+ OperatorPtr source_op;
+ // 1. create and set the source operator of
multi_cast_data_stream_source for new pipeline
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ sink_op->_spillable = true;
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+ }
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
+ profile.get(),
shared_state_map, task_id);
+ {
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ EXPECT_GT(task->_execution_dependencies.size(), 1);
+ read_dep =
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+ .value()
+ ->dependencies()
+ .front();
+ write_dep =
_runtime_state->get_sink_local_state()->dependencies().front();
+ }
+ {
+ _query_ctx->get_execution_dependency()->set_ready();
+ // Task is blocked by read dependency.
+ read_dep->block();
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_FALSE(task->_eos);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ EXPECT_FALSE(task->_read_dependencies.empty());
+ EXPECT_FALSE(task->_write_dependencies.empty());
+ EXPECT_FALSE(task->_finish_dependencies.empty());
+ EXPECT_TRUE(task->_opened);
+ EXPECT_FALSE(read_dep->ready());
+ EXPECT_TRUE(write_dep->ready());
+ EXPECT_FALSE(read_dep->_blocked_task.empty());
+ source_finish_dep =
+
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+ .value()
+ ->finishdependency();
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+ }
+ {
+ task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
+ vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
+ task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
+ vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
+ }
+ {
+ // Reserve failed and spill to disk is enabled, so the query will be paused.
read_dep->set_ready();
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
@@ -896,9 +1078,11 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_FALSE(task->_eos);
+ // Spill to disk is enabled, so the task enters the spilling state (see assertion below).
EXPECT_TRUE(task->_spilling);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
+
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_TRUE(
@@ -921,14 +1105,15 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
+
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_TRUE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
- // Reserve failed and paused.
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused
= false;
+ // Disable reserve memory so that get_reserve_mem_size == 0, so reserve will always succeed.
task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
@@ -940,6 +1125,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
+
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
diff --git a/be/test/pipeline/thrift_builder.h
b/be/test/pipeline/thrift_builder.h
index 1f792553e66..cf03d609f0c 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -96,6 +96,11 @@ public:
return *this;
}
+ TQueryOptionsBuilder& set_enable_spill(int64_t enable_spill) {
+ _query_options.__set_enable_spill(enable_spill);
+ return *this;
+ }
+
TQueryOptions& build() { return _query_options; }
TQueryOptionsBuilder(const TQueryOptionsBuilder&) = delete;
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index d3c589552f2..f81488928c9 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -231,8 +231,8 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
query_context->query_mem_tracker()->consume(-1024L * 1024 * 4);
- // Query was not cancelled, because the query's limit is bigger than the
wg's limit and the wg's policy is NONE.
- ASSERT_FALSE(query_context->is_cancelled());
+ // In the wg's policy is NONE. If the query reserve memory failed and
revocable memory == 0, just cancel it.
+ ASSERT_TRUE(query_context->is_cancelled());
// Its limit == workload group's limit
ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(),
wg->memory_limit());
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index aabebaa0b1b..65ae7e876b4 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -146,7 +146,7 @@ struct TQueryOptions {
// if set, this will overwrite the BE config.
30: optional i32 max_pushdown_conditions_per_column
// whether enable spilling to disk
- 31: optional bool enable_spilling = false;
+ // 31: optional bool enable_spilling = false;
// whether enable parallel merge in exchange node
32: optional bool enable_enable_exchange_node_parallel_merge = false; //
deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]