This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cf7a74f6ec [fix](memory) query check cancel while waiting for memory in Allocator, and optimize log (#19967) cf7a74f6ec is described below commit cf7a74f6eceeaf56b7d8beb2dc94cb48ced813f7 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed May 24 11:08:48 2023 +0800 [fix](memory) query check cancel while waiting for memory in Allocator, and optimize log (#19967) When a query finds in Allocator that process memory exceeds the limit, it waits up to 5s for memory to become available. Previously, Allocator did not check whether the query had been canceled while waiting for memory, which prevented a canceled query from ending quickly. --- be/src/common/daemon.cpp | 6 ++-- be/src/runtime/thread_context.cpp | 6 ++-- be/src/runtime/thread_context.h | 9 +++--- be/src/vec/common/allocator.cpp | 50 +++++++++++++++++++++---------- be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +- 5 files changed, 46 insertions(+), 27 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 82c7309766..38017bbca5 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -197,8 +197,8 @@ void Daemon::memory_maintenance_thread() { doris::MemInfo::refresh_proc_meminfo(); doris::MemInfo::refresh_proc_mem_no_allocator_cache(); - // Update and print memory stat when the memory changes by 100M. - if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 104857600) { + // Update and print memory stat when the memory changes by 256M. 
+ if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) { last_print_proc_mem = PerfCounters::get_vm_rss(); doris::MemTrackerLimiter::enable_print_log_process_usage(); @@ -213,7 +213,7 @@ void Daemon::memory_maintenance_thread() { } #endif LOG(INFO) << MemTrackerLimiter:: - process_mem_log_str(); // print mem log when memory state by 100M + process_mem_log_str(); // print mem log when memory state by 256M } } } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 2e8845b206..15efa37b88 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -20,7 +20,6 @@ #include "common/signal_handler.h" #include "runtime/runtime_state.h" #include "util/doris_metrics.h" // IWYU pragma: keep -#include "util/uid_util.h" namespace doris { class MemTracker; @@ -33,15 +32,14 @@ ThreadContextPtr::ThreadContextPtr() { } AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const std::string& task_id, const TUniqueId& fragment_instance_id) { + const TUniqueId& task_id, const TUniqueId& fragment_instance_id) { thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); } AttachTask::AttachTask(RuntimeState* runtime_state) { doris::signal::query_id_hi = runtime_state->query_id().hi; doris::signal::query_id_lo = runtime_state->query_id().lo; - thread_context()->attach_task(print_id(runtime_state->query_id()), - runtime_state->fragment_instance_id(), + thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(), runtime_state->query_mem_tracker()); } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index c6f6a46c4e..4fbfd9155d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -149,7 +149,7 @@ public: ~ThreadContext() { thread_context_ptr.init = false; } - void attach_task(const std::string& task_id, const TUniqueId& fragment_instance_id, + void attach_task(const 
TUniqueId& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { #ifndef BE_TEST // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. @@ -164,11 +164,12 @@ public: } void detach_task() { - _task_id = ""; + _task_id = TUniqueId(); _fragment_instance_id = TUniqueId(); thread_mem_tracker_mgr->detach_limiter_tracker(); } + const TUniqueId& task_id() const { return _task_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } std::string get_thread_id() { @@ -189,7 +190,7 @@ public: } private: - std::string _task_id = ""; + TUniqueId _task_id; TUniqueId _fragment_instance_id; }; @@ -252,7 +253,7 @@ private: class AttachTask { public: explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const std::string& task_id = "", + const TUniqueId& task_id = TUniqueId(), const TUniqueId& fragment_instance_id = TUniqueId()); explicit AttachTask(RuntimeState* runtime_state); diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index e41454a5c3..b74ed398d4 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -27,10 +27,12 @@ #include <thread> // Allocator is used by too many files. For compilation speed, put dependencies in `.cpp` as much as possible. 
+#include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/thread_context.h" #include "util/mem_info.h" +#include "util/uid_util.h" template <bool clear_memory_, bool mmap_populate, bool use_mmap> void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size) const { @@ -39,41 +41,50 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t // Only thread attach query, and has not completely waited for thread_wait_gc_max_milliseconds, // will wait for gc, asynchronous cancel or throw bad::alloc. // Otherwise, if the external catch, directly throw bad::alloc. + auto err_msg = fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming " + "tracker:<{}>, exec node:<{}>, {}.", + size, doris::thread_context()->thread_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds; + LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum 5s, {}.", + print_id(doris::thread_context()->task_id()), err_msg); while (wait_milliseconds > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { doris::MemInfo::refresh_interval_memory_growth += size; break; } + if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( + doris::thread_context()->task_id())) { + wait_milliseconds = 0; + break; + } wait_milliseconds -= 100; } if (wait_milliseconds <= 0) { // Make sure to completely wait thread_wait_gc_max_milliseconds only once. 
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); - auto err_msg = fmt::format( - "Allocator sys memory check failed: Cannot alloc:{}, consuming " - "tracker:<{}>, exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); doris::MemTrackerLimiter::print_log_process_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. if (!doris::enable_thread_catch_bad_alloc) { + LOG(INFO) << fmt::format( + "Query:{} canceled asyn, after waiting for memory 5s, {}.", + print_id(doris::thread_context()->task_id()), err_msg); doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); } else { + LOG(INFO) << fmt::format( + "Query:{} throw exception, after waiting for memory 5s, {}.", + print_id(doris::thread_context()->task_id()), err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } + // else, enough memory is available, the query continues execute. 
} else if (doris::enable_thread_catch_bad_alloc) { - auto err_msg = fmt::format( - "Allocator sys memory check failed: Cannot alloc:{}, consuming tracker:<{}>, " - "exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); + LOG(INFO) << fmt::format("throw exception, {}.", err_msg); doris::MemTrackerLimiter::print_log_process_usage(err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } @@ -85,7 +96,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz if (doris::skip_memory_check) return; auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { - doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); auto err_msg = doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str( st.to_string(), @@ -93,9 +103,19 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz "Allocator mem tracker check failed"); doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. 
- if (!doris::enable_thread_catch_bad_alloc) { - doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) { + doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); + if (!doris::enable_thread_catch_bad_alloc) { + LOG(INFO) << fmt::format("Query:{} canceled asyn, {}.", + print_id(doris::thread_context()->task_id()), err_msg); + doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + } else { + LOG(INFO) << fmt::format("Query:{} throw exception, {}.", + print_id(doris::thread_context()->task_id()), err_msg); + throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + } } else { + LOG(INFO) << fmt::format("throw exception, {}.", err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1c965a9d3d..ebee26783a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -360,7 +360,7 @@ Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& orderin void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - SCOPED_ATTACH_TASK(_query_mem_tracker, print_id(_query_id), _fragment_instance_id); + SCOPED_ATTACH_TASK(_query_mem_tracker, _query_id, _fragment_instance_id); int use_sender_id = _is_merging ? sender_id : 0; _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org