This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4268634115f3acef3bb332fa6ee664b52cc3072a
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon Mar 11 22:09:08 2024 +0800

    [fix](memory) Fix Allocator cancel pipelinex query #32048
---
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 13 ++++++-------
 be/src/runtime/memory/thread_mem_tracker_mgr.h   | 10 +++++-----
 be/src/runtime/thread_context.cpp                |  7 +++----
 be/src/runtime/thread_context.h                  | 13 ++++++-------
 be/src/vec/common/allocator.cpp                  |  4 ++--
 be/src/vec/runtime/vdata_stream_recvr.cpp        |  2 +-
 6 files changed, 23 insertions(+), 26 deletions(-)

diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 0a1686704c0..1feca6976b8 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -25,12 +25,11 @@
 namespace doris {
 
 void ThreadMemTrackerMgr::attach_limiter_tracker(
-        const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-        const TUniqueId& fragment_instance_id) {
+        const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const 
TUniqueId& query_id) {
     DCHECK(mem_tracker);
     CHECK(init());
     flush_untracked_mem();
-    _fragment_instance_id = fragment_instance_id;
+    _query_id = query_id;
     _limiter_tracker = mem_tracker;
     _limiter_tracker_raw = mem_tracker.get();
     _wait_gc = true;
@@ -40,15 +39,15 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
     CHECK(init());
     flush_untracked_mem();
-    _fragment_instance_id = TUniqueId();
+    _query_id = TUniqueId();
     _limiter_tracker = old_mem_tracker;
     _limiter_tracker_raw = old_mem_tracker.get();
     _wait_gc = false;
 }
 
-void ThreadMemTrackerMgr::cancel_instance(const std::string& exceed_msg) {
-    ExecEnv::GetInstance()->fragment_mgr()->cancel_instance(
-            _fragment_instance_id, 
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
+void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
+    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+            _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, 
exceed_msg);
 }
 
 } // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 4cb22b9e1ae..d54f77ce3fa 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -53,7 +53,7 @@ public:
 
     // After attach, the current thread Memory Hook starts to consume/release 
task mem_tracker
     void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker,
-                                const TUniqueId& fragment_instance_id);
+                                const TUniqueId& query_id);
     void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& 
old_mem_tracker =
                                         
ExecEnv::GetInstance()->orphan_mem_tracker());
 
@@ -82,7 +82,7 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();
 
-    bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
+    bool is_attach_query() { return _query_id != TUniqueId(); }
 
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
         CHECK(init());
@@ -95,7 +95,7 @@ public:
 
     void disable_wait_gc() { _wait_gc = false; }
     [[nodiscard]] bool wait_gc() const { return _wait_gc; }
-    void cancel_instance(const std::string& exceed_msg);
+    void cancel_query(const std::string& exceed_msg);
 
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -130,7 +130,7 @@ private:
 
     // If there is a memory new/delete operation in the consume method, it may 
enter infinite recursion.
     bool _stop_consume = false;
-    TUniqueId _fragment_instance_id = TUniqueId();
+    TUniqueId _query_id = TUniqueId();
 };
 
 inline bool ThreadMemTrackerMgr::init() {
@@ -185,7 +185,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
                 "malloc or new large memory: {}, {}, this is just a warning, 
not prevent memory "
                 "alloc, stacktrace:\n{}",
                 size,
-                is_attach_query() ? "in query or load: " + 
print_id(_fragment_instance_id)
+                is_attach_query() ? "in query or load: " + print_id(_query_id)
                                   : "not in query or load",
                 get_stack_trace());
         _stop_consume = false;
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index fca09fcabc5..03c10986bd8 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -24,18 +24,17 @@ namespace doris {
 class MemTracker;
 
 AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                       const TUniqueId& task_id, const TUniqueId& 
fragment_instance_id) {
+                       const TUniqueId& task_id) {
     ThreadLocalHandle::create_thread_local_if_not_exits();
     signal::set_signal_task_id(task_id);
-    thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
+    thread_context()->attach_task(task_id, mem_tracker);
 }
 
 AttachTask::AttachTask(RuntimeState* runtime_state) {
     ThreadLocalHandle::create_thread_local_if_not_exits();
     signal::set_signal_task_id(runtime_state->query_id());
     signal::set_signal_is_nereids(runtime_state->is_nereids());
-    thread_context()->attach_task(runtime_state->query_id(), 
runtime_state->fragment_instance_id(),
-                                  runtime_state->query_mem_tracker());
+    thread_context()->attach_task(runtime_state->query_id(), 
runtime_state->query_mem_tracker());
 }
 
 AttachTask::~AttachTask() {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index f6ca46acaba..463dbf44349 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -45,8 +45,8 @@
 // This will save some info about a working thread in the thread context.
 // Looking forward to tracking memory during thread execution into 
MemTrackerLimiter.
 #define SCOPED_ATTACH_TASK(arg1) auto VARNAME_LINENUM(attach_task) = 
AttachTask(arg1)
-#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) \
-    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2, arg3)
+#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \
+    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2)
 
 // Switch MemTrackerLimiter for count memory during thread execution.
 // Used after SCOPED_ATTACH_TASK, in order to count the memory into another
@@ -60,7 +60,7 @@
     auto VARNAME_LINENUM(add_mem_consumer) = 
doris::AddThreadMemTrackerConsumer(mem_tracker)
 #else
 #define SCOPED_ATTACH_TASK(arg1, ...) (void)0
-#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) (void)0
+#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) (void)0
 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0
 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0
 #endif
@@ -134,7 +134,7 @@ public:
 
     ~ThreadContext() = default;
 
-    void attach_task(const TUniqueId& task_id, const TUniqueId& 
fragment_instance_id,
+    void attach_task(const TUniqueId& task_id,
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
 #ifndef BE_TEST
         // will only attach_task at the beginning of the thread function, 
there should be no duplicate attach_task.
@@ -144,7 +144,7 @@ public:
                 << ", attach mem tracker label: " << mem_tracker->label();
 #endif
         _task_id = task_id;
-        thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, 
fragment_instance_id);
+        thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, _task_id);
     }
 
     void detach_task() {
@@ -292,8 +292,7 @@ private:
 class AttachTask {
 public:
     explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                        const TUniqueId& task_id = TUniqueId(),
-                        const TUniqueId& fragment_instance_id = TUniqueId());
+                        const TUniqueId& task_id = TUniqueId());
 
     explicit AttachTask(RuntimeState* runtime_state);
 
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 3d6d2a7a11e..f1dd2af83f7 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -110,7 +110,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
                             "Query:{} canceled asyn, after waiting for memory 
{}ms, {}.",
                             print_id(doris::thread_context()->task_id()), 
wait_milliseconds,
                             err_msg);
-                    
doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg);
+                    
doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg);
                 } else {
                     LOG(INFO) << fmt::format(
                             "Query:{} throw exception, after waiting for 
memory {}ms, {}.",
@@ -148,7 +148,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz
             if (!doris::enable_thread_catch_bad_alloc) {
                 LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.",
                                          
print_id(doris::thread_context()->task_id()), err_msg);
-                
doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg);
+                
doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg);
             } else {
                 LOG(INFO) << fmt::format("query/load:{} throw exception, {}.",
                                          
print_id(doris::thread_context()->task_id()), err_msg);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 0bef6b6eaab..1cfc359de30 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -425,7 +425,7 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
 
 Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int 
be_number,
                                    int64_t packet_seq, 
::google::protobuf::Closure** done) {
-    SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, 
_fragment_instance_id);
+    SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id);
     int use_sender_id = _is_merging ? sender_id : 0;
     return _sender_queues[use_sender_id]->add_block(pblock, be_number, 
packet_seq, done);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to