This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 29d36c7207 [fix][branch-1.2](memory) Fix crash at bthread_setspecific in brpc::Socket::CheckHealth() #22334
29d36c7207 is described below

commit 29d36c720733d21e3f871519b7a30bebfb28cb13
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Jul 28 21:13:01 2023 +0800

    [fix][branch-1.2](memory) Fix crash at bthread_setspecific in brpc::Socket::CheckHealth() #22334
---
 be/src/runtime/thread_context.cpp | 28 +++++++-----
 be/src/runtime/thread_context.h   | 89 ++++++++++++++++++++++++++++-----------
 2 files changed, 82 insertions(+), 35 deletions(-)

diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index d30d836064..71957c366b 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -19,7 +19,6 @@
 
 #include "common/signal_handler.h"
 #include "runtime/runtime_state.h"
-#include "util/doris_metrics.h"
 
 namespace doris {
 
@@ -41,10 +40,12 @@ ScopeMemCount::~ScopeMemCount() {
 
 AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                        const std::string& task_id, const TUniqueId& 
fragment_instance_id) {
+    SwitchBthreadLocal::switch_to_bthread_local();
     thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
 }
 
 AttachTask::AttachTask(RuntimeState* runtime_state) {
+    SwitchBthreadLocal::switch_to_bthread_local();
     doris::signal::query_id_hi = runtime_state->query_id().hi;
     doris::signal::query_id_lo = runtime_state->query_id().lo;
     thread_context()->attach_task(print_id(runtime_state->query_id()),
@@ -54,36 +55,43 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
 
 AttachTask::~AttachTask() {
     thread_context()->detach_task();
-#ifndef NDEBUG
-    DorisMetrics::instance()->attach_task_thread_count->increment(1);
-#endif // NDEBUG
+    SwitchBthreadLocal::switch_back_pthread_local();
 }
 
 SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(
         const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+     SwitchBthreadLocal::switch_to_bthread_local();
     _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
     
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, 
TUniqueId());
 }
 
 SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
     
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
+    SwitchBthreadLocal::switch_back_pthread_local();
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* 
mem_tracker) {
-    _need_pop = 
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+    SwitchBthreadLocal::switch_to_bthread_local();
+    if (mem_tracker) {
+        _need_pop = 
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+    }
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
         const std::shared_ptr<MemTracker>& mem_tracker)
         : _mem_tracker(mem_tracker) {
-    _need_pop = 
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
+    SwitchBthreadLocal::switch_to_bthread_local();
+    if (_mem_tracker) {
+        _need_pop =
+                
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
+    }
 }
 
 AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
-#ifndef NDEBUG
-    
DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
-#endif // NDEBUG
-    if (_need_pop) 
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+    if (_need_pop) {
+        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+    }
+    SwitchBthreadLocal::switch_back_pthread_local();
 }
 
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c2e66ba42e..0a4a676bb8 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <bthread/bthread.h>
+#include <bthread/types.h>
 
 #include <string>
 #include <thread>
@@ -27,10 +28,10 @@
 #include "gutil/macros.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "runtime/threadlocal.h"
-#include "util/defer_op.h"
+#include "util/defer_op.h" // IWYU pragma: keep
 
 // Used to observe the memory usage of the specified code segment
-#ifdef USE_MEM_TRACKER
+#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
 // Count a code segment memory (memory malloc - memory free) to int64_t
 // Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; 
xxx; }
 #define SCOPED_MEM_COUNT(scope_mem) \
@@ -48,7 +49,7 @@
 #endif
 
 // Used to observe query/load/compaction/e.g. execution thread memory usage 
and respond when memory exceeds the limit.
-#ifdef USE_MEM_TRACKER
+#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
 // Attach to query/load/compaction/e.g. when thread starts.
 // This will save some info about a working thread in the thread context.
 // And count the memory during thread execution (is actually also the code 
segment that executes the function)
@@ -86,6 +87,7 @@ namespace doris {
 class TUniqueId;
 class ThreadContext;
 
+extern bool k_doris_exit;
 extern bthread_key_t btls_key;
 
 // Using gcc11 compiles thread_local variable on lower versions of GLIBC will 
report an error,
@@ -186,42 +188,79 @@ public:
         return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
     }
 
+    int switch_bthread_local_count = 0;
+
 private:
     std::string _task_id = "";
     TUniqueId _fragment_instance_id;
 };
 
+// Switch thread context from pthread local to bthread local context.
 // Cache the pointer of bthread local in pthead local,
 // Avoid calling bthread_getspecific frequently to get bthread local, which 
has performance problems.
-static void pthread_attach_bthread() {
-    bthread_id = bthread_self();
-    bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-    if (bthread_context == nullptr) {
-        // A new bthread starts, two scenarios:
-        // 1. First call to bthread_getspecific (and before any 
bthread_setspecific) returns NULL
-        // 2. There are not enough reusable btls in btls pool.
-        // else, two scenarios:
-        // 1. A new bthread starts, but get a reuses btls.
-        // 2. A pthread switch occurs. Because the pthread switch cannot be 
accurately identified at the moment.
-        // So tracker call reset 0 like reuses btls.
-        bthread_context = new ThreadContext;
-        // The brpc server should respond as quickly as possible.
-        bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
-        // set the data so that next time bthread_getspecific in the thread 
returns the data.
-        CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+class SwitchBthreadLocal {
+public:
+    static void switch_to_bthread_local() {
+        if (bthread_self() != 0) {
+            // Very frequent bthread_getspecific will slow, but 
switch_to_bthread_local is not expected to be much.
+            bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+            if (bthread_context == nullptr) {
+                // A new bthread starts, two scenarios:
+                // 1. First call to bthread_getspecific (and before any 
bthread_setspecific) returns NULL
+                // 2. There are not enough reusable btls in btls pool.
+                // else, two scenarios:
+                // 1. A new bthread starts, but get a reuses btls.
+                // 2. A pthread switch occurs. Because the pthread switch 
cannot be accurately identified at the moment.
+                // So tracker call reset 0 like reuses btls.
+                // during this period, stop the use of thread_context.
+                thread_context_ptr.init = false;
+                bthread_context = new ThreadContext;
+                // The brpc server should respond as quickly as possible.
+                bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
+                // set the data so that next time bthread_getspecific in the 
thread returns the data.
+                CHECK((0 == bthread_setspecific(btls_key, bthread_context)) || 
doris::k_doris_exit);
+                thread_context_ptr.init = true;
+            }
+            bthread_id = bthread_self();
+            bthread_context->switch_bthread_local_count++;
+        }
     }
-}
 
+    // `switch_to_bthread_local` and `switch_back_pthread_local` should be 
used in pairs,
+    // `switch_to_bthread_local` should only be called if 
`switch_to_bthread_local` returns true
+    static void switch_back_pthread_local() {
+        if (bthread_self() != 0) {
+            if (!bthread_equal(bthread_self(), bthread_id)) {
+                bthread_id = bthread_self();
+                bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+                DCHECK(bthread_context != nullptr);
+            }
+            bthread_context->switch_bthread_local_count--;
+            if (bthread_context->switch_bthread_local_count == 0) {
+                bthread_context = thread_context_ptr._ptr;
+            }
+        }
+    }
+};
+
+// Note: All use of thread_context() in bthread requires the use of 
SwitchBthreadLocal.
 static ThreadContext* thread_context() {
     if (bthread_self() != 0) {
-        if (bthread_self() != bthread_id) {
-            // A new bthread starts or pthread switch occurs, during this 
period, stop the use of thread_context.
-            thread_context_ptr.init = false;
-            pthread_attach_bthread();
-            thread_context_ptr.init = true;
+        // in bthread
+        if (!bthread_equal(bthread_self(), bthread_id)) {
+            // bthread switching pthread may be very frequent, remember not to 
use lock or other time-consuming operations.
+            bthread_id = bthread_self();
+            bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+            // if nullptr, a new bthread task start and no reusable bthread 
local,
+            // or bthread switch pthread but not call switch_to_bthread_local, 
use pthread local context
+            // else, bthread switch pthread and called 
switch_to_bthread_local, use bthread local context.
+            if (bthread_context == nullptr) {
+                bthread_context = thread_context_ptr._ptr;
+            }
         }
         return bthread_context;
     } else {
+        // in pthread
         return thread_context_ptr._ptr;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to