This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 487fd3c1d7 [bugfix](memtracker)fix exceed memory limit log (#11485) 487fd3c1d7 is described below commit 487fd3c1d715477c3d3f321b6e8ce0addc30bc12 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Aug 4 10:22:20 2022 +0800 [bugfix](memtracker)fix exceed memory limit log (#11485) --- be/src/exec/cross_join_node.cpp | 1 - be/src/exec/except_node.cpp | 1 - be/src/exec/hash_join_node.cpp | 2 - be/src/exec/intersect_node.cpp | 1 - be/src/exec/set_operation_node.cpp | 1 - be/src/runtime/memory/mem_tracker_limiter.cpp | 56 ++++++++++-------------- be/src/runtime/memory/mem_tracker_limiter.h | 9 ++-- be/src/runtime/memory/mem_tracker_task_pool.cpp | 8 ++-- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 29 ++++++------ be/src/runtime/memory/thread_mem_tracker_mgr.h | 41 +++-------------- be/src/runtime/plan_fragment_executor.cpp | 2 +- be/src/runtime/thread_context.cpp | 15 ------- be/src/runtime/thread_context.h | 40 +++++------------ be/src/vec/exec/join/vhash_join_node.cpp | 1 - be/src/vec/exec/vaggregation_node.cpp | 2 - be/src/vec/exec/vcross_join_node.cpp | 1 - be/src/vec/exec/vset_operation_node.cpp | 1 - 17 files changed, 64 insertions(+), 147 deletions(-) diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp index b26f4f2cd4..0743fe04c4 100644 --- a/be/src/exec/cross_join_node.cpp +++ b/be/src/exec/cross_join_node.cpp @@ -52,7 +52,6 @@ Status CrossJoinNode::close(RuntimeState* state) { Status CrossJoinNode::construct_build_side(RuntimeState* state) { // Do a full scan of child(1) and store all build row batches. RETURN_IF_ERROR(child(1)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Cross join, while getting next from child 1"); while (true) { RowBatch* batch = _build_batch_pool->add( diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 58a9b67f2f..dd92859671 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -40,7 +40,6 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(SetOperationNode::open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Except Node, while probing the hash table."); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 7c572ff95d..02f52d2124 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -186,7 +186,6 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { // The hash join node needs to keep in memory all build tuples, including the tuple // row ptrs. The row ptrs are copied into the hash table's internal structure so they // don't need to be stored in the _build_pool. - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash table."); RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker().get()); RETURN_IF_ERROR(child(1)->open(state)); @@ -304,7 +303,6 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo // In most cases, no additional memory overhead will be applied for at this stage, // but if the expression calculation in this node needs to apply for additional memory, // it may cause the memory to exceed the limit. - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while execute get_next."); SCOPED_TIMER(_runtime_profile->total_time_counter()); if (reached_limit()) { diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index 9f5eb3ece1..a79810734d 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -43,7 +43,6 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { // 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table // repeat [2] this for all the rest child Status IntersectNode::open(RuntimeState* state) { - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Intersect Node, while probing the hash table."); RETURN_IF_ERROR(SetOperationNode::open(state)); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index e0bfbb199c..7a9d3a334f 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -138,7 +138,6 @@ Status SetOperationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("SetOperation, while constructing the hash table."); RETURN_IF_CANCELLED(state); // open result expr lists. for (const std::vector<ExprContext*>& exprs : _child_expr_lists) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 722495aee7..f5c825155b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -143,7 +143,8 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) { Status MemTrackerLimiter::try_gc_memory(int64_t bytes) { if (UNLIKELY(gc_memory(_limit - bytes))) { return Status::MemoryLimitExceeded( - fmt::format("label={}, limit={}, used={}, failed consume size={}", label(), _limit, _consumption->current_value(), bytes)); + fmt::format("label={}, limit={}, used={}, failed consume size={}", label(), _limit, + _consumption->current_value(), bytes)); } VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes << " consumption=" << _consumption->current_value() << " limit=" << _limit; @@ -196,9 +197,9 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge std::vector<NewMemTracker::Snapshot> snapshots; NewMemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label); for (const auto& snapshot : snapshots) { - child_trackers_usage += NewMemTracker::log_usage(snapshot); + child_trackers_usage += "\n " + NewMemTracker::log_usage(snapshot); } - if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage; + if (!child_trackers_usage.empty()) detail += child_trackers_usage; return detail; } @@ -216,47 +217,36 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, return join(usage_strings, "\n"); } -Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size, Status failed_try_consume_rt, const TUniqueId& fragment_instance_id) { +Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); - std::string detail; - if (!failed_try_consume_rt) { - DCHECK(failed_try_consume_rt.is_mem_limit_exceeded()); - detail += "in {} mem tracker consume log=<{}>."; - // failed_try_consume_rt.to_string() starts with `Memory exceed limit: ` - detail = fmt::format(detail, msg, failed_try_consume_rt.to_string()); - } else { - detail += "in {} mem tracker consume log=<label={}, limit={}, used={}, failed consume size={}>."; - detail = fmt::format(detail, msg, _label, _consumption->current_value(), _limit, - PrettyPrinter::print(failed_consume_size, TUnit::BYTES)); - } - detail += " fragment={}, backend={} free memory left={}."; - detail = fmt::format( - detail, print_id(fragment_instance_id), - BackendOptions::get_localhost(), + std::string detail = fmt::format( + "{}, failed mem consume:<consume_size={}, mem_limit={}, mem_used={}, tracker_label={}, " + "in backend={} free memory left={}. details mem usage see be.INFO.", + msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), _limit, + _consumption->current_value(), _label, BackendOptions::get_localhost(), PrettyPrinter::print(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity(), TUnit::BYTES)); - detail += " If this is a query, can change the limit by session variable exec_mem_limit."; Status status = Status::MemoryLimitExceeded(detail); - detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace()); // only print the tracker log_usage in be log. - if (ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() < failed_consume_size) { - // Dumping the process NewMemTracker is expensive. Limiting the recursive depth to two - // levels limits the level of detail to a one-line summary for each query NewMemTracker. - detail += "\n" + ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2); - } else { - detail += "\n" + log_usage(); + if (_print_log_usage) { + if (ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() < failed_consume_size) { + // Dumping the process MemTracker is expensive. Limiting the recursive depth to two + // levels limits the level of detail to a one-line summary for each query MemTracker. + detail += "\n" + ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2); + } else { + detail += "\n" + log_usage(); + } + detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace()); + LOG(WARNING) << detail; + _print_log_usage = false; } - - LOG(WARNING) << detail; return status; } Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg, - int64_t failed_alloc_size, - Status failed_try_consume_rt) { - Status rt = mem_limit_exceeded(msg, failed_alloc_size, failed_try_consume_rt, - state->fragment_instance_id()); + int64_t failed_alloc_size) { + Status rt = mem_limit_exceeded(msg, failed_alloc_size); state->log_error(rt.to_string()); return rt; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 104a1c20ca..0c003d6f3a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -143,12 +143,9 @@ public: // If 'failed_allocation_size' is greater than zero, logs the allocation size. If // 'failed_allocation_size' is zero, nothing about the allocation size is logged. // If 'state' is non-nullptr, logs the error to 'state'. - Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size, - Status failed_try_consume_rt, - const TUniqueId& fragment_instance_id = TUniqueId()); + Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size); Status mem_limit_exceeded(RuntimeState* state, const std::string& msg = std::string(), - int64_t failed_consume_size = -1, - Status failed_try_consume_rt = Status::OK()); + int64_t failed_consume_size = -1); std::string debug_string() { std::stringstream msg; @@ -208,6 +205,8 @@ private: // The number of child trackers that have been added. std::atomic_size_t _had_child_count = 0; + bool _print_log_usage = true; + // Lock to protect gc_memory(). This prevents many GCs from occurring at once. std::mutex _gc_lock; // Functions to call after the limit is reached to free memory. diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 3c775db5ec..b63b25df17 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -31,15 +31,13 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_task_mem_tracker // Combine new tracker and emplace into one operation to avoid the use of locks // Name for task MemTrackers. '$0' is replaced with the task id. std::shared_ptr<MemTrackerLimiter> tracker; - bool new_emplace = _task_mem_trackers.lazy_emplace_l( + bool new_emplace = _task_mem_trackers.try_emplace_l( task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) { tracker = v; }, - [&](const auto& ctor) { - tracker = std::make_shared<MemTrackerLimiter>(mem_limit, label, parent); - ctor(task_id, tracker); - }); + std::make_shared<MemTrackerLimiter>(mem_limit, label, parent)); if (new_emplace) { LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); + return get_task_mem_tracker(task_id); } return tracker; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index ab2d1fc1c0..aea89acb3b 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -25,14 +25,12 @@ namespace doris { void ThreadMemTrackerMgr::attach_limiter_tracker( - const std::string& cancel_msg, const std::string& task_id, - const TUniqueId& fragment_instance_id, + const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem<false>(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _exceed_cb.cancel_msg = cancel_msg; _limiter_tracker = mem_tracker; } @@ -40,7 +38,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() { flush_untracked_mem<false>(); _task_id = ""; _fragment_instance_id = TUniqueId(); - _exceed_cb.cancel_msg = ""; _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); } @@ -52,18 +49,22 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details } } -void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size, Status failed_try_consume_rt) { - if (_exceed_cb.cb_func != nullptr) { - _exceed_cb.cb_func(); +void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) { + if (_cb_func != nullptr) { + _cb_func(); } - if (is_attach_task()) { - if (_exceed_cb.cancel_task) { - auto st = _limiter_tracker->mem_limit_exceeded( - fmt::format("query mem limit exceeded and cancel it, {}", _exceed_cb.cancel_msg), - failed_consume_size, failed_try_consume_rt, _fragment_instance_id); - exceeded_cancel_task(st.to_string()); - _exceed_cb.cancel_task = false; // Make sure it will only be canceled once + if (is_attach_query()) { + std::string cancel_msg; + if (!_consumer_tracker_stack.empty()) { + cancel_msg = fmt::format( + "exec node:<name={}>, can change the limit by `set exec_mem_limit=xxx`", + _consumer_tracker_stack[-1]->label()); + } else { + cancel_msg = "exec node:unknown, can change the limit by `set exec_mem_limit=xxx`"; } + auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg, failed_consume_size); + exceeded_cancel_task(st.to_string()); + _check_limit = false; // Make sure it will only be canceled once } } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 13eed946ba..2f5bfd2d9c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -32,22 +32,6 @@ extern bthread_key_t btls_key; static const bthread_key_t EMPTY_BTLS_KEY = {0, 0}; using ExceedCallBack = void (*)(); -struct MemExceedCallBackInfo { - std::string cancel_msg; - bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit. - ExceedCallBack cb_func; - - MemExceedCallBackInfo() { init(); } - - MemExceedCallBackInfo(const std::string& cancel_msg, bool cancel_task, ExceedCallBack cb_func) - : cancel_msg(cancel_msg), cancel_task(cancel_task), cb_func(cb_func) {} - - void init() { - cancel_msg = ""; - cancel_task = true; - cb_func = nullptr; - } -}; // TCMalloc new/delete Hook is counted in the memory_tracker of the current thread. // @@ -61,7 +45,6 @@ public: ~ThreadMemTrackerMgr() { flush_untracked_mem<false>(); - _exceed_cb.init(); DCHECK(_consumer_tracker_stack.empty()); } @@ -75,8 +58,7 @@ public: void init(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker - void attach_limiter_tracker(const std::string& cancel_msg, const std::string& task_id, - const TUniqueId& fragment_instance_id, + void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTrackerLimiter>& mem_tracker); void detach_limiter_tracker(); @@ -86,16 +68,7 @@ public: void push_consumer_tracker(NewMemTracker* mem_tracker); void pop_consumer_tracker(); - MemExceedCallBackInfo update_exceed_call_back(const std::string& cancel_msg, bool cancel_task, - ExceedCallBack cb_func) { - _temp_exceed_cb = _exceed_cb; - _exceed_cb.cancel_msg = cancel_msg; - _exceed_cb.cancel_task = cancel_task; - _exceed_cb.cb_func = cb_func; - return _temp_exceed_cb; - } - - void update_exceed_call_back(const MemExceedCallBackInfo& exceed_cb) { _exceed_cb = exceed_cb; } + void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; } // Note that, If call the memory allocation operation in TCMalloc new/delete Hook, // such as calling LOG/iostream/sstream/stringstream/etc. related methods, @@ -114,7 +87,7 @@ public: template <bool CheckLimit> void flush_untracked_mem(); - bool is_attach_task() { return _task_id != ""; } + bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; } @@ -138,7 +111,7 @@ private: // If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled void exceeded_cancel_task(const std::string& cancel_details); - void exceeded(int64_t failed_consume_size, Status failed_try_consume_rt); + void exceeded(int64_t failed_consume_size); private: // Cache untracked mem, only update to _untracked_mems when switching mem tracker. @@ -155,14 +128,12 @@ private: bool _check_attach = true; std::string _task_id; TUniqueId _fragment_instance_id; - MemExceedCallBackInfo _exceed_cb; - MemExceedCallBackInfo _temp_exceed_cb; + ExceedCallBack _cb_func = nullptr; }; inline void ThreadMemTrackerMgr::init() { DCHECK(_consumer_tracker_stack.empty()); _task_id = ""; - _exceed_cb.init(); _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); _check_limit = true; } @@ -219,7 +190,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. _limiter_tracker->consume(_untracked_mem); - exceeded(_untracked_mem, st); + exceeded(_untracked_mem); } } else { _limiter_tracker->consume(_untracked_mem); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index fcce7fa43c..8f8cdd62b7 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -271,7 +271,7 @@ Status PlanFragmentExecutor::open() { if (_cancel_reason == PPlanFragmentCancelReason::CALL_RPC_ERROR) { status = Status::RuntimeError(_cancel_msg); } else if (_cancel_reason == PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED) { - status = Status::MemoryAllocFailed(_cancel_msg); + status = Status::MemoryLimitExceeded(_cancel_msg); } } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 6ae9c644a0..e62074ae62 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -72,21 +72,6 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { #endif // USE_MEM_TRACKER } -UpdateMemExceedCallBack::UpdateMemExceedCallBack(const std::string& cancel_msg, bool cancel_task, - ExceedCallBack cb_func) { -#ifdef USE_MEM_TRACKER - DCHECK(cancel_msg != std::string()); - _old_cb = thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back( - cancel_msg, cancel_task, cb_func); -#endif -} - -UpdateMemExceedCallBack::~UpdateMemExceedCallBack() { -#ifdef USE_MEM_TRACKER - thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(_old_cb); -#endif // USE_MEM_TRACKER -} - SwitchBthread::SwitchBthread() { #ifdef USE_MEM_TRACKER _bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 254b465b00..d657e3b063 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -33,9 +33,11 @@ #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) -#define SCOPED_UPDATE_MEM_EXCEED_CALL_BACK(cancel_msg, ...) \ - auto VARNAME_LINENUM(update_exceed_cb) = \ - doris::UpdateMemExceedCallBack(cancel_msg, ##__VA_ARGS__) +// Attach to task when thread starts +#define SCOPED_ATTACH_TASK(arg1, ...) \ + auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) + +#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() namespace doris { @@ -104,8 +106,8 @@ public: BRPC = 5 // to be added ... }; - inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", - "STORAGE"}; + inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", + "COMPACTION", "STORAGE", "BRPC"}; public: ThreadContext() { @@ -139,8 +141,7 @@ public: _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _thread_mem_tracker_mgr->attach_limiter_tracker(TaskTypeStr[_type], task_id, - fragment_instance_id, mem_tracker); + _thread_mem_tracker_mgr->attach_limiter_tracker(task_id, fragment_instance_id, mem_tracker); } void detach_task() { @@ -225,19 +226,6 @@ public: ~AddThreadMemTrackerConsumer(); }; -class UpdateMemExceedCallBack { -public: - explicit UpdateMemExceedCallBack(const std::string& cancel_msg, bool cancel_task = true, - ExceedCallBack cb_func = nullptr); - - ~UpdateMemExceedCallBack(); - -private: -#ifdef USE_MEM_TRACKER - MemExceedCallBackInfo _old_cb; -#endif -}; - class SwitchBthread { public: explicit SwitchBthread(); @@ -261,15 +249,8 @@ public: } }; -#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() - -// Attach to task when thread starts -#define SCOPED_ATTACH_TASK(arg1, ...) \ - auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) - #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() - #define CONSUME_THREAD_MEM_TRACKER(size) \ doris::thread_context()->_thread_mem_tracker_mgr->consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ @@ -278,5 +259,8 @@ public: doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size, tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size, tracker) - +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ + return doris::thread_context() \ + ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ + ->mem_limit_exceeded(state, msg, ##__VA_ARGS__); } // namespace doris diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index e6aa53b1da..98ffa85654 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1077,7 +1077,6 @@ Status HashJoinNode::open(RuntimeState* state) { Status HashJoinNode::_hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash table."); SCOPED_TIMER(_build_timer); MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors()); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 5907d3c802..5acfc1ff6b 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -351,7 +351,6 @@ Status AggregationNode::prepare(RuntimeState* state) { Status AggregationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute open."); RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); @@ -386,7 +385,6 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute get_next."); if (_is_streaming_preagg) { bool child_eos = false; diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp index 4c79355326..a8564a6310 100644 --- a/be/src/vec/exec/vcross_join_node.cpp +++ b/be/src/vec/exec/vcross_join_node.cpp @@ -52,7 +52,6 @@ Status VCrossJoinNode::close(RuntimeState* state) { Status VCrossJoinNode::construct_build_side(RuntimeState* state) { // Do a full scan of child(1) and store all build row batches. RETURN_IF_ERROR(child(1)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Cross join, while getting next from the child 1"); bool eos = false; while (true) { diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 44c1638463..a20b88b767 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -227,7 +227,6 @@ void VSetOperationNode::hash_table_init() { //build a hash table from child(0) Status VSetOperationNode::hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(0)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Set Operation Node, while constructing the hash table"); Block block; MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org