This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cf7a74f6ec [fix](memory) query check cancel while waiting for memory 
in Allocator, and optimize log (#19967)
cf7a74f6ec is described below

commit cf7a74f6eceeaf56b7d8beb2dc94cb48ced813f7
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed May 24 11:08:48 2023 +0800

    [fix](memory) query check cancel while waiting for memory in Allocator, and 
optimize log (#19967)
    
    When a query finds in Allocator that process memory exceeds the limit, it 
waits up to 5s.
    Previously, Allocator did not check whether the query had been canceled 
while waiting for memory, so a canceled query could not end quickly.
---
 be/src/common/daemon.cpp                  |  6 ++--
 be/src/runtime/thread_context.cpp         |  6 ++--
 be/src/runtime/thread_context.h           |  9 +++---
 be/src/vec/common/allocator.cpp           | 50 +++++++++++++++++++++----------
 be/src/vec/runtime/vdata_stream_recvr.cpp |  2 +-
 5 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 82c7309766..38017bbca5 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -197,8 +197,8 @@ void Daemon::memory_maintenance_thread() {
         doris::MemInfo::refresh_proc_meminfo();
         doris::MemInfo::refresh_proc_mem_no_allocator_cache();
 
-        // Update and print memory stat when the memory changes by 100M.
-        if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 104857600) 
{
+        // Update and print memory stat when the memory changes by 256M.
+        if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) 
{
             last_print_proc_mem = PerfCounters::get_vm_rss();
             doris::MemTrackerLimiter::enable_print_log_process_usage();
 
@@ -213,7 +213,7 @@ void Daemon::memory_maintenance_thread() {
             }
 #endif
             LOG(INFO) << MemTrackerLimiter::
-                            process_mem_log_str(); // print mem log when 
memory state by 100M
+                            process_mem_log_str(); // print mem log when 
memory state by 256M
         }
     }
 }
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 2e8845b206..15efa37b88 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -20,7 +20,6 @@
 #include "common/signal_handler.h"
 #include "runtime/runtime_state.h"
 #include "util/doris_metrics.h" // IWYU pragma: keep
-#include "util/uid_util.h"
 
 namespace doris {
 class MemTracker;
@@ -33,15 +32,14 @@ ThreadContextPtr::ThreadContextPtr() {
 }
 
 AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                       const std::string& task_id, const TUniqueId& 
fragment_instance_id) {
+                       const TUniqueId& task_id, const TUniqueId& 
fragment_instance_id) {
     thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
 }
 
 AttachTask::AttachTask(RuntimeState* runtime_state) {
     doris::signal::query_id_hi = runtime_state->query_id().hi;
     doris::signal::query_id_lo = runtime_state->query_id().lo;
-    thread_context()->attach_task(print_id(runtime_state->query_id()),
-                                  runtime_state->fragment_instance_id(),
+    thread_context()->attach_task(runtime_state->query_id(), 
runtime_state->fragment_instance_id(),
                                   runtime_state->query_mem_tracker());
 }
 
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c6f6a46c4e..4fbfd9155d 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -149,7 +149,7 @@ public:
 
     ~ThreadContext() { thread_context_ptr.init = false; }
 
-    void attach_task(const std::string& task_id, const TUniqueId& 
fragment_instance_id,
+    void attach_task(const TUniqueId& task_id, const TUniqueId& 
fragment_instance_id,
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
 #ifndef BE_TEST
         // will only attach_task at the beginning of the thread function, 
there should be no duplicate attach_task.
@@ -164,11 +164,12 @@ public:
     }
 
     void detach_task() {
-        _task_id = "";
+        _task_id = TUniqueId();
         _fragment_instance_id = TUniqueId();
         thread_mem_tracker_mgr->detach_limiter_tracker();
     }
 
+    const TUniqueId& task_id() const { return _task_id; }
     const TUniqueId& fragment_instance_id() const { return 
_fragment_instance_id; }
 
     std::string get_thread_id() {
@@ -189,7 +190,7 @@ public:
     }
 
 private:
-    std::string _task_id = "";
+    TUniqueId _task_id;
     TUniqueId _fragment_instance_id;
 };
 
@@ -252,7 +253,7 @@ private:
 class AttachTask {
 public:
     explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                        const std::string& task_id = "",
+                        const TUniqueId& task_id = TUniqueId(),
                         const TUniqueId& fragment_instance_id = TUniqueId());
 
     explicit AttachTask(RuntimeState* runtime_state);
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index e41454a5c3..b74ed398d4 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -27,10 +27,12 @@
 #include <thread>
 
 // Allocator is used by too many files. For compilation speed, put 
dependencies in `.cpp` as much as possible.
+#include "runtime/fragment_mgr.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "runtime/thread_context.h"
 #include "util/mem_info.h"
+#include "util/uid_util.h"
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap>
 void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t size) const {
@@ -39,41 +41,50 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t
         // Only thread attach query, and has not completely waited for 
thread_wait_gc_max_milliseconds,
         // will wait for gc, asynchronous cancel or throw bad::alloc.
         // Otherwise, if the external catch, directly throw bad::alloc.
+        auto err_msg = fmt::format(
+                "Allocator sys memory check failed: Cannot alloc:{}, consuming 
"
+                "tracker:<{}>, exec node:<{}>, {}.",
+                size, doris::thread_context()->thread_mem_tracker()->label(),
+                
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
+                doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
         if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() 
&&
             doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
             int64_t wait_milliseconds = 
doris::config::thread_wait_gc_max_milliseconds;
+            LOG(INFO) << fmt::format("Query:{} waiting for enough memory, 
maximum 5s, {}.",
+                                     
print_id(doris::thread_context()->task_id()), err_msg);
             while (wait_milliseconds > 0) {
                 std::this_thread::sleep_for(std::chrono::milliseconds(100));
                 if 
(!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
                     doris::MemInfo::refresh_interval_memory_growth += size;
                     break;
                 }
+                if 
(doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+                            doris::thread_context()->task_id())) {
+                    wait_milliseconds = 0;
+                    break;
+                }
                 wait_milliseconds -= 100;
             }
             if (wait_milliseconds <= 0) {
                 // Make sure to completely wait 
thread_wait_gc_max_milliseconds only once.
                 
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
-                auto err_msg = fmt::format(
-                        "Allocator sys memory check failed: Cannot alloc:{}, 
consuming "
-                        "tracker:<{}>, exec node:<{}>, {}.",
-                        size, 
doris::thread_context()->thread_mem_tracker()->label(),
-                        
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
-                        
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
                 doris::MemTrackerLimiter::print_log_process_usage(err_msg);
                 // If the external catch, throw bad::alloc first, let the 
query actively cancel. Otherwise asynchronous cancel.
                 if (!doris::enable_thread_catch_bad_alloc) {
+                    LOG(INFO) << fmt::format(
+                            "Query:{} canceled asyn, after waiting for memory 
5s, {}.",
+                            print_id(doris::thread_context()->task_id()), 
err_msg);
                     
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
                 } else {
+                    LOG(INFO) << fmt::format(
+                            "Query:{} throw exception, after waiting for 
memory 5s, {}.",
+                            print_id(doris::thread_context()->task_id()), 
err_msg);
                     throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
                 }
             }
+            // else, enough memory is available, the query continues execute.
         } else if (doris::enable_thread_catch_bad_alloc) {
-            auto err_msg = fmt::format(
-                    "Allocator sys memory check failed: Cannot alloc:{}, 
consuming tracker:<{}>, "
-                    "exec node:<{}>, {}.",
-                    size, 
doris::thread_context()->thread_mem_tracker()->label(),
-                    
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
-                    
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+            LOG(INFO) << fmt::format("throw exception, {}.", err_msg);
             doris::MemTrackerLimiter::print_log_process_usage(err_msg);
             throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
         }
@@ -85,7 +96,6 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::memory_tracker_check(siz
     if (doris::skip_memory_check) return;
     auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size);
     if (!st) {
-        doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
         auto err_msg =
                 
doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str(
                         st.to_string(),
@@ -93,9 +103,19 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::memory_tracker_check(siz
                         "Allocator mem tracker check failed");
         
doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg);
         // If the external catch, throw bad::alloc first, let the query 
actively cancel. Otherwise asynchronous cancel.
-        if (!doris::enable_thread_catch_bad_alloc) {
-            
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
+        if 
(doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) {
+            doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
+            if (!doris::enable_thread_catch_bad_alloc) {
+                LOG(INFO) << fmt::format("Query:{} canceled asyn, {}.",
+                                         
print_id(doris::thread_context()->task_id()), err_msg);
+                
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
+            } else {
+                LOG(INFO) << fmt::format("Query:{} throw exception, {}.",
+                                         
print_id(doris::thread_context()->task_id()), err_msg);
+                throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
+            }
         } else {
+            LOG(INFO) << fmt::format("throw exception, {}.", err_msg);
             throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
         }
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 1c965a9d3d..ebee26783a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -360,7 +360,7 @@ Status VDataStreamRecvr::create_merger(const 
std::vector<VExprContext*>& orderin
 
 void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int 
be_number,
                                  int64_t packet_seq, 
::google::protobuf::Closure** done) {
-    SCOPED_ATTACH_TASK(_query_mem_tracker, print_id(_query_id), 
_fragment_instance_id);
+    SCOPED_ATTACH_TASK(_query_mem_tracker, _query_id, _fragment_instance_id);
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, 
done);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to