This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new 29d36c7207 [fix][branch-1.2](memory) Fix crash at bthread_setspecific in brpc::Socket::CheckHealth() #22334 29d36c7207 is described below commit 29d36c720733d21e3f871519b7a30bebfb28cb13 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Fri Jul 28 21:13:01 2023 +0800 [fix][branch-1.2](memory) Fix crash at bthread_setspecific in brpc::Socket::CheckHealth() #22334 --- be/src/runtime/thread_context.cpp | 28 +++++++----- be/src/runtime/thread_context.h | 89 ++++++++++++++++++++++++++++----------- 2 files changed, 82 insertions(+), 35 deletions(-) diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index d30d836064..71957c366b 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -19,7 +19,6 @@ #include "common/signal_handler.h" #include "runtime/runtime_state.h" -#include "util/doris_metrics.h" namespace doris { @@ -41,10 +40,12 @@ ScopeMemCount::~ScopeMemCount() { AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const std::string& task_id, const TUniqueId& fragment_instance_id) { + SwitchBthreadLocal::switch_to_bthread_local(); thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); } AttachTask::AttachTask(RuntimeState* runtime_state) { + SwitchBthreadLocal::switch_to_bthread_local(); doris::signal::query_id_hi = runtime_state->query_id().hi; doris::signal::query_id_lo = runtime_state->query_id().lo; thread_context()->attach_task(print_id(runtime_state->query_id()), @@ -54,36 +55,43 @@ AttachTask::AttachTask(RuntimeState* runtime_state) { AttachTask::~AttachTask() { thread_context()->detach_task(); -#ifndef NDEBUG - DorisMetrics::instance()->attach_task_thread_count->increment(1); -#endif // NDEBUG + SwitchBthreadLocal::switch_back_pthread_local(); } SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter( const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { + 
SwitchBthreadLocal::switch_to_bthread_local(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); } SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + SwitchBthreadLocal::switch_back_pthread_local(); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { - _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); + SwitchBthreadLocal::switch_to_bthread_local(); + if (mem_tracker) { + _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); + } } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer( const std::shared_ptr<MemTracker>& mem_tracker) : _mem_tracker(mem_tracker) { - _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); + SwitchBthreadLocal::switch_to_bthread_local(); + if (_mem_tracker) { + _need_pop = + thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); + } } AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { -#ifndef NDEBUG - DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1); -#endif // NDEBUG - if (_need_pop) thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); + if (_need_pop) { + thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); + } + SwitchBthreadLocal::switch_back_pthread_local(); } } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index c2e66ba42e..0a4a676bb8 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -18,6 +18,7 @@ #pragma once #include <bthread/bthread.h> +#include <bthread/types.h> #include <string> #include <thread> @@ -27,10 +28,10 @@ #include "gutil/macros.h" #include 
"runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" -#include "util/defer_op.h" +#include "util/defer_op.h" // IWYU pragma: keep // Used to observe the memory usage of the specified code segment -#ifdef USE_MEM_TRACKER +#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER) // Count a code segment memory (memory malloc - memory free) to int64_t // Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; } #define SCOPED_MEM_COUNT(scope_mem) \ @@ -48,7 +49,7 @@ #endif // Used to observe query/load/compaction/e.g. execution thread memory usage and respond when memory exceeds the limit. -#ifdef USE_MEM_TRACKER +#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER) // Attach to query/load/compaction/e.g. when thread starts. // This will save some info about a working thread in the thread context. // And count the memory during thread execution (is actually also the code segment that executes the function) @@ -86,6 +87,7 @@ namespace doris { class TUniqueId; class ThreadContext; +extern bool k_doris_exit; extern bthread_key_t btls_key; // Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error, @@ -186,42 +188,79 @@ public: return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); } + int switch_bthread_local_count = 0; + private: std::string _task_id = ""; TUniqueId _fragment_instance_id; }; +// Switch thread context from pthread local to bthread local context. // Cache the pointer of bthread local in pthead local, // Avoid calling bthread_getspecific frequently to get bthread local, which has performance problems. -static void pthread_attach_bthread() { - bthread_id = bthread_self(); - bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); - if (bthread_context == nullptr) { - // A new bthread starts, two scenarios: - // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL - // 2. 
There are not enough reusable btls in btls pool. - // else, two scenarios: - // 1. A new bthread starts, but get a reuses btls. - // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. - // So tracker call reset 0 like reuses btls. - bthread_context = new ThreadContext; - // The brpc server should respond as quickly as possible. - bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); - // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); +class SwitchBthreadLocal { +public: + static void switch_to_bthread_local() { + if (bthread_self() != 0) { + // Calling bthread_getspecific very frequently is slow, but switch_to_bthread_local is not expected to be called often. + bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + if (bthread_context == nullptr) { + // A new bthread starts, two scenarios: + // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + // 2. There are not enough reusable btls in btls pool. + // Otherwise (non-null), two scenarios: + // 1. A new bthread starts, but gets a reused btls. + // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment, + // the tracker is reset to 0, as with a reused btls. + // During this period, stop using thread_context. + thread_context_ptr.init = false; + bthread_context = new ThreadContext; + // The brpc server should respond as quickly as possible. + bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); + // Set the data so that the next bthread_getspecific call in this bthread returns it.
+ CHECK((0 == bthread_setspecific(btls_key, bthread_context)) || doris::k_doris_exit); + thread_context_ptr.init = true; + } + bthread_id = bthread_self(); + bthread_context->switch_bthread_local_count++; + } } -} + // `switch_to_bthread_local` and `switch_back_pthread_local` must be used in pairs: + // `switch_back_pthread_local` should only be called after a matching `switch_to_bthread_local`. + static void switch_back_pthread_local() { + if (bthread_self() != 0) { + if (!bthread_equal(bthread_self(), bthread_id)) { + bthread_id = bthread_self(); + bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + DCHECK(bthread_context != nullptr); + } + bthread_context->switch_bthread_local_count--; + if (bthread_context->switch_bthread_local_count == 0) { + bthread_context = thread_context_ptr._ptr; + } + } + } +}; + +// Note: every use of thread_context() inside a bthread must be wrapped by SwitchBthreadLocal. static ThreadContext* thread_context() { if (bthread_self() != 0) { - if (bthread_self() != bthread_id) { - // A new bthread starts or pthread switch occurs, during this period, stop the use of thread_context. - thread_context_ptr.init = false; - pthread_attach_bthread(); - thread_context_ptr.init = true; + // in bthread + if (!bthread_equal(bthread_self(), bthread_id)) { + // A bthread may switch pthreads very frequently, so do not use locks or other time-consuming operations here. + bthread_id = bthread_self(); + bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + // If nullptr, either a new bthread task started with no reusable bthread local, + // or the bthread switched pthreads without calling switch_to_bthread_local: use the pthread local context. + // Otherwise, the bthread switched pthreads after calling switch_to_bthread_local: use the bthread local context.
+ if (bthread_context == nullptr) { + bthread_context = thread_context_ptr._ptr; + } } return bthread_context; } else { + // in pthread return thread_context_ptr._ptr; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org