This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7229aeac549 [refactor](wg&memtracker) using weak ptr to delete
memtracker and query context automatically (#41549)
7229aeac549 is described below
commit 7229aeac5493601b09f28ea15e1a898ca59565b6
Author: yiguolei <[email protected]>
AuthorDate: Wed Oct 9 23:37:05 2024 +0800
[refactor](wg&memtracker) using weak ptr to delete memtracker and query
context automatically (#41549)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/common/daemon.cpp | 1 +
be/src/runtime/load_channel.cpp | 7 --
be/src/runtime/load_channel.h | 1 -
be/src/runtime/memory/mem_tracker_limiter.h | 3 -
be/src/runtime/query_context.cpp | 2 -
be/src/runtime/workload_group/workload_group.cpp | 84 +++++++++++++---------
be/src/runtime/workload_group/workload_group.h | 15 +---
.../workload_group/workload_group_manager.cpp | 7 ++
.../workload_group/workload_group_manager.h | 2 +
9 files changed, 63 insertions(+), 59 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5da49758865..27fbfb71d7f 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() {
// TODO replace memory_gc_thread.
// step 6. Refresh weighted memory ratio of workload groups.
+ doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
// step 7. Analyze blocking queries.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index f8c11639719..1ac7753b197 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t
timeout_s, bool is_hig
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
- _need_release_memtracker = true;
}
}
}
@@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() {
rows_str << ", index id: " << entry.first << ", total_received_rows: "
<< entry.second.first
<< ", num_rows_filtered: " << entry.second.second;
}
- if (_need_release_memtracker) {
- WorkloadGroupPtr wg_ptr =
_query_thread_context.get_workload_group_ptr();
- if (wg_ptr) {
-
wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
- }
- }
LOG(INFO) << "load channel removed"
<< " load_id=" << _load_id << ", is high priority=" <<
_is_high_priority
<< ", sender_ip=" << _sender_ip << rows_str.str();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 6fad8c536ec..6c150ed74d9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -127,7 +127,6 @@ private:
int64_t _backend_id;
bool _enable_profile;
- bool _need_release_memtracker = false;
};
inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index faf354cca4c..251a7c25a74 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -123,9 +123,6 @@ public:
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) {
_is_query_cancelled.store(is_cancelled); }
- // Iterator into mem_tracker_limiter_pool for this object. Stored to have
O(1) remove.
- std::list<std::weak_ptr<MemTrackerLimiter>>::iterator
wg_tracker_limiter_group_it;
-
/*
* Part 3, Memory tracking method (use carefully!)
*
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 8931854897e..c602dc683fe 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -159,8 +159,6 @@ QueryContext::~QueryContext() {
uint64_t group_id = 0;
if (_workload_group) {
group_id = _workload_group->id(); // before remove
- _workload_group->remove_mem_tracker_limiter(query_mem_tracker);
- _workload_group->remove_query(_query_id);
}
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 6f3b51f09fd..0488e9ec83c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const
WorkloadGroupInfo& tg_info) {
}
}
+// MemTrackerLimiters are not removed when a query context is released, so
expired entries are erased from the pool here.
int64_t WorkloadGroup::make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
int64_t used_memory = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
- for (const auto& trackerWptr : mem_tracker_group.trackers) {
- auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- if (tracker_snapshots != nullptr) {
- tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+ for (auto trackerWptr = mem_tracker_group.trackers.begin();
+ trackerWptr != mem_tracker_group.trackers.end();) {
+ auto tracker = trackerWptr->lock();
+ if (tracker == nullptr) {
+ trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+ } else {
+ if (tracker_snapshots != nullptr) {
+ tracker_snapshots->insert(tracker_snapshots->end(),
tracker);
+ }
+ used_memory += tracker->consumption();
+ ++trackerWptr;
}
- used_memory += tracker->consumption();
}
}
- refresh_memory(used_memory);
+ // refresh total memory used.
+ _total_mem_used = used_memory;
+ // reserve memory is recorded in the query mem tracker
+ // and _total_mem_used already contains all the current reserve memory.
+ // so after refreshing _total_mem_used, reset
_wg_refresh_interval_memory_growth.
+ _wg_refresh_interval_memory_growth.store(0.0);
_mem_used_status->set_value(used_memory);
return used_memory;
}
@@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}
-void WorkloadGroup::refresh_memory(int64_t used_memory) {
- // refresh total memory used.
- _total_mem_used = used_memory;
- // reserve memory is recorded in the query mem tracker
- // and _total_mem_used already contains all the current reserve memory.
- // so after refreshing _total_mem_used, reset
_wg_refresh_interval_memory_growth.
- _wg_refresh_interval_memory_growth.store(0.0);
-}
+void WorkloadGroup::do_sweep() {
+ // Erase expired weak references to mem tracker limiters registered during a query or load.
+ for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
+ std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+ for (auto trackerWptr = mem_tracker_group.trackers.begin();
+ trackerWptr != mem_tracker_group.trackers.end();) {
+ auto tracker = trackerWptr->lock();
+ if (tracker == nullptr) {
+ trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+ } else {
+ ++trackerWptr;
+ }
+ }
+ }
-void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr) {
+ // Erase expired weak references to query contexts registered during the query context ctor.
std::unique_lock<std::shared_mutex> wlock(_mutex);
- auto group_num = mem_tracker_ptr->group_num();
- std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
- mem_tracker_ptr->wg_tracker_limiter_group_it =
- _mem_tracker_limiter_pool[group_num].trackers.insert(
- _mem_tracker_limiter_pool[group_num].trackers.end(),
mem_tracker_ptr);
+ for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) {
+ if (iter->second.lock() == nullptr) {
+ iter = _query_ctxs.erase(iter);
+ } else {
+ iter++;
+ }
+ }
}
-void
WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr) {
+void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
- if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
- _mem_tracker_limiter_pool[group_num].trackers.end()) {
- _mem_tracker_limiter_pool[group_num].trackers.erase(
- mem_tracker_ptr->wg_tracker_limiter_group_it);
- mem_tracker_ptr->wg_tracker_limiter_group_it =
- _mem_tracker_limiter_pool[group_num].trackers.end();
- }
+ _mem_tracker_limiter_pool[group_num].trackers.insert(
+ _mem_tracker_limiter_pool[group_num].trackers.end(),
mem_tracker_ptr);
}
int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile*
profile, bool is_minor_gc) {
@@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem,
RuntimeProfile* profile,
auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
const std::string& label) {
return fmt::format(
- "{} cancel top memory overcommit tracker <{}> consumption {}.
details:{}, Execute "
+ "{} cancel top memory overcommit tracker <{}> consumption {}.
details:{}, "
+ "Execute "
"again after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
};
auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const
std::string& label) {
return fmt::format(
- "{} cancel top memory used tracker <{}> consumption {}.
details:{}, Execute again "
+ "{} cancel top memory used tracker <{}> consumption {}.
details:{}, Execute "
+ "again "
"after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
@@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem,
RuntimeProfile* profile,
_id, _name, _memory_limit, used_memory, need_free_mem);
Defer defer {[&]() {
LOG(INFO) << fmt::format(
- "[MemoryGC] work load group finished gc, id:{} name:{}, memory
limit: {}, used: "
+ "[MemoryGC] work load group finished gc, id:{} name:{}, memory
limit: {}, "
+ "used: "
"{}, need_free_mem: {}, freed memory: {}.",
_id, _name, _memory_limit, used_memory, need_free_mem,
freed_mem);
}};
@@ -542,7 +559,8 @@ void
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
- LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is illegal: "
+ LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is "
+ "illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;
}
} else {
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 2fbb4dd3030..933c5afdb4e 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -89,7 +89,8 @@ public:
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
// call make_memory_tracker_snapshots, so also refresh total memory used.
int64_t memory_used();
- void refresh_memory(int64_t used_memory);
+
+ void do_sweep();
int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
@@ -132,8 +133,6 @@ public:
void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
- void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
-
// when mem_limit <=0 , it's an invalid value, then current group not
participating in memory GC
// because mem_limit is not a required property
bool is_mem_limit_valid() {
@@ -154,11 +153,6 @@ public:
return Status::OK();
}
- void remove_query(TUniqueId query_id) {
- std::unique_lock<std::shared_mutex> wlock(_mutex);
- _query_ctxs.erase(query_id);
- }
-
void shutdown() {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_is_shutdown = true;
@@ -169,11 +163,6 @@ public:
return _is_shutdown && _query_ctxs.empty();
}
- int query_num() {
- std::shared_lock<std::shared_mutex> r_lock(_mutex);
- return _query_ctxs.size();
- }
-
int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool
is_minor_gc);
void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 65a8e3685c8..003f07f1db0 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -136,6 +136,13 @@ void
WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
<< ", before wg size=" << old_wg_size << ", after wg size=" <<
new_wg_size;
}
+void WorkloadGroupMgr::do_sweep() {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (auto& [wg_id, wg] : _workload_groups) {
+ wg->do_sweep();
+ }
+}
+
struct WorkloadGroupMemInfo {
int64_t total_mem_used = 0;
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e..f76e98d2606 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -50,6 +50,8 @@ public:
WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
+ void do_sweep();
+
void stop();
std::atomic<bool> _enable_cpu_hard_limit = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]