This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 41f9ee2f9e mem_tracker_factor_v2 (#10743) 41f9ee2f9e is described below commit 41f9ee2f9e07864bfa5d2a1512db336c83802496 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Jul 12 18:09:41 2022 +0800 mem_tracker_factor_v2 (#10743) --- be/src/runtime/memory/mem_tracker_base.cpp | 53 ++++ be/src/runtime/memory/mem_tracker_base.h | 78 ++++++ be/src/runtime/memory/mem_tracker_limiter.cpp | 333 +++++++++++++++++++++++ be/src/runtime/memory/mem_tracker_limiter.h | 348 ++++++++++++++++++++++++ be/src/runtime/memory/mem_tracker_observe.cpp | 87 ++++++ be/src/runtime/memory/mem_tracker_observe.h | 91 +++++++ be/src/runtime/memory/mem_tracker_task_pool.cpp | 153 +++++++++++ be/src/runtime/memory/mem_tracker_task_pool.h | 58 ++++ 8 files changed, 1201 insertions(+) diff --git a/be/src/runtime/memory/mem_tracker_base.cpp b/be/src/runtime/memory/mem_tracker_base.cpp new file mode 100644 index 0000000000..bb407e2bf8 --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_base.cpp @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.cpp +// and modified by Doris + +#include "runtime/memory/mem_tracker_base.h" + +#include "util/time.h" + +namespace doris { + +const std::string MemTrackerBase::COUNTER_NAME = "PeakMemoryUsage"; + +MemTrackerBase::MemTrackerBase(const std::string& label, MemTrackerLimiter* parent, + RuntimeProfile* profile) + : _label(label), + // Not 100% sure the id is unique. This is generated because it is faster than converting to int after hash. + _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()), + _parent(parent) { + if (profile == nullptr) { + _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES); + } else { + // By default, memory consumption is tracked via calls to consume()/release(), either to + // the tracker itself or to one of its descendents. Alternatively, a consumption metric + // can be specified, and then the metric's value is used as the consumption rather than + // the tally maintained by consume() and release(). A tcmalloc metric is used to track + // process memory consumption, since the process memory usage may be higher than the + // computed total memory (tcmalloc does not release deallocated memory immediately). + // Other consumption metrics are used in trackers below the process level to account + // for memory (such as free buffer pool buffers) that is not tracked by consume() and + // release(). 
+ _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); + } +} + +MemTrackerBase::MemTrackerBase(const std::string& label) + : MemTrackerBase(label, nullptr, nullptr) {} +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_base.h b/be/src/runtime/memory/mem_tracker_base.h new file mode 100644 index 0000000000..10d554839b --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_base.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h +// and modified by Doris + +#pragma once + +#include "util/runtime_profile.h" + +namespace doris { + +class MemTrackerLimiter; + +// A MemTracker tracks memory consumption. +// This class is thread-safe. +class MemTrackerBase { +public: + const std::string& label() const { return _label; } + + // Returns the memory consumed in bytes. 
+ int64_t consumption() const { return _consumption->current_value(); } + int64_t peak_consumption() const { return _consumption->value(); } + + MemTrackerBase(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile); + + // this is used for creating an orphan mem tracker, or for unit test. + // If a mem tracker has parent, it should be created by `create_tracker()` + MemTrackerBase(const std::string& label = std::string()); + + MemTrackerLimiter* parent() const { return _parent; } + int64_t id() { return _id; } + bool is_limited() { return _is_limited; } // true if this is a MemTrackerLimiter + bool is_observed() { return _is_observed; } // true if this is a MemTrackerObserve + void set_is_limited() { _is_limited = true; } + void set_is_observed() { _is_observed = true; } + + // Usually, a negative value means that the statistics are not accurate, + // 1. The released memory is not consumed. + // 2. The same block of memory, tracker A calls consume, and tracker B calls release. + // 3. Repeated releases of MemTracker. When the consume is called on the child MemTracker, + // after the release is called on the parent MemTracker, + // the child ~MemTracker will cause repeated releases. + void memory_leak_check() { DCHECK_EQ(_consumption->current_value(), 0); } + + static const std::string COUNTER_NAME; + +protected: + // label used in the usage string (log_usage()) + std::string _label; + + // Automatically generated from the creation time and the label length. + // Intended to be unique for each mem tracker, but uniqueness is not guaranteed. + int64_t _id; + + std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in bytes + + bool _is_limited = false; // is MemTrackerLimiter + + bool _is_observed = false; // is MemTrackerObserve + + MemTrackerLimiter* _parent; // The parent of this tracker.
+}; + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp new file mode 100644 index 0000000000..6193558ab0 --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/memory/mem_tracker_limiter.h" + +#include <fmt/format.h> + +#include "gutil/once.h" +#include "runtime/memory/mem_tracker_observe.h" +#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/pretty_printer.h" +#include "util/string_util.h" + +namespace doris { + +// The ancestor for all trackers. Every tracker is visible from the process down. +// All manually created trackers should specify the process tracker as the parent. +static MemTrackerLimiter* process_tracker; +static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT; + +MemTrackerLimiter* MemTrackerLimiter::create_tracker(int64_t byte_limit, const std::string& label, + MemTrackerLimiter* parent, + RuntimeProfile* profile) { + // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock when log_usage is called. 
+ STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); + if (!parent) { + parent = MemTrackerLimiter::get_process_tracker(); + } + MemTrackerLimiter* tracker(new MemTrackerLimiter("[Limit]-" + label, parent, profile)); + // Finish initializing the tracker (limit and ancestor lists) before publishing it in the + // parent's child list, so that a thread walking the tree never sees a half-built tracker. + tracker->set_is_limited(); + tracker->init(byte_limit); + parent->add_child_tracker(tracker); + return tracker; +} + +// Stores the limit and records this tracker plus all of its ancestors (walking the _parent chain) +// in _ancestor_all_trackers; those that have a limit are also added to _ancestor_limiter_trackers. +void MemTrackerLimiter::init(int64_t limit) { + DCHECK_GE(limit, -1); + _limit = limit; + MemTrackerLimiter* tracker = this; + while (tracker != nullptr) { + _ancestor_all_trackers.push_back(tracker); + if (tracker->has_limit()) _ancestor_limiter_trackers.push_back(tracker); + tracker = tracker->_parent; + } + DCHECK_GT(_ancestor_all_trackers.size(), 0); + DCHECK_EQ(_ancestor_all_trackers[0], this); +} + +MemTrackerLimiter::~MemTrackerLimiter() { + // TCMalloc hook will be triggered during destructor memtracker, may cause crash. + if (_label == "Process") doris::thread_local_ctx._init = false; + flush_untracked_mem(); + if (parent()) { + // Do not call release on the parent tracker to avoid repeated releases. + // Ensure that all consume/release are triggered by TCMalloc new/delete hook. + std::lock_guard<SpinLock> l(_parent->_child_trackers_lock); + if (_child_tracker_it != _parent->_child_limiter_trackers.end()) { + _parent->_child_limiter_trackers.erase(_child_tracker_it); + _child_tracker_it = _parent->_child_limiter_trackers.end(); + } + } + // The child observe tracker life cycle is controlled by its parent limiter tracker.
+ for (auto tracker : _child_observe_trackers) { + delete tracker; + } + DCHECK_EQ(_untracked_mem, 0); +} + +void MemTrackerLimiter::add_child_tracker(MemTrackerLimiter* tracker) { + std::lock_guard<SpinLock> l(_child_trackers_lock); + tracker->_child_tracker_it = + _child_limiter_trackers.insert(_child_limiter_trackers.end(), tracker); +} + +void MemTrackerLimiter::add_child_tracker(MemTrackerObserve* tracker) { + std::lock_guard<SpinLock> l(_child_trackers_lock); + tracker->_child_tracker_it = + _child_observe_trackers.insert(_child_observe_trackers.end(), tracker); +} + +void MemTrackerLimiter::remove_child_tracker(MemTrackerLimiter* tracker) { + std::lock_guard<SpinLock> l(_child_trackers_lock); + if (tracker->_child_tracker_it != _child_limiter_trackers.end()) { + _child_limiter_trackers.erase(tracker->_child_tracker_it); + tracker->_child_tracker_it = _child_limiter_trackers.end(); + } +} + +void MemTrackerLimiter::remove_child_tracker(MemTrackerObserve* tracker) { + std::lock_guard<SpinLock> l(_child_trackers_lock); + if (tracker->_child_tracker_it != _child_observe_trackers.end()) { + _child_observe_trackers.erase(tracker->_child_tracker_it); + tracker->_child_tracker_it = _child_observe_trackers.end(); + } +} + +void MemTrackerLimiter::create_process_tracker() { + process_tracker = new MemTrackerLimiter("Process", nullptr, nullptr); + process_tracker->init(-1); +} + +MemTrackerLimiter* MemTrackerLimiter::get_process_tracker() { + GoogleOnceInit(&process_tracker_once, &MemTrackerLimiter::create_process_tracker); + return process_tracker; +} + +void MemTrackerLimiter::list_process_trackers(std::vector<MemTrackerBase*>* trackers) { + trackers->clear(); + std::deque<MemTrackerLimiter*> to_process; + to_process.push_front(get_process_tracker()); + while (!to_process.empty()) { + MemTrackerLimiter* t = to_process.back(); + to_process.pop_back(); + + trackers->push_back(t); + std::list<MemTrackerLimiter*> limiter_children; + std::list<MemTrackerObserve*>
observe_children; + { + std::lock_guard<SpinLock> l(t->_child_trackers_lock); + limiter_children = t->_child_limiter_trackers; + observe_children = t->_child_observe_trackers; + } + for (const auto& child : limiter_children) { + to_process.emplace_back(std::move(child)); + } + if (config::show_observe_tracker) { + for (const auto& child : observe_children) { + trackers->push_back(child); + } + } + } +} + +// Returns the closest tracker that appears in the ancestor chains of both 'this' and 'dst'. +MemTrackerLimiter* MemTrackerLimiter::common_ancestor(MemTrackerLimiter* dst) { + // Compare object identity rather than id(): generated ids are not guaranteed to be unique. + if (this == dst) return dst; + DCHECK_EQ(_ancestor_all_trackers.back(), dst->_ancestor_all_trackers.back()) + << "Must have same ancestor"; + int ancestor_idx = _ancestor_all_trackers.size() - 1; + int dst_ancestor_idx = dst->_ancestor_all_trackers.size() - 1; + while (ancestor_idx > 0 && dst_ancestor_idx > 0 && + _ancestor_all_trackers[ancestor_idx - 1] == + dst->_ancestor_all_trackers[dst_ancestor_idx - 1]) { + --ancestor_idx; + --dst_ancestor_idx; + } + return _ancestor_all_trackers[ancestor_idx]; +} + +MemTrackerLimiter* MemTrackerLimiter::limit_exceeded_tracker() const { + for (const auto& tracker : _ancestor_limiter_trackers) { + if (tracker->limit_exceeded()) { + return tracker; + } + } + return nullptr; +} + +int64_t MemTrackerLimiter::spare_capacity() const { + int64_t result = std::numeric_limits<int64_t>::max(); + for (const auto& tracker : _ancestor_limiter_trackers) { + int64_t mem_left = tracker->limit() - tracker->consumption(); + result = std::min(result, mem_left); + } + return result; +} + +int64_t MemTrackerLimiter::get_lowest_limit() const { + if (_ancestor_limiter_trackers.empty()) return -1; + int64_t min_limit = std::numeric_limits<int64_t>::max(); + for (const auto& tracker : _ancestor_limiter_trackers) { + DCHECK(tracker->has_limit()); + min_limit = std::min(min_limit, tracker->limit()); + } + return min_limit; +} + +bool MemTrackerLimiter::gc_memory(int64_t max_consumption) { + if (max_consumption < 0) return true; + std::lock_guard<std::mutex>
l(_gc_lock); + int64_t pre_gc_consumption = consumption(); + // Check if someone gc'd before us + if (pre_gc_consumption < max_consumption) return false; + + int64_t curr_consumption = pre_gc_consumption; + // Free some extra memory to avoid frequent GC. The value is empirical and may be tuned later. + // NOTE(review): the original comment said "4M", but the expression below evaluates to + // 4 GiB (4 * 1024^3 bytes) -- confirm which value is intended. + const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; + // Try to free up some memory + for (int i = 0; i < _gc_functions.size(); ++i) { + // Try to free up the amount we are over plus some extra so that we don't have to + // immediately GC again. Don't free all the memory since that can be unnecessarily + // expensive. + int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE; + _gc_functions[i](bytes_to_free); + curr_consumption = consumption(); + // NOTE(review): this condition is also true while curr_consumption is still above + // max_consumption (the difference is then negative), so the loop stops after the first + // GC function in that case -- confirm the intended comparison direction. + if (max_consumption - curr_consumption <= EXTRA_BYTES_TO_FREE) break; + } + + // true means max_consumption is still exceeded after GC. + return curr_consumption > max_consumption; +} + +// Runs gc_memory() so that 'bytes' more could fit under _limit; returns MemoryLimitExceeded +// if gc_memory() reports that the target is still exceeded, otherwise OK. +Status MemTrackerLimiter::try_gc_memory(int64_t bytes) { + if (UNLIKELY(gc_memory(_limit - bytes))) { + return Status::MemoryLimitExceeded( + fmt::format("label={} TryConsume failed size={}, used={}, limit={}", label(), bytes, + _consumption->current_value(), _limit)); + } + VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes + << " consumption=" << _consumption->current_value() << " limit=" << _limit; + return Status::OK(); +} + +// Calling this on the query tracker results in output like: +// +// Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB +// Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB +// EXCHANGE_NODE (id=4): Total=0 Peak=0 +// DataStreamRecvr: Total=0 Peak=0 +// Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB +// Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB +// AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB +// HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB +// DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99
KB +// Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB +// AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB +// EXCHANGE_NODE (id=2): Total=0 Peak=0 +// DataStreamRecvr: Total=45.91 KB Peak=684.07 KB +// DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B +// +// If 'reservation_metrics_' are set, we ge a more granular breakdown: +// TrackerName: Limit=5.00 MB Reservation=5.00 MB OtherMemory=1.04 MB +// Total=6.04 MB Peak=6.45 MB +// +std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logged_consumption) { + // Make sure the consumption is up to date. + int64_t curr_consumption = consumption(); + int64_t peak_consumption = _consumption->value(); + if (logged_consumption != nullptr) *logged_consumption = curr_consumption; + + std::string detail = + "MemTracker log_usage Label: {}, Limit: {}, Total: {}, Peak: {}, Exceeded: {}"; + detail = fmt::format(detail, _label, PrettyPrinter::print(_limit, TUnit::BYTES), + PrettyPrinter::print(curr_consumption, TUnit::BYTES), + PrettyPrinter::print(peak_consumption, TUnit::BYTES), + limit_exceeded() ? "true" : "false"); + + // This call does not need the children, so return early. 
+ if (max_recursive_depth == 0) return detail; + + // Recurse and get information about the children + int64_t child_consumption; + std::string child_trackers_usage; + std::list<MemTrackerLimiter*> limiter_children; + std::list<MemTrackerObserve*> observe_children; + { + std::lock_guard<SpinLock> l(_child_trackers_lock); + limiter_children = _child_limiter_trackers; + observe_children = _child_observe_trackers; + } + child_trackers_usage = log_usage(max_recursive_depth - 1, limiter_children, &child_consumption); + for (const auto& child : observe_children) { + child_trackers_usage += "\n" + child->log_usage(&child_consumption); + } + if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage; + return detail; +} + +std::string MemTrackerLimiter::log_usage(int max_recursive_depth, + const std::list<MemTrackerLimiter*>& trackers, + int64_t* logged_consumption) { + *logged_consumption = 0; + std::vector<std::string> usage_strings; + for (const auto& tracker : trackers) { + if (tracker) { + int64_t tracker_consumption; + std::string usage_string = + tracker->log_usage(max_recursive_depth, &tracker_consumption); + if (!usage_string.empty()) usage_strings.push_back(usage_string); + *logged_consumption += tracker_consumption; + } + } + return join(usage_strings, "\n"); +} + +Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details, + int64_t failed_allocation_size, Status failed_alloc) { + STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); + MemTrackerLimiter* process_tracker = MemTrackerLimiter::get_process_tracker(); + std::string detail = + "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process " + "limit={}."; + detail = fmt::format(detail, state != nullptr ? print_id(state->fragment_instance_id()) : "", + details, BackendOptions::get_localhost(), + PrettyPrinter::print(process_tracker->spare_capacity(), TUnit::BYTES)); + if (!failed_alloc) { + detail += " failed alloc=<{}>. 
current tracker={}."; + detail = fmt::format(detail, failed_alloc.to_string(), _label); + } else { + detail += " current tracker <label={}, used={}, limit={}, failed alloc size={}>."; + detail = fmt::format(detail, _label, _consumption->current_value(), _limit, + PrettyPrinter::print(failed_allocation_size, TUnit::BYTES)); + } + detail += " If this is a query, can change the limit by session variable exec_mem_limit."; + Status status = Status::MemoryLimitExceeded(detail); + if (state != nullptr) state->log_error(detail); + + // only print the tracker log_usage in be log. + if (process_tracker->spare_capacity() < failed_allocation_size) { + // Dumping the process MemTracker is expensive. Limiting the recursive depth to two + // levels limits the level of detail to a one-line summary for each query MemTracker. + detail += "\n" + process_tracker->log_usage(2); + } + detail += "\n" + log_usage(); + + LOG(WARNING) << detail; + return status; +} + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h new file mode 100644 index 0000000000..d5d937523f --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/config.h" +#include "runtime/memory/mem_tracker_base.h" +#include "runtime/runtime_state.h" +#include "util/mem_info.h" + +namespace doris { + +class MemTrackerObserve; + +// Tracker contains a limit, and can be arranged into a tree structure such that the consumption +// tracked by a MemTracker is also tracked by its ancestors. +// Used for: +// 1. Track and limit the memory usage of process and query. +// Automatic memory consumption based on system memory allocation (Currently, based on TCMalloc hook). +// 2. Execution logic that requires memory size to participate in control. +// Manual consumption, but will not affect the overall statistics of the process. +// +// We use a five-level hierarchy of mem trackers: process, query pool, query, instance, +// node. Specific parts of the fragment (exec nodes, sinks, etc) will add the +// fifth (node) level when they are initialized. +// +// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is +// reached. If try_consume() or check_limit() finds that the limit would be exceeded, it first +// calls the GcFunctions to try to free memory and then rechecks the limit. For example, the process +// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so +// this will be called before the process limit is reported as exceeded. GcFunctions are +// called in the order they are added, so expensive functions should be added last. +// GcFunctions are called with the tracker's _gc_lock held, so should be non-blocking and not +// call back into MemTrackers, except to release memory.
+class MemTrackerLimiter final : public MemTrackerBase { +public: + // Creates and adds the tracker to the tree + static MemTrackerLimiter* create_tracker(int64_t byte_limit, const std::string& label, + MemTrackerLimiter* parent = nullptr, + RuntimeProfile* profile = nullptr); + + // Walks the MemTrackerLimiter hierarchy and populates _ancestor_all_trackers and + // _ancestor_limiter_trackers + void init(int64_t limit); + + ~MemTrackerLimiter(); + + // Adds tracker to _child_limiter_trackers / _child_observe_trackers + void add_child_tracker(MemTrackerLimiter* tracker); + void add_child_tracker(MemTrackerObserve* tracker); + // Removes tracker from _child_limiter_trackers / _child_observe_trackers + void remove_child_tracker(MemTrackerLimiter* tracker); + void remove_child_tracker(MemTrackerObserve* tracker); + + // Leaf tracker, without any child. + // NOTE(review): reads the child lists without taking _child_trackers_lock. + bool is_leaf() { return _child_limiter_trackers.size() + _child_observe_trackers.size() == 0; } + + // Gets a "process" tracker, creating it if necessary. + static MemTrackerLimiter* get_process_tracker(); + + // Returns a list of all the valid trackers. + static void list_process_trackers(std::vector<MemTrackerBase*>* trackers); + +public: + // The following func, for execution logic that requires memory size to participate in control. + // this does not change the value of process tracker. + + // only consume self, will not sync to parent. Usually used to manually record the specified memory, + // It is independent of the automatically recording of thread local tracker, so the same block of memory + // will be recorded in the thread local tracker and the current tracker at the same time. + void consume_self(int64_t bytes); + void release_self(int64_t bytes) { consume_self(-bytes); } + + // up to (but not including) end_tracker. + // This is useful if we want to move tracking between trackers that share a common (i.e. end_tracker) + // ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption + // against the limit recorded in one of its ancestors already happened.
+ void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker); + void release_local(int64_t bytes, MemTrackerLimiter* end_tracker) { + consume_local(-bytes, end_tracker); + } + + // Transfer 'bytes' of consumption from this tracker to 'dst'. + // Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated. + void transfer_to(MemTrackerLimiter* dst, int64_t bytes); + + // When the accumulated untracked memory value exceeds the upper limit, + // the current value is returned and set to 0. + // Thread safety. + int64_t add_untracked_mem(int64_t bytes); + + // In most cases, no need to call flush_untracked_mem on the child tracker, + // because when it is destructed, theoretically all its children have been destructed. + void flush_untracked_mem() { consume(_untracked_mem.exchange(0)); } + + // Find the common ancestor and update trackers between 'this'/'dst' and + // the common ancestor. This logic handles all cases, including the + // two trackers being the same or being ancestors of each other because + // 'all_trackers_' includes the current tracker. + MemTrackerLimiter* common_ancestor(MemTrackerLimiter* dst); + +public: + // The following func, for mem limit. 
+ + Status check_sys_mem_info(int64_t bytes) { + // TODO add mmap + if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { + return Status::MemoryLimitExceeded(fmt::format( + "{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}", + _label, bytes, MemInfo::current_mem(), MemInfo::mem_limit())); + } + return Status::OK(); + } + + bool has_limit() const { return _limit >= 0; } + int64_t limit() const { return _limit; } + void update_limit(int64_t limit) { + DCHECK(has_limit()); + _limit = limit; + } + bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); } + bool any_limit_exceeded() const { return limit_exceeded_tracker() != nullptr; } + + // Returns true if a valid limit of this tracker or one of its ancestors is exceeded. + MemTrackerLimiter* limit_exceeded_tracker() const; + + Status check_limit(int64_t bytes); + + // Returns the maximum consumption that can be made without exceeding the limit on + // this tracker or any of its parents. Returns int64_t::max() if there are no + // limits and a negative value if any limit is already exceeded. + int64_t spare_capacity() const; + + // Returns the lowest limit for this tracker and its ancestors. Returns -1 if there is no limit. + int64_t get_lowest_limit() const; + + typedef std::function<void(int64_t bytes_to_free)> GcFunction; + /// Add a function 'f' to be called if the limit is reached, if none of the other + /// previously-added GC functions were successful at freeing up enough memory. + /// 'f' does not need to be thread-safe as long as it is added to only one MemTrackerLimiter. + /// Note that 'f' must be valid for the lifetime of this MemTrackerLimiter. + void add_gc_function(GcFunction f) { _gc_functions.push_back(f); } + + // If consumption is higher than max_consumption, attempts to free memory by calling + // any added GC functions. Returns true if max_consumption is still exceeded. Takes gc_lock. 
+ // Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment, + // the performance of subsequent queries may be degraded, so the use of gc function should be careful enough. + bool gc_memory(int64_t max_consumption); + Status try_gc_memory(int64_t bytes); + + /// Logs the usage of this tracker and optionally its children (recursively). + /// If 'logged_consumption' is non-nullptr, sets the consumption value logged. + /// 'max_recursive_depth' specifies the maximum number of levels of children + /// to include in the dump. If it is zero, then no children are dumped. + /// Limiting the recursive depth reduces the cost of dumping, particularly + /// for the process MemTracker. + std::string log_usage(int max_recursive_depth = INT_MAX, int64_t* logged_consumption = nullptr); + + // Log the memory usage when memory limit is exceeded and return a status object with + // details of the allocation which caused the limit to be exceeded. + // If 'failed_allocation_size' is greater than zero, logs the allocation size. If + // 'failed_allocation_size' is zero, nothing about the allocation size is logged. + // If 'state' is non-nullptr, logs the error to 'state'. + Status mem_limit_exceeded(RuntimeState* state, const std::string& details = std::string(), + int64_t failed_allocation = -1, Status failed_alloc = Status::OK()); + + std::string debug_string() { + std::stringstream msg; + msg << "limit: " << _limit << "; " + << "consumption: " << _consumption->current_value() << "; " + << "label: " << _label << "; " + << "all tracker size: " << _ancestor_all_trackers.size() << "; " + << "limit trackers size: " << _ancestor_limiter_trackers.size() << "; " + << "parent is null: " << ((_parent == nullptr) ? "true" : "false") << "; "; + return msg.str(); + } + +private: + // The following func, for automatic memory tracking and limiting based on system memory allocation. 
+ friend class ThreadMemTrackerMgr; + + MemTrackerLimiter(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile) + : MemTrackerBase(label, parent, profile) {} + + // Creates the process tracker. + static void create_process_tracker(); + + // Increases consumption of this tracker and its ancestors by 'bytes'. + void consume(int64_t bytes); + + // Decreases consumption of this tracker and its ancestors by 'bytes'. + void release(int64_t bytes) { consume(-bytes); } + + // Increases consumption of this tracker and its ancestors by 'bytes' only if + // they can all consume 'bytes' without exceeding limit. If limit would be exceed, + // no MemTrackers are updated. Returns true if the consumption was successfully updated. + WARN_UNUSED_RESULT + Status try_consume(int64_t bytes); + + /// Log consumption of all the trackers provided. Returns the sum of consumption in + /// 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels + /// of children to include in the dump. If it is zero, then no children are dumped. + static std::string log_usage(int max_recursive_depth, + const std::list<MemTrackerLimiter*>& trackers, + int64_t* logged_consumption); + +private: + // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。 + int64_t _limit; + + // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate + // to avoid frequent calls to consume/release of MemTracker. + std::atomic<int64_t> _untracked_mem = 0; + + // All the child trackers of this tracker. Used for error reporting and + // listing only (i.e. updating the consumption of a parent tracker does not + // update that of its children). + SpinLock _child_trackers_lock; + std::list<MemTrackerLimiter*> _child_limiter_trackers; + std::list<MemTrackerObserve*> _child_observe_trackers; + // Iterator into parent_->_child_limiter_trackers for this object. Stored to have O(1) remove. 
+ std::list<MemTrackerLimiter*>::iterator _child_tracker_it; + + // this tracker plus all of its ancestors + std::vector<MemTrackerLimiter*> _ancestor_all_trackers; + // _ancestor_all_trackers with valid limits + std::vector<MemTrackerLimiter*> _ancestor_limiter_trackers; + + // Lock to protect gc_memory(). This prevents many GCs from occurring at once. + std::mutex _gc_lock; + // Functions to call after the limit is reached to free memory. + std::vector<GcFunction> _gc_functions; +}; + +inline void MemTrackerLimiter::consume(int64_t bytes) { + if (bytes == 0) { + return; + } else { + for (auto& tracker : _ancestor_all_trackers) { + tracker->_consumption->add(bytes); + } + } +} + +inline Status MemTrackerLimiter::try_consume(int64_t bytes) { + if (bytes <= 0) { + release(-bytes); + return Status::OK(); + } + RETURN_IF_ERROR(check_sys_mem_info(bytes)); + int i; + // Walk the tracker tree top-down. + for (i = _ancestor_all_trackers.size() - 1; i >= 0; --i) { + MemTrackerLimiter* tracker = _ancestor_all_trackers[i]; + if (tracker->limit() < 0) { + tracker->_consumption->add(bytes); // No limit at this tracker. + } else { + // If TryConsume fails, we can try to GC, but we may need to try several times if + // there are concurrent consumers because we don't take a lock before trying to + // update _consumption. + while (true) { + if (LIKELY(tracker->_consumption->try_add(bytes, tracker->limit()))) break; + Status st = tracker->try_gc_memory(bytes); + if (!st) { + // Failed for this mem tracker. Roll back the ones that succeeded. + for (int j = _ancestor_all_trackers.size() - 1; j > i; --j) { + _ancestor_all_trackers[j]->_consumption->add(-bytes); + } + return st; + } + } + } + } + // Everyone succeeded, return. 
+ DCHECK_EQ(i, -1); + return Status::OK(); +} + +inline void MemTrackerLimiter::consume_self(int64_t bytes) { + int64_t consume_bytes = add_untracked_mem(bytes); + if (consume_bytes != 0) { + _consumption->add(consume_bytes); + } +} + +inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) { + DCHECK(end_tracker); + if (bytes == 0) return; + for (auto& tracker : _ancestor_all_trackers) { + if (tracker == end_tracker) return; + tracker->consume_self(bytes); + } +} + +inline void MemTrackerLimiter::transfer_to(MemTrackerLimiter* dst, int64_t bytes) { + DCHECK(dst->is_limited()); + if (id() == dst->id()) return; + release_local(bytes, MemTrackerLimiter::get_process_tracker()); + dst->consume_local(bytes, MemTrackerLimiter::get_process_tracker()); +} + +inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { + _untracked_mem += bytes; + if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) { + return _untracked_mem.exchange(0); + } + return 0; +} + +inline Status MemTrackerLimiter::check_limit(int64_t bytes) { + if (bytes <= 0) return Status::OK(); + RETURN_IF_ERROR(check_sys_mem_info(bytes)); + int i; + // Walk the tracker tree top-down. + for (i = _ancestor_all_trackers.size() - 1; i >= 0; --i) { + MemTrackerLimiter* tracker = _ancestor_all_trackers[i]; + if (tracker->limit() > 0) { + while (true) { + if (LIKELY(tracker->_consumption->current_value() + bytes < tracker->limit())) + break; + RETURN_IF_ERROR(tracker->try_gc_memory(bytes)); + } + } + } + return Status::OK(); +} + +#define RETURN_LIMIT_EXCEEDED(tracker, ...) 
return tracker->mem_limit_exceeded(__VA_ARGS__); +#define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \ + if (tracker->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg); +#define RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, msg) \ + if (state->instance_mem_tracker()->any_limit_exceeded()) \ + RETURN_LIMIT_EXCEEDED(state->instance_mem_tracker(), state, msg); + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_observe.cpp b/be/src/runtime/memory/mem_tracker_observe.cpp new file mode 100644 index 0000000000..f696df2f94 --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_observe.cpp @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/memory/mem_tracker_observe.h" + +#include <fmt/format.h> +#include <parallel_hashmap/phmap.h> + +#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/thread_context.h" +#include "util/pretty_printer.h" + +namespace doris { + +using TemporaryTrackersMap = phmap::parallel_flat_hash_map< + std::string, MemTrackerObserve*, phmap::priv::hash_default_hash<std::string>, + phmap::priv::hash_default_eq<std::string>, + std::allocator<std::pair<const std::string, MemTrackerObserve*>>, 12, std::mutex>; + +static TemporaryTrackersMap _temporary_mem_trackers; + +MemTrackerObserve* MemTrackerObserve::create_tracker(const std::string& label, + RuntimeProfile* profile) { + STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); + MemTrackerLimiter* parent = tls_ctx()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + DCHECK(parent); + std::string parent_label = parent->label(); + std::string reset_label; + if (parent_label.find_first_of("#") != parent_label.npos) { + reset_label = fmt::format("[Observe]-{}#{}", label, + parent_label.substr(parent_label.find_first_of("#"), -1)); + } else { + reset_label = fmt::format("[Observe]-{}", label); + } + MemTrackerObserve* tracker(new MemTrackerObserve(reset_label, parent, profile)); + parent->add_child_tracker(tracker); + tracker->set_is_observed(); + return tracker; +} + +MemTrackerObserve::~MemTrackerObserve() { + if (parent()) { + parent()->remove_child_tracker(this); + } +} + +// Count the memory in the scope to a temporary tracker with the specified label name. +// This is very useful when debugging. You can find the position where the tracker statistics are +// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots. +// TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding +// a switch temporary tracker to each line. Maybe there are open source tools to do this? 
+MemTrackerObserve* MemTrackerObserve::get_temporary_mem_tracker(const std::string& label) { + // First time this label registered, make a new object, otherwise do nothing. + // Avoid using locks to resolve erase conflicts. + _temporary_mem_trackers.try_emplace_l( + label, [](MemTrackerObserve*) {}, + MemTrackerObserve::create_tracker(fmt::format("[Temporary]-{}", label))); + return _temporary_mem_trackers[label]; +} + +std::string MemTrackerObserve::log_usage(int64_t* logged_consumption) { + // Make sure the consumption is up to date. + int64_t curr_consumption = consumption(); + int64_t peak_consumption = _consumption->value(); + if (logged_consumption != nullptr) *logged_consumption = curr_consumption; + if (curr_consumption == 0) return ""; + std::string detail = "MemTracker log_usage Label: {}, Total: {}, Peak: {}"; + detail = fmt::format(detail, _label, PrettyPrinter::print(curr_consumption, TUnit::BYTES), + PrettyPrinter::print(peak_consumption, TUnit::BYTES)); + return detail; +} + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_observe.h b/be/src/runtime/memory/mem_tracker_observe.h new file mode 100644 index 0000000000..3213319207 --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_observe.h @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/memory/mem_tracker_base.h" + +namespace doris { + +class MemTrackerLimiter; + +// Used to manually track memory usage at specified locations, including all exec node trackers. +// +// There is no parent-child relationship between MemTrackerObserves. Both fathers are fragment instance trakcers, +// but their consumption will not consume fragment instance trakcers synchronously. Therefore, errors in statistics +// will not affect the memory tracking and restrictions of processes and Query. +class MemTrackerObserve final : public MemTrackerBase { +public: + // Creates and adds the tracker to the tree + static MemTrackerObserve* create_tracker(const std::string& label, + RuntimeProfile* profile = nullptr); + + ~MemTrackerObserve(); + + // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get. + // Temporary trackers are not automatically destructed, which is usually used for debugging. + static MemTrackerObserve* get_temporary_mem_tracker(const std::string& label); + +public: + void consume(int64_t bytes); + + void release(int64_t bytes) { consume(-bytes); } + + static void batch_consume(int64_t bytes, const std::vector<MemTrackerObserve*>& trackers) { + for (auto& tracker : trackers) { + tracker->consume(bytes); + } + } + + // Transfer 'bytes' of consumption from this tracker to 'dst'. + void transfer_to(MemTrackerObserve* dst, int64_t bytes); + + bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); } + + std::string log_usage(int64_t* logged_consumption = nullptr); + + std::string debug_string() { + std::stringstream msg; + msg << "label: " << _label << "; " + << "consumption: " << _consumption->current_value() << "; " + << "parent is null: " << ((_parent == nullptr) ? 
"true" : "false") << "; "; + return msg.str(); + } + + // Iterator into parent_->_child_observe_trackers for this object. Stored to have O(1) remove. + std::list<MemTrackerObserve*>::iterator _child_tracker_it; + +private: + MemTrackerObserve(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile) + : MemTrackerBase(label, parent, profile) {} +}; + +inline void MemTrackerObserve::consume(int64_t bytes) { + if (bytes == 0) { + return; + } else { + _consumption->add(bytes); + } +} + +inline void MemTrackerObserve::transfer_to(MemTrackerObserve* dst, int64_t bytes) { + if (id() == dst->id()) return; + release(bytes); + dst->consume(bytes); +} + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp new file mode 100644 index 0000000000..d643acdc4b --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/memory/mem_tracker_task_pool.h" + +#include "common/config.h" +#include "runtime/exec_env.h" +#include "util/pretty_printer.h" + +namespace doris { + +MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id, + int64_t mem_limit, + const std::string& label, + MemTrackerLimiter* parent) { + DCHECK(!task_id.empty()); + // First time this task_id registered, make a new object, otherwise do nothing. + // Combine create_tracker and emplace into one operation to avoid the use of locks + // Name for task MemTrackers. '$0' is replaced with the task id. + _task_mem_trackers.try_emplace_l( + task_id, [](MemTrackerLimiter*) {}, + MemTrackerLimiter::create_tracker(mem_limit, label, parent)); + return get_task_mem_tracker(task_id); +} + +MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id, + int64_t mem_limit) { + VLOG_FILE << "Register Query memory tracker, query id: " << query_id + << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); + return register_task_mem_tracker_impl(query_id, mem_limit, + fmt::format("Query#queryId={}", query_id), + ExecEnv::GetInstance()->query_pool_mem_tracker()); +} + +MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const std::string& load_id, + int64_t mem_limit) { + // In load, the query id of the fragment is executed, which is the same as the load id of the load channel. 
+ VLOG_FILE << "Register Load memory tracker, load id: " << load_id + << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); + return register_task_mem_tracker_impl(load_id, mem_limit, + fmt::format("Load#loadId={}", load_id), + ExecEnv::GetInstance()->load_pool_mem_tracker()); +} + +MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) { + DCHECK(!task_id.empty()); + MemTrackerLimiter* tracker = nullptr; + // Avoid using locks to resolve erase conflicts + _task_mem_trackers.if_contains(task_id, [&tracker](MemTrackerLimiter* v) { tracker = v; }); + return tracker; +} + +void MemTrackerTaskPool::logout_task_mem_tracker() { + std::vector<std::string> expired_tasks; + for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) { + if (!it->second) { + // https://github.com/apache/incubator-doris/issues/10006 + expired_tasks.emplace_back(it->first); + } else if (it->second->is_leaf() == true && it->second->peak_consumption() > 0) { + // No RuntimeState uses this task MemTracker, it is only referenced by this map, + // and tracker was not created soon, delete it. + if (config::memory_leak_detection && it->second->consumption() != 0) { + // If consumption is not equal to 0 before query mem tracker is destructed, + // there are two possibilities in theory. + // 1. A memory leak occurs. + // 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on + // other trackers such as the process mem tracker, and there is no manual transfer between the two trackers. + // + // The second case should be eliminated in theory, but it has not been done so far, so the query memory leak + // cannot be located, and the value of the query pool mem tracker statistics will be inaccurate. 
+ LOG(WARNING) << "Task memory tracker memory leak:" << it->second->debug_string(); + } + // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers, + // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is, + // the negative number of the current value of consume. + it->second->parent()->consume_local(-it->second->consumption(), + MemTrackerLimiter::get_process_tracker()); + expired_tasks.emplace_back(it->first); + } else { + // Log limit exceeded query tracker. + if (it->second->limit_exceeded()) { + it->second->mem_limit_exceeded( + nullptr, + fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first), + 0, Status::OK()); + } + } + } + for (auto tid : expired_tasks) { + if (!_task_mem_trackers[tid]) { + _task_mem_trackers.erase(tid); + VLOG_FILE << "Deregister null task mem tracker, task id: " << tid; + } else { + delete _task_mem_trackers[tid]; + _task_mem_trackers.erase(tid); + VLOG_FILE << "Deregister not used task mem tracker, task id: " << tid; + } + } +} + +// TODO(zxy) More observable methods +// /// Logs the usage of 'limit' number of queries based on maximum total memory +// /// consumption. 
+// std::string MemTracker::LogTopNQueries(int limit) { +// if (limit == 0) return ""; +// priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>, +// std::greater<pair<int64_t, string>>> +// min_pq; +// GetTopNQueries(min_pq, limit); +// std::vector<string> usage_strings(min_pq.size()); +// while (!min_pq.empty()) { +// usage_strings.push_back(min_pq.top().second); +// min_pq.pop(); +// } +// std::reverse(usage_strings.begin(), usage_strings.end()); +// return join(usage_strings, "\n"); +// } + +// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy +// /// and populates 'min_pq' with 'limit' number of elements (that contain state related +// /// to query MemTrackers) based on maximum total memory consumption. +// void MemTracker::GetTopNQueries( +// priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>, +// greater<pair<int64_t, string>>>& min_pq, +// int limit) { +// list<weak_ptr<MemTracker>> children; +// { +// lock_guard<SpinLock> l(child_trackers_lock_); +// children = child_trackers_; +// } +// for (const auto& child_weak : children) { +// shared_ptr<MemTracker> child = child_weak.lock(); +// if (child) { +// child->GetTopNQueries(min_pq, limit); +// } +// } +// } + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h new file mode 100644 index 0000000000..ae7d82caf4 --- /dev/null +++ b/be/src/runtime/memory/mem_tracker_task_pool.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <parallel_hashmap/phmap.h>

#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {

// Task id -> task tracker map; a phmap parallel map whose submaps are guarded by std::mutex.
using TaskTrackersMap = phmap::parallel_flat_hash_map<
        std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>,
        phmap::priv::hash_default_eq<std::string>,
        std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>;

// Global task pool for query MemTrackers. Owned by ExecEnv.
class MemTrackerTaskPool {
public:
    // Construct a MemTracker object for 'task_id' with 'mem_limit' as the memory limit.
    // The MemTracker is a child of 'parent' (the pool MemTracker). Calling this with the same
    // 'task_id' will return the same MemTracker object. This is used to track the local
    // memory usage of all tasks executing. The first time this is called for a task,
    // a new MemTracker object is created with 'parent' as its parent and 'mem_limit'
    // forwarded to MemTrackerLimiter::create_tracker().
    MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit,
                                                      const std::string& label,
                                                      MemTrackerLimiter* parent);
    // Register under the query pool tracker / load pool tracker respectively.
    MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, int64_t mem_limit);
    MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, int64_t mem_limit);

    // Returns the tracker registered for 'task_id', or nullptr if there is none.
    MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);

    // Remove the mem trackers of tasks that have ended.
    void logout_task_mem_tracker();

private:
    // All per-task MemTracker objects.
    // The life cycle of task memtracker in the process is the same as task runtime state,
    // MemTrackers will be removed from this map after query finish or cancel.
    TaskTrackersMap _task_mem_trackers;
};

} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org