This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 487fd3c1d7 [bugfix](memtracker)fix exceed memory limit log (#11485)
487fd3c1d7 is described below

commit 487fd3c1d715477c3d3f321b6e8ce0addc30bc12
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Aug 4 10:22:20 2022 +0800

    [bugfix](memtracker)fix exceed memory limit log (#11485)
---
 be/src/exec/cross_join_node.cpp                  |  1 -
 be/src/exec/except_node.cpp                      |  1 -
 be/src/exec/hash_join_node.cpp                   |  2 -
 be/src/exec/intersect_node.cpp                   |  1 -
 be/src/exec/set_operation_node.cpp               |  1 -
 be/src/runtime/memory/mem_tracker_limiter.cpp    | 56 ++++++++++--------------
 be/src/runtime/memory/mem_tracker_limiter.h      |  9 ++--
 be/src/runtime/memory/mem_tracker_task_pool.cpp  |  8 ++--
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 29 ++++++------
 be/src/runtime/memory/thread_mem_tracker_mgr.h   | 41 +++--------------
 be/src/runtime/plan_fragment_executor.cpp        |  2 +-
 be/src/runtime/thread_context.cpp                | 15 -------
 be/src/runtime/thread_context.h                  | 40 +++++------------
 be/src/vec/exec/join/vhash_join_node.cpp         |  1 -
 be/src/vec/exec/vaggregation_node.cpp            |  2 -
 be/src/vec/exec/vcross_join_node.cpp             |  1 -
 be/src/vec/exec/vset_operation_node.cpp          |  1 -
 17 files changed, 64 insertions(+), 147 deletions(-)

diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index b26f4f2cd4..0743fe04c4 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -52,7 +52,6 @@ Status CrossJoinNode::close(RuntimeState* state) {
 Status CrossJoinNode::construct_build_side(RuntimeState* state) {
     // Do a full scan of child(1) and store all build row batches.
     RETURN_IF_ERROR(child(1)->open(state));
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Cross join, while getting next from 
child 1");
 
     while (true) {
         RowBatch* batch = _build_batch_pool->add(
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 58a9b67f2f..dd92859671 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -40,7 +40,6 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
 
 Status ExceptNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(SetOperationNode::open(state));
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Except Node, while probing the hash 
table.");
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 7c572ff95d..02f52d2124 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -186,7 +186,6 @@ Status HashJoinNode::construct_hash_table(RuntimeState* 
state) {
     // The hash join node needs to keep in memory all build tuples, including 
the tuple
     // row ptrs.  The row ptrs are copied into the hash table's internal 
structure so they
     // don't need to be stored in the _build_pool.
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash 
table.");
     RowBatch build_batch(child(1)->row_desc(), state->batch_size(), 
mem_tracker().get());
     RETURN_IF_ERROR(child(1)->open(state));
 
@@ -304,7 +303,6 @@ Status HashJoinNode::get_next(RuntimeState* state, 
RowBatch* out_batch, bool* eo
     // In most cases, no additional memory overhead will be applied for at 
this stage,
     // but if the expression calculation in this node needs to apply for 
additional memory,
     // it may cause the memory to exceed the limit.
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while execute get_next.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     if (reached_limit()) {
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 9f5eb3ece1..a79810734d 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -43,7 +43,6 @@ Status IntersectNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 // 2 probe with child(1), then filter the hash table and find the matched 
item, use them to rebuild a hash table
 // repeat [2] this for all the rest child
 Status IntersectNode::open(RuntimeState* state) {
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Intersect Node, while probing the hash 
table.");
     RETURN_IF_ERROR(SetOperationNode::open(state));
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
diff --git a/be/src/exec/set_operation_node.cpp 
b/be/src/exec/set_operation_node.cpp
index e0bfbb199c..7a9d3a334f 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -138,7 +138,6 @@ Status SetOperationNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("SetOperation, while constructing the 
hash table.");
     RETURN_IF_CANCELLED(state);
     // open result expr lists.
     for (const std::vector<ExprContext*>& exprs : _child_expr_lists) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 722495aee7..f5c825155b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -143,7 +143,8 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
 Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
     if (UNLIKELY(gc_memory(_limit - bytes))) {
         return Status::MemoryLimitExceeded(
-                fmt::format("label={}, limit={}, used={}, failed consume 
size={}", label(), _limit, _consumption->current_value(), bytes));
+                fmt::format("label={}, limit={}, used={}, failed consume 
size={}", label(), _limit,
+                            _consumption->current_value(), bytes));
     }
     VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
                 << " consumption=" << _consumption->current_value() << " 
limit=" << _limit;
@@ -196,9 +197,9 @@ std::string MemTrackerLimiter::log_usage(int 
max_recursive_depth, int64_t* logge
     std::vector<NewMemTracker::Snapshot> snapshots;
     NewMemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label);
     for (const auto& snapshot : snapshots) {
-        child_trackers_usage += NewMemTracker::log_usage(snapshot);
+        child_trackers_usage += "\n    " + NewMemTracker::log_usage(snapshot);
     }
-    if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage;
+    if (!child_trackers_usage.empty()) detail += child_trackers_usage;
     return detail;
 }
 
@@ -216,47 +217,36 @@ std::string MemTrackerLimiter::log_usage(int 
max_recursive_depth,
     return join(usage_strings, "\n");
 }
 
-Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t 
failed_consume_size, Status failed_try_consume_rt, const TUniqueId& 
fragment_instance_id) {
+Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t 
failed_consume_size) {
     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
-    std::string detail;
-    if (!failed_try_consume_rt) {
-        DCHECK(failed_try_consume_rt.is_mem_limit_exceeded());
-        detail += "in {} mem tracker consume log=<{}>.";
-        // failed_try_consume_rt.to_string() starts with `Memory exceed limit: 
`
-        detail = fmt::format(detail, msg, failed_try_consume_rt.to_string());
-    } else {
-        detail += "in {} mem tracker consume log=<label={}, limit={}, used={}, 
failed consume size={}>.";
-        detail = fmt::format(detail, msg, _label, 
_consumption->current_value(), _limit,
-                             PrettyPrinter::print(failed_consume_size, 
TUnit::BYTES));
-    }
-    detail += " fragment={}, backend={} free memory left={}.";
-    detail = fmt::format(
-            detail, print_id(fragment_instance_id), 
-            BackendOptions::get_localhost(),
+    std::string detail = fmt::format(
+            "{}, failed mem consume:<consume_size={}, mem_limit={}, 
mem_used={}, tracker_label={}, "
+            "in backend={} free memory left={}. details mem usage see 
be.INFO.",
+            msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), 
_limit,
+            _consumption->current_value(), _label, 
BackendOptions::get_localhost(),
             
PrettyPrinter::print(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity(),
                                  TUnit::BYTES));
-    detail += " If this is a query, can change the limit by session variable 
exec_mem_limit.";
     Status status = Status::MemoryLimitExceeded(detail);
 
-    detail += "\n" + 
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
     // only print the tracker log_usage in be log.
-    if (ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() < 
failed_consume_size) {
-        // Dumping the process NewMemTracker is expensive. Limiting the 
recursive depth to two
-        // levels limits the level of detail to a one-line summary for each 
query NewMemTracker.
-        detail += "\n" + 
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
-    } else {
-        detail += "\n" + log_usage();
+    if (_print_log_usage) {
+        if 
(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() < 
failed_consume_size) {
+            // Dumping the process MemTracker is expensive. Limiting the 
recursive depth to two
+            // levels limits the level of detail to a one-line summary for 
each query MemTracker.
+            detail += "\n" + 
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
+        } else {
+            detail += "\n" + log_usage();
+        }
+        detail += "\n" + 
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
+        LOG(WARNING) << detail;
+        _print_log_usage = false;
     }
-
-    LOG(WARNING) << detail;
     return status;
 }
 
 Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const 
std::string& msg,
-                              int64_t failed_alloc_size,
-                              Status failed_try_consume_rt) {
-    Status rt = mem_limit_exceeded(msg, failed_alloc_size, 
failed_try_consume_rt,
-                                    state->fragment_instance_id());
+                                             int64_t failed_alloc_size) {
+    Status rt = mem_limit_exceeded(msg, failed_alloc_size);
     state->log_error(rt.to_string());
     return rt;
 }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 104a1c20ca..0c003d6f3a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -143,12 +143,9 @@ public:
     // If 'failed_allocation_size' is greater than zero, logs the allocation 
size. If
     // 'failed_allocation_size' is zero, nothing about the allocation size is 
logged.
     // If 'state' is non-nullptr, logs the error to 'state'.
-    Status mem_limit_exceeded(const std::string& msg, int64_t 
failed_consume_size,
-                              Status failed_try_consume_rt,
-                              const TUniqueId& fragment_instance_id = 
TUniqueId());
+    Status mem_limit_exceeded(const std::string& msg, int64_t 
failed_consume_size);
     Status mem_limit_exceeded(RuntimeState* state, const std::string& msg = 
std::string(),
-                              int64_t failed_consume_size = -1,
-                              Status failed_try_consume_rt = Status::OK());
+                              int64_t failed_consume_size = -1);
 
     std::string debug_string() {
         std::stringstream msg;
@@ -208,6 +205,8 @@ private:
     // The number of child trackers that have been added.
     std::atomic_size_t _had_child_count = 0;
 
+    bool _print_log_usage = true;
+
     // Lock to protect gc_memory(). This prevents many GCs from occurring at 
once.
     std::mutex _gc_lock;
     // Functions to call after the limit is reached to free memory.
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp 
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 3c775db5ec..b63b25df17 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -31,15 +31,13 @@ std::shared_ptr<MemTrackerLimiter> 
MemTrackerTaskPool::register_task_mem_tracker
     // Combine new tracker and emplace into one operation to avoid the use of 
locks
     // Name for task MemTrackers. '$0' is replaced with the task id.
     std::shared_ptr<MemTrackerLimiter> tracker;
-    bool new_emplace = _task_mem_trackers.lazy_emplace_l(
+    bool new_emplace = _task_mem_trackers.try_emplace_l(
             task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) { 
tracker = v; },
-            [&](const auto& ctor) {
-                tracker = std::make_shared<MemTrackerLimiter>(mem_limit, 
label, parent);
-                ctor(task_id, tracker);
-            });
+            std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
     if (new_emplace) {
         LOG(INFO) << "Register query/load memory tracker, query/load id: " << 
task_id
                   << " limit: " << PrettyPrinter::print(mem_limit, 
TUnit::BYTES);
+        return get_task_mem_tracker(task_id);
     }
     return tracker;
 }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index ab2d1fc1c0..aea89acb3b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -25,14 +25,12 @@
 namespace doris {
 
 void ThreadMemTrackerMgr::attach_limiter_tracker(
-        const std::string& cancel_msg, const std::string& task_id,
-        const TUniqueId& fragment_instance_id,
+        const std::string& task_id, const TUniqueId& fragment_instance_id,
         const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
     DCHECK(mem_tracker);
     flush_untracked_mem<false>();
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
-    _exceed_cb.cancel_msg = cancel_msg;
     _limiter_tracker = mem_tracker;
 }
 
@@ -40,7 +38,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() {
     flush_untracked_mem<false>();
     _task_id = "";
     _fragment_instance_id = TUniqueId();
-    _exceed_cb.cancel_msg = "";
     _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
 }
 
@@ -52,18 +49,22 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const 
std::string& cancel_details
     }
 }
 
-void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size, Status 
failed_try_consume_rt) {
-    if (_exceed_cb.cb_func != nullptr) {
-        _exceed_cb.cb_func();
+void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) {
+    if (_cb_func != nullptr) {
+        _cb_func();
     }
-    if (is_attach_task()) {
-        if (_exceed_cb.cancel_task) {
-            auto st = _limiter_tracker->mem_limit_exceeded(
-                    fmt::format("query mem limit exceeded and cancel it, {}", 
_exceed_cb.cancel_msg),
-                    failed_consume_size, failed_try_consume_rt, 
_fragment_instance_id);
-            exceeded_cancel_task(st.to_string());
-            _exceed_cb.cancel_task = false; // Make sure it will only be 
canceled once
+    if (is_attach_query()) {
+        std::string cancel_msg;
+        if (!_consumer_tracker_stack.empty()) {
+            cancel_msg = fmt::format(
+                    "exec node:<name={}>, can change the limit by `set 
exec_mem_limit=xxx`",
+                    _consumer_tracker_stack[-1]->label());
+        } else {
+            cancel_msg = "exec node:unknown, can change the limit by `set 
exec_mem_limit=xxx`";
         }
+        auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg, 
failed_consume_size);
+        exceeded_cancel_task(st.to_string());
+        _check_limit = false; // Make sure it will only be canceled once
     }
 }
 } // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 13eed946ba..2f5bfd2d9c 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -32,22 +32,6 @@ extern bthread_key_t btls_key;
 static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
 
 using ExceedCallBack = void (*)();
-struct MemExceedCallBackInfo {
-    std::string cancel_msg;
-    bool cancel_task; // Whether to cancel the task when the current tracker 
exceeds the limit.
-    ExceedCallBack cb_func;
-
-    MemExceedCallBackInfo() { init(); }
-
-    MemExceedCallBackInfo(const std::string& cancel_msg, bool cancel_task, 
ExceedCallBack cb_func)
-            : cancel_msg(cancel_msg), cancel_task(cancel_task), 
cb_func(cb_func) {}
-
-    void init() {
-        cancel_msg = "";
-        cancel_task = true;
-        cb_func = nullptr;
-    }
-};
 
 // TCMalloc new/delete Hook is counted in the memory_tracker of the current 
thread.
 //
@@ -61,7 +45,6 @@ public:
 
     ~ThreadMemTrackerMgr() {
         flush_untracked_mem<false>();
-        _exceed_cb.init();
         DCHECK(_consumer_tracker_stack.empty());
     }
 
@@ -75,8 +58,7 @@ public:
     void init();
 
     // After attach, the current thread TCMalloc Hook starts to 
consume/release task mem_tracker
-    void attach_limiter_tracker(const std::string& cancel_msg, const 
std::string& task_id,
-                                const TUniqueId& fragment_instance_id,
+    void attach_limiter_tracker(const std::string& task_id, const TUniqueId& 
fragment_instance_id,
                                 const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker);
 
     void detach_limiter_tracker();
@@ -86,16 +68,7 @@ public:
     void push_consumer_tracker(NewMemTracker* mem_tracker);
     void pop_consumer_tracker();
 
-    MemExceedCallBackInfo update_exceed_call_back(const std::string& 
cancel_msg, bool cancel_task,
-                                                  ExceedCallBack cb_func) {
-        _temp_exceed_cb = _exceed_cb;
-        _exceed_cb.cancel_msg = cancel_msg;
-        _exceed_cb.cancel_task = cancel_task;
-        _exceed_cb.cb_func = cb_func;
-        return _temp_exceed_cb;
-    }
-
-    void update_exceed_call_back(const MemExceedCallBackInfo& exceed_cb) { 
_exceed_cb = exceed_cb; }
+    void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; }
 
     // Note that, If call the memory allocation operation in TCMalloc 
new/delete Hook,
     // such as calling LOG/iostream/sstream/stringstream/etc. related methods,
@@ -114,7 +87,7 @@ public:
     template <bool CheckLimit>
     void flush_untracked_mem();
 
-    bool is_attach_task() { return _task_id != ""; }
+    bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
 
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return 
_limiter_tracker; }
 
@@ -138,7 +111,7 @@ private:
     // If tryConsume fails due to task mem tracker exceeding the limit, the 
task must be canceled
     void exceeded_cancel_task(const std::string& cancel_details);
 
-    void exceeded(int64_t failed_consume_size, Status failed_try_consume_rt);
+    void exceeded(int64_t failed_consume_size);
 
 private:
     // Cache untracked mem, only update to _untracked_mems when switching mem 
tracker.
@@ -155,14 +128,12 @@ private:
     bool _check_attach = true;
     std::string _task_id;
     TUniqueId _fragment_instance_id;
-    MemExceedCallBackInfo _exceed_cb;
-    MemExceedCallBackInfo _temp_exceed_cb;
+    ExceedCallBack _cb_func = nullptr;
 };
 
 inline void ThreadMemTrackerMgr::init() {
     DCHECK(_consumer_tracker_stack.empty());
     _task_id = "";
-    _exceed_cb.init();
     _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
     _check_limit = true;
 }
@@ -219,7 +190,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
             // The memory has been allocated, so when TryConsume fails, need 
to continue to complete
             // the consume to ensure the accuracy of the statistics.
             _limiter_tracker->consume(_untracked_mem);
-            exceeded(_untracked_mem, st);
+            exceeded(_untracked_mem);
         }
     } else {
         _limiter_tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index fcce7fa43c..8f8cdd62b7 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -271,7 +271,7 @@ Status PlanFragmentExecutor::open() {
         if (_cancel_reason == PPlanFragmentCancelReason::CALL_RPC_ERROR) {
             status = Status::RuntimeError(_cancel_msg);
         } else if (_cancel_reason == 
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED) {
-            status = Status::MemoryAllocFailed(_cancel_msg);
+            status = Status::MemoryLimitExceeded(_cancel_msg);
         }
     }
 
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 6ae9c644a0..e62074ae62 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -72,21 +72,6 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
 #endif // USE_MEM_TRACKER
 }
 
-UpdateMemExceedCallBack::UpdateMemExceedCallBack(const std::string& 
cancel_msg, bool cancel_task,
-                                                 ExceedCallBack cb_func) {
-#ifdef USE_MEM_TRACKER
-    DCHECK(cancel_msg != std::string());
-    _old_cb = 
thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(
-            cancel_msg, cancel_task, cb_func);
-#endif
-}
-
-UpdateMemExceedCallBack::~UpdateMemExceedCallBack() {
-#ifdef USE_MEM_TRACKER
-    
thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(_old_cb);
-#endif // USE_MEM_TRACKER
-}
-
 SwitchBthread::SwitchBthread() {
 #ifdef USE_MEM_TRACKER
     _bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 254b465b00..d657e3b063 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -33,9 +33,11 @@
 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
     auto VARNAME_LINENUM(add_mem_consumer) = 
doris::AddThreadMemTrackerConsumer(mem_tracker)
 
-#define SCOPED_UPDATE_MEM_EXCEED_CALL_BACK(cancel_msg, ...) \
-    auto VARNAME_LINENUM(update_exceed_cb) =                \
-            doris::UpdateMemExceedCallBack(cancel_msg, ##__VA_ARGS__)
+// Attach to task when thread starts
+#define SCOPED_ATTACH_TASK(arg1, ...) \
+    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
+
+#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = 
SwitchBthread()
 
 namespace doris {
 
@@ -104,8 +106,8 @@ public:
         BRPC = 5
         // to be added ...
     };
-    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", 
"LOAD", "COMPACTION",
-                                                     "STORAGE"};
+    inline static const std::string TaskTypeStr[] = {"UNKNOWN",    "QUERY",   
"LOAD",
+                                                     "COMPACTION", "STORAGE", 
"BRPC"};
 
 public:
     ThreadContext() {
@@ -139,8 +141,7 @@ public:
         _type = type;
         _task_id = task_id;
         _fragment_instance_id = fragment_instance_id;
-        _thread_mem_tracker_mgr->attach_limiter_tracker(TaskTypeStr[_type], 
task_id,
-                                                        fragment_instance_id, 
mem_tracker);
+        _thread_mem_tracker_mgr->attach_limiter_tracker(task_id, 
fragment_instance_id, mem_tracker);
     }
 
     void detach_task() {
@@ -225,19 +226,6 @@ public:
     ~AddThreadMemTrackerConsumer();
 };
 
-class UpdateMemExceedCallBack {
-public:
-    explicit UpdateMemExceedCallBack(const std::string& cancel_msg, bool 
cancel_task = true,
-                                     ExceedCallBack cb_func = nullptr);
-
-    ~UpdateMemExceedCallBack();
-
-private:
-#ifdef USE_MEM_TRACKER
-    MemExceedCallBackInfo _old_cb;
-#endif
-};
-
 class SwitchBthread {
 public:
     explicit SwitchBthread();
@@ -261,15 +249,8 @@ public:
     }
 };
 
-#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = 
SwitchBthread()
-
-// Attach to task when thread starts
-#define SCOPED_ATTACH_TASK(arg1, ...) \
-    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
-
 #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \
     auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
-
 #define CONSUME_THREAD_MEM_TRACKER(size) \
     doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
 #define RELEASE_THREAD_MEM_TRACKER(size) \
@@ -278,5 +259,8 @@ public:
     doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size, 
tracker)
 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
     doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size, 
tracker)
-
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...)               \
+    return doris::thread_context()                           \
+            ->_thread_mem_tracker_mgr->limiter_mem_tracker() \
+            ->mem_limit_exceeded(state, msg, ##__VA_ARGS__);
 } // namespace doris
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index e6aa53b1da..98ffa85654 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1077,7 +1077,6 @@ Status HashJoinNode::open(RuntimeState* state) {
 
 Status HashJoinNode::_hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(1)->open(state));
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash 
table.");
     SCOPED_TIMER(_build_timer);
     MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
 
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 5907d3c802..5acfc1ff6b 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -351,7 +351,6 @@ Status AggregationNode::prepare(RuntimeState* state) {
 Status AggregationNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute open.");
 
     RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
 
@@ -386,7 +385,6 @@ Status AggregationNode::get_next(RuntimeState* state, 
RowBatch* row_batch, bool*
 
 Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) 
{
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute get_next.");
 
     if (_is_streaming_preagg) {
         bool child_eos = false;
diff --git a/be/src/vec/exec/vcross_join_node.cpp 
b/be/src/vec/exec/vcross_join_node.cpp
index 4c79355326..a8564a6310 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -52,7 +52,6 @@ Status VCrossJoinNode::close(RuntimeState* state) {
 Status VCrossJoinNode::construct_build_side(RuntimeState* state) {
     // Do a full scan of child(1) and store all build row batches.
     RETURN_IF_ERROR(child(1)->open(state));
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Cross join, while getting next 
from the child 1");
 
     bool eos = false;
     while (true) {
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index 44c1638463..a20b88b767 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -227,7 +227,6 @@ void VSetOperationNode::hash_table_init() {
 //build a hash table from child(0)
 Status VSetOperationNode::hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(0)->open(state));
-    SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Set Operation Node, while 
constructing the hash table");
     Block block;
     MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to