This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 26bc462e1c [feature-wip] (memory tracker) (step5) Fix track bthread, 
fix track vectorized query (#9145)
26bc462e1c is described below

commit 26bc462e1c46dd751de15d3b477d59015a948476
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Apr 27 20:34:02 2022 +0800

    [feature-wip] (memory tracker) (step5) Fix track bthread, fix track 
vectorized query (#9145)
    
    1. fix track bthread
    - Bthread is a high-performance M:N thread library used by brpc. In Doris, 
a brpc server response runs on one bthread, which may be scheduled on multiple 
pthreads. Currently, MemTracker consumption relies on pthread-local variables 
(TLS).
    - Because a bthread may run on several pthreads, the MemTracker kept in 
pthread TLS became confused when it was switched inside a brpc server response. 
Keeping the MemTracker in bthread TLS instead of pthread TLS for brpc server 
responses fixes this.
    Ref: 
https://github.com/apache/incubator-brpc/blob/731730da85f6af5c25012b4c83ab5bb371320cf8/docs/en/server.md#bthread-local
    
    2. fix track vectorized query
    - Added tracking of mmap. Currently, the vectorized execution engine 
allocates memory via mmap in many places.
    - Refactored ThreadContext to avoid dependency conflicts and make it easier 
to debug.
    - Fix some bugs.
---
 be/src/common/utils.h                         |   2 +
 be/src/exec/exchange_node.cpp                 |   2 +-
 be/src/exec/olap_scanner.cpp                  |   2 +-
 be/src/exec/tablet_sink.cpp                   |   3 +-
 be/src/exec/tablet_sink.h                     |   4 +-
 be/src/olap/byte_buffer.cpp                   |  11 +-
 be/src/olap/lru_cache.cpp                     |   3 +-
 be/src/olap/out_stream.cpp                    |   2 +-
 be/src/olap/out_stream.h                      |   1 -
 be/src/olap/rowset/column_writer.cpp          |   3 +-
 be/src/olap/rowset/column_writer.h            |   1 -
 be/src/olap/rowset/segment_v2/segment.cpp     |   5 +-
 be/src/runtime/bufferpool/system_allocator.cc |   6 +
 be/src/runtime/disk_io_mgr.cc                 |   8 +-
 be/src/runtime/fold_constant_executor.cpp     |   2 +-
 be/src/runtime/fragment_mgr.cpp               |   3 +-
 be/src/runtime/mem_pool.cpp                   |   2 +-
 be/src/runtime/mem_tracker.cpp                |  20 ++-
 be/src/runtime/mem_tracker.h                  |   4 +
 be/src/runtime/memory/chunk_allocator.cpp     |  16 +--
 be/src/runtime/memory/system_allocator.cpp    |   5 +
 be/src/runtime/plan_fragment_executor.cpp     |   8 +-
 be/src/runtime/row_batch.cpp                  |   4 +-
 be/src/runtime/runtime_filter_mgr.cpp         |   4 +-
 be/src/runtime/sorted_run_merger.cc           |   2 +-
 be/src/runtime/tcmalloc_hook.h                |   4 +-
 be/src/runtime/thread_context.cpp             | 127 ++++++++++++++++++
 be/src/runtime/thread_context.h               | 181 +++++++++++---------------
 be/src/runtime/thread_mem_tracker_mgr.cpp     |  18 +--
 be/src/runtime/thread_mem_tracker_mgr.h       | 158 ++++++++++++++--------
 be/src/service/brpc.h                         |  29 +----
 be/src/service/{brpc.h => brpc_conflict.h}    |  18 +--
 be/src/service/internal_service.cpp           |  30 +++++
 be/src/util/bit_util.h                        |   1 -
 be/src/util/doris_metrics.cpp                 |   2 +
 be/src/util/doris_metrics.h                   |   2 +
 be/src/util/file_utils.cpp                    |   3 +
 be/src/vec/common/allocator.h                 |  16 ++-
 be/src/vec/exec/vexchange_node.cpp            |   2 +-
 39 files changed, 452 insertions(+), 262 deletions(-)

diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index c0ca459423..91eb4427c9 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -21,7 +21,9 @@
 
 namespace doris {
 
+#ifndef ARRAY_SIZE
 #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
+#endif
 
 struct AuthInfo {
     std::string user;
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 53473f90a2..f1c221b65c 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -80,6 +80,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {
 Status ExchangeNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     if (_is_merging) {
         RETURN_IF_ERROR(_sort_exec_exprs.open(state));
@@ -215,7 +216,6 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, 
RowBatch* output_batc
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));
 
-    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
     while ((_num_rows_skipped < _offset)) {
         _num_rows_skipped += output_batch->num_rows();
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 53f9af57a8..68a522236c 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -52,7 +52,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, 
OlapScanNode* parent, bool
           _version(-1),
           _mem_tracker(MemTracker::create_tracker(
                   tracker->limit(),
-                  tracker->label() + ":OlapScanner:" + 
thread_local_ctx.get()->thread_id_str(),
+                  tracker->label() + ":OlapScanner:" + 
tls_ctx()->thread_id_str(),
                   tracker)) {}
 
 Status OlapScanner::prepare(
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index e8b35c16d1..9928b7e26f 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -50,7 +50,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* 
index_channel, int
         _tuple_data_buffer_ptr = &_tuple_data_buffer;
     }
     _node_channel_tracker =
-            MemTracker::create_tracker(-1, "NodeChannel" + 
thread_local_ctx.get()->thread_id_str());
+            MemTracker::create_tracker(-1, "NodeChannel" + 
tls_ctx()->thread_id_str());
 }
 
 NodeChannel::~NodeChannel() noexcept {
@@ -654,6 +654,7 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t 
tablet_id) {
 
 void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, 
const std::string& err,
                                   int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
     const auto& it = _tablets_by_channel.find(node_id);
     if (it == _tablets_by_channel.end()) {
         return;
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 2fa75587e7..1a902e834b 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -33,6 +33,7 @@
 #include "exec/tablet_info.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "runtime/thread_context.h"
 #include "util/bitmap.h"
 #include "util/countdown_latch.h"
 #include "util/ref_count_closure.h"
@@ -325,6 +326,7 @@ public:
 
     void for_each_node_channel(
             const std::function<void(const std::shared_ptr<NodeChannel>&)>& 
func) {
+        SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
         for (auto& it : _node_channels) {
             func(it.second);
         }
@@ -365,7 +367,7 @@ private:
     std::unordered_map<int64_t, std::string> _failed_channels_msgs;
     Status _intolerable_failure_status = Status::OK();
 
-    std::shared_ptr<MemTracker> _index_channel_tracker; // TODO(zxy) use after
+    std::shared_ptr<MemTracker> _index_channel_tracker;
 };
 
 // Write data to Olap Table.
diff --git a/be/src/olap/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp
index a3099e4e25..822b18cc50 100644
--- a/be/src/olap/byte_buffer.cpp
+++ b/be/src/olap/byte_buffer.cpp
@@ -20,6 +20,7 @@
 #include <sys/mman.h>
 
 #include "olap/utils.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -42,6 +43,8 @@ void StorageByteBuffer::BufDeleter::operator()(char* p) {
         if (0 != munmap(p, _mmap_length)) {
             LOG(FATAL) << "fail to munmap: mem=" << p << ", len=" << 
_mmap_length
                        << ", errno=" << Errno::no() << ", errno_str=" << 
Errno::str();
+        } else {
+            RELEASE_THREAD_LOCAL_MEM_TRACKER(_mmap_length);
         }
     } else {
         delete[] p;
@@ -93,10 +96,12 @@ StorageByteBuffer* 
StorageByteBuffer::reference_buffer(StorageByteBuffer* refere
 
 StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int 
prot, int flags,
                                            int fd, uint64_t offset) {
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
     char* memory = (char*)::mmap(start, length, prot, flags, fd, offset);
 
     if (MAP_FAILED == memory) {
         OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", 
Errno::no(), Errno::str());
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -108,6 +113,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(void* start, 
uint64_t length, int pro
     if (nullptr == buf) {
         deleter(memory);
         OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -128,10 +134,12 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* 
handler, uint64_t offset
 
     size_t length = handler->length();
     int fd = handler->fd();
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
     char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset);
 
     if (MAP_FAILED == memory) {
         OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", 
Errno::no(), Errno::str());
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -143,6 +151,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* 
handler, uint64_t offset
     if (nullptr == buf) {
         deleter(memory);
         OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -173,7 +182,7 @@ Status StorageByteBuffer::put(uint64_t index, char src) {
 }
 
 Status StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t 
offset,
-                                  uint64_t length) {
+                              uint64_t length) {
     //没有足够的空间可以写
     if (length > remaining()) {
         return Status::OLAPInternalError(OLAP_ERR_BUFFER_OVERFLOW);
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 0f5ebaa395..1a045b24a3 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -475,8 +475,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, 
void* value, size_t
                                        CachePriority priority) {
     // The memory of the parameter value should be recorded in the tls mem 
tracker,
     // transfer the memory ownership of the value to 
ShardedLRUCache::_mem_tracker.
-    
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
-                                                                               
 charge);
+    
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
 charge);
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const uint32_t hash = _hash_slice(key);
     return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, 
priority);
diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp
index 475d0ca98f..b5d19b841a 100644
--- a/be/src/olap/out_stream.cpp
+++ b/be/src/olap/out_stream.cpp
@@ -25,7 +25,7 @@
 namespace doris {
 
 OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t 
stream_buffer_size)
-        : _compress_kind(compress_kind), 
_stream_buffer_size(stream_buffer_size) {
+        : _stream_buffer_size(stream_buffer_size) {
     switch (compress_kind) {
     case COMPRESS_NONE:
         _compressor = nullptr;
diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h
index a67a6c5204..b1954ce6cf 100644
--- a/be/src/olap/out_stream.h
+++ b/be/src/olap/out_stream.h
@@ -141,7 +141,6 @@ public:
 
 private:
     std::map<StreamName, OutStream*> _streams; // All created streams
-    CompressKind _compress_kind;
     Compressor _compressor;
     uint32_t _stream_buffer_size;
 
diff --git a/be/src/olap/rowset/column_writer.cpp 
b/be/src/olap/rowset/column_writer.cpp
index d750db1ec1..6128ab4f6f 100644
--- a/be/src/olap/rowset/column_writer.cpp
+++ b/be/src/olap/rowset/column_writer.cpp
@@ -488,8 +488,7 @@ void ByteColumnWriter::record_position() {
 
 IntegerColumnWriter::IntegerColumnWriter(uint32_t column_id, uint32_t 
unique_column_id,
                                          OutStreamFactory* stream_factory, 
bool is_singed)
-        : _column_id(column_id),
-          _unique_column_id(unique_column_id),
+        : _unique_column_id(unique_column_id),
           _stream_factory(stream_factory),
           _writer(nullptr),
           _is_signed(is_singed) {}
diff --git a/be/src/olap/rowset/column_writer.h 
b/be/src/olap/rowset/column_writer.h
index 9fe2d60f2a..a40a3780ba 100644
--- a/be/src/olap/rowset/column_writer.h
+++ b/be/src/olap/rowset/column_writer.h
@@ -179,7 +179,6 @@ public:
     Status flush() { return _writer->flush(); }
 
 private:
-    uint32_t _column_id;
     uint32_t _unique_column_id;
     OutStreamFactory* _stream_factory;
     RunLengthIntegerWriter* _writer;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index b06083e996..7970d0da5b 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -51,10 +51,9 @@ Segment::Segment(const FilePathDesc& path_desc, uint32_t 
segment_id,
                  const TabletSchema* tablet_schema)
         : _path_desc(path_desc), _segment_id(segment_id), 
_tablet_schema(tablet_schema) {
 #ifndef BE_TEST
-    _mem_tracker = MemTracker::create_virtual_tracker(
-            -1, "Segment", StorageEngine::instance()->tablet_mem_tracker());
+    _mem_tracker = StorageEngine::instance()->tablet_mem_tracker();
 #else
-    _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment");
+    _mem_tracker = MemTracker::get_process_tracker();
 #endif
 }
 
diff --git a/be/src/runtime/bufferpool/system_allocator.cc 
b/be/src/runtime/bufferpool/system_allocator.cc
index a2dfc394b1..3fa69e981e 100644
--- a/be/src/runtime/bufferpool/system_allocator.cc
+++ b/be/src/runtime/bufferpool/system_allocator.cc
@@ -22,6 +22,7 @@
 
 #include "common/config.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/thread_context.h"
 #include "util/bit_util.h"
 #include "util/error_util.h"
 
@@ -75,9 +76,11 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, 
uint8_t** buffer_mem) {
         // Map an extra huge page so we can fix up the alignment if needed.
         map_len += HUGE_PAGE_SIZE;
     }
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(map_len);
     uint8_t* mem = reinterpret_cast<uint8_t*>(
             mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | 
MAP_PRIVATE, -1, 0));
     if (mem == MAP_FAILED) {
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len);
         return Status::BufferAllocFailed("mmap failed");
     }
 
@@ -89,10 +92,12 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, 
uint8_t** buffer_mem) {
         if (misalignment != 0) {
             uintptr_t fixup = HUGE_PAGE_SIZE - misalignment;
             munmap(mem, fixup);
+            RELEASE_THREAD_LOCAL_MEM_TRACKER(fixup);
             mem += fixup;
             map_len -= fixup;
         }
         munmap(mem + len, map_len - len);
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len - len);
         DCHECK_EQ(reinterpret_cast<uintptr_t>(mem) % HUGE_PAGE_SIZE, 0) << mem;
         // Mark the buffer as a candidate for promotion to huge pages. The 
Linux Transparent
         // Huge Pages implementation will try to back the memory with a huge 
page if it is
@@ -142,6 +147,7 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, 
uint8_t** buffer_mem) {
 void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) {
     if (config::mmap_buffers) {
         int rc = munmap(buffer.data(), buffer.len());
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(buffer.len());
         DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno;
     } else {
         bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && 
config::madvise_huge_pages;
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index ebedb58057..1f65851809 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -220,7 +220,7 @@ void DiskIoMgr::BufferDescriptor::reset(RequestContext* 
reader, ScanRange* range
     _eosr = false;
     _status = Status::OK();
     // Consume in the tls mem tracker when the buffer is allocated.
-    _buffer_mem_tracker = 
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get();
+    _buffer_mem_tracker = 
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get();
 }
 
 void DiskIoMgr::BufferDescriptor::return_buffer() {
@@ -739,7 +739,7 @@ char* DiskIoMgr::get_free_buffer(int64_t* buffer_size) {
         buffer = new char[*buffer_size];
     } else {
         // This means the buffer's memory ownership is transferred from 
DiskIoMgr to tls tracker.
-        
_mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
 *buffer_size);
+        
_mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
 *buffer_size);
         buffer = _free_buffers[idx].front();
         _free_buffers[idx].pop_front();
     }
@@ -767,7 +767,7 @@ void DiskIoMgr::gc_io_buffers(int64_t bytes_to_free) {
     // The deleted buffer is released in the tls mem tracker, the deleted 
buffer belongs to DiskIoMgr,
     // so the freed memory should be recorded in the DiskIoMgr mem tracker. So 
if the tls mem tracker
     // and the DiskIoMgr tracker are different, transfer memory ownership.
-    
_mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
 bytes_freed);
+    
_mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
 bytes_freed);
 }
 
 void DiskIoMgr::return_free_buffer(BufferDescriptor* desc) {
@@ -793,7 +793,7 @@ void DiskIoMgr::return_free_buffer(char* buffer, int64_t 
buffer_size, MemTracker
         // The deleted buffer is released in the tls mem tracker. When the 
buffer was allocated,
         // it was consumed in BufferDescriptor->buffer_mem_tracker, so if the 
tls mem tracker and
         // the tracker in the parameters are different, transfer memory 
ownership.
-        
tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
 buffer_size);
+        
tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), 
buffer_size);
     }
 }
 
diff --git a/be/src/runtime/fold_constant_executor.cpp 
b/be/src/runtime/fold_constant_executor.cpp
index 4a51b91932..5cdfcb084e 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -44,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id;
 
 Status FoldConstantExecutor::fold_constant_expr(
         const TFoldConstantParams& params, PConstantExprResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const auto& expr_map = params.expr_map;
     auto expr_result_map = response->mutable_expr_result_map();
 
@@ -53,7 +54,6 @@ Status FoldConstantExecutor::fold_constant_expr(
     if (UNLIKELY(!status.ok())) {
         return status;
     }
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     for (const auto& m : expr_map) {
         PExprResultMap pexpr_result_map;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 34ac356909..d12bc85f26 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -467,8 +467,7 @@ void 
FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
             .instance_id(exec_state->fragment_instance_id())
             .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
 #ifndef BE_TEST
-    
SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state()->query_type(),
-                              print_id(exec_state->query_id()), 
exec_state->fragment_instance_id(),
+    SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state(),
                               
exec_state->executor()->runtime_state()->instance_mem_tracker());
 #endif
     exec_state->execute();
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index a2b9bf3424..adc86f0e67 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -65,7 +65,7 @@ MemPool::MemPool()
           total_allocated_bytes_(0),
           total_reserved_bytes_(0),
           peak_allocated_bytes_(0),
-          
_mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get())
 {}
+          
_mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {}
 
 MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), 
allocated_bytes(0) {
     DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index 86f7c0370b..49f8862b0b 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -60,6 +60,19 @@ MemTracker* MemTracker::get_raw_process_tracker() {
     return raw_process_tracker;
 }
 
+// Track memory for all brpc server responses.
+static std::shared_ptr<MemTracker> brpc_server_tracker;
+static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT;
+
+void MemTracker::create_brpc_server_tracker() {
+    brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", 
get_process_tracker(), MemTrackerLevel::OVERVIEW);
+}
+
+std::shared_ptr<MemTracker> MemTracker::get_brpc_server_tracker() {
+    GoogleOnceInit(&brpc_server_tracker_once, 
&MemTracker::create_brpc_server_tracker);
+    return brpc_server_tracker;
+}
+
 void 
MemTracker::list_process_trackers(std::vector<std::shared_ptr<MemTracker>>* 
trackers) {
     trackers->clear();
     std::deque<std::shared_ptr<MemTracker>> to_process;
@@ -88,7 +101,8 @@ std::shared_ptr<MemTracker> 
MemTracker::create_tracker(int64_t byte_limit, const
                                                        const 
std::shared_ptr<MemTracker>& parent,
                                                        MemTrackerLevel level,
                                                        RuntimeProfile* 
profile) {
-    std::shared_ptr<MemTracker> reset_parent = parent ? parent : 
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker();
+    std::shared_ptr<MemTracker> reset_parent =
+            parent ? parent : 
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
     DCHECK(reset_parent);
 
     std::shared_ptr<MemTracker> tracker(
@@ -102,7 +116,8 @@ std::shared_ptr<MemTracker> 
MemTracker::create_tracker(int64_t byte_limit, const
 std::shared_ptr<MemTracker> MemTracker::create_virtual_tracker(
         int64_t byte_limit, const std::string& label, const 
std::shared_ptr<MemTracker>& parent,
         MemTrackerLevel level) {
-   std::shared_ptr<MemTracker> reset_parent = parent ? parent : 
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker();
+    std::shared_ptr<MemTracker> reset_parent =
+            parent ? parent : 
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
     DCHECK(reset_parent);
 
     std::shared_ptr<MemTracker> tracker(
@@ -121,6 +136,7 @@ MemTracker::MemTracker(int64_t byte_limit, const 
std::string& label,
                        RuntimeProfile* profile)
         : _limit(byte_limit),
           _label(label),
+          // Not 100% sure the id is unique. This is generated because it is 
faster than converting to int after hash.
           _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
           _parent(parent),
           _level(level) {
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 3d0e3f7271..74a7b4bef6 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -97,6 +97,8 @@ public:
     // Gets a shared_ptr to the "process" tracker, creating it if necessary.
     static std::shared_ptr<MemTracker> get_process_tracker();
     static MemTracker* get_raw_process_tracker();
+    // Gets a shared_ptr to the "brpc server" tracker, creating it if 
necessary.
+    static std::shared_ptr<MemTracker> get_brpc_server_tracker();
 
     Status check_sys_mem_info(int64_t bytes) {
         if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= 
MemInfo::mem_limit()) {
@@ -464,6 +466,8 @@ private:
 
     // Creates the process tracker.
     static void create_process_tracker();
+    // Creates the brpc server tracker.
+    static void create_brpc_server_tracker();
 
     // Limit on memory consumption, in bytes. If limit_ == -1, there is no 
consumption limit.
     int64_t _limit;
diff --git a/be/src/runtime/memory/chunk_allocator.cpp 
b/be/src/runtime/memory/chunk_allocator.cpp
index 2d2a9a5c01..7f8259c034 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -17,6 +17,8 @@
 
 #include "runtime/memory/chunk_allocator.h"
 
+#include <sanitizer/asan_interface.h>
+
 #include <list>
 #include <mutex>
 
@@ -134,8 +136,7 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
 
 Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* 
tracker, bool check_limits) {
     MemTracker* reset_tracker =
-            tracker ? tracker
-                    : 
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get();
+            tracker ? tracker : 
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get();
     // In advance, transfer the memory ownership of allocate from 
ChunkAllocator::tracker to the parameter tracker.
     // Next, if the allocate is successful, it will exit normally;
     // if the allocate fails, return this part of the memory to the parameter 
tracker.
@@ -178,7 +179,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, 
MemTracker* tracker,
         chunk->data = SystemAllocator::allocate(size);
         // The allocated chunk is consumed in the tls mem tracker, we want to 
consume in the ChunkAllocator tracker,
         // transfer memory ownership. TODO(zxy) replace with switch tls tracker
-        
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
 size);
+        
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
 size);
     }
     chunk_pool_system_alloc_count->increment(1);
     chunk_pool_system_alloc_cost_ns->increment(cost_ns);
@@ -208,9 +209,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* 
tracker) {
                 // it was consumed in the parameter tracker, so if the tls mem 
tracker and the parameter
                 // tracker are different, transfer memory ownership.
                 if (tracker)
-                    tracker->transfer_to(
-                            
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
-                            chunk.size);
+                    
tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
+                                         chunk.size);
             }
             chunk_pool_system_free_count->increment(1);
             chunk_pool_system_free_cost_ns->increment(cost_ns);
@@ -223,8 +223,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* 
tracker) {
     if (tracker) {
         tracker->transfer_to(_mem_tracker.get(), chunk.size);
     } else {
-        
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(
-                _mem_tracker.get(), chunk.size);
+        
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
+                                                                       
chunk.size);
     }
     _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size);
 }
diff --git a/be/src/runtime/memory/system_allocator.cpp 
b/be/src/runtime/memory/system_allocator.cpp
index 374cec5557..6ed5906f00 100644
--- a/be/src/runtime/memory/system_allocator.cpp
+++ b/be/src/runtime/memory/system_allocator.cpp
@@ -23,6 +23,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -43,6 +44,8 @@ void SystemAllocator::free(uint8_t* ptr, size_t length) {
             char buf[64];
             LOG(ERROR) << "fail to free memory via munmap, errno=" << errno
                        << ", errmsg=" << strerror_r(errno, buf, 64);
+        } else {
+            RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         }
     } else {
         ::free(ptr);
@@ -63,12 +66,14 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t 
length) {
 }
 
 uint8_t* SystemAllocator::allocate_via_mmap(size_t length) {
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
     auto ptr = (uint8_t*)mmap(nullptr, length, PROT_READ | PROT_WRITE, 
MAP_ANONYMOUS | MAP_PRIVATE,
                               -1, 0);
     if (ptr == MAP_FAILED) {
         char buf[64];
         LOG(ERROR) << "fail to allocate memory via mmap, errno=" << errno
                    << ", errmsg=" << strerror_r(errno, buf, 64);
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
     return ptr;
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 6cdbce39d2..2745215bfb 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -89,9 +89,7 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
     _runtime_state->set_query_fragments_ctx(fragments_ctx);
 
     RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
-    SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), 
print_id(_runtime_state->query_id()),
-                              _runtime_state->fragment_instance_id(),
-                              _runtime_state->instance_mem_tracker());
+    SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), 
_runtime_state->instance_mem_tracker());
     _runtime_state->set_be_number(request.backend_num);
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
@@ -442,9 +440,7 @@ void PlanFragmentExecutor::_collect_node_statistics() {
 }
 
 void PlanFragmentExecutor::report_profile() {
-    SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), 
print_id(_runtime_state->query_id()),
-                              _runtime_state->fragment_instance_id(),
-                              _runtime_state->instance_mem_tracker());
+    SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), 
_runtime_state->instance_mem_tracker());
     VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
     DCHECK(_report_status_cb);
 
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 614f7888f1..06306df512 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -44,7 +44,7 @@ const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
 const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
 
 RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity)
-        : 
_mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()),
+        : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()),
           _has_in_flight_row(false),
           _num_rows(0),
           _num_uncommitted_rows(0),
@@ -70,7 +70,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int 
capacity)
 // to allocated string data in special mempool
 // (change via python script that runs over Data_types.cc)
 RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
-        : 
_mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()),
+        : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()),
           _has_in_flight_row(false),
           _num_rows(input_batch.num_rows()),
           _num_uncommitted_rows(0),
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 4eade51b46..008e9d2e58 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -156,8 +156,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, 
query_options, _fragment_instance_id);
     cntVal->tracker = MemTracker::create_tracker(
-            -1, 
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + 
":FilterID:" + filter_id,
-            thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker());
+            -1, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->label() + 
":FilterID:" + filter_id,
+            tls_ctx()->_thread_mem_tracker_mgr->mem_tracker());
     _filter_map.emplace(filter_id, cntVal);
     return Status::OK();
 }
diff --git a/be/src/runtime/sorted_run_merger.cc 
b/be/src/runtime/sorted_run_merger.cc
index 11042a4d9d..5bf518178d 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -129,7 +129,7 @@ public:
         *done = false;
         _pull_task_thread =
         
std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task,
-                    this, 
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker());
+                    this, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker());
 
         RETURN_IF_ERROR(next(nullptr, done));
         return Status::OK();
diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h
index 9ba55fde8e..548b886299 100644
--- a/be/src/runtime/tcmalloc_hook.h
+++ b/be/src/runtime/tcmalloc_hook.h
@@ -36,11 +36,11 @@
 //  destructor to control the behavior of consume can lead to unexpected 
behavior,
 //  like this: if (LIKELY(doris::start_thread_mem_tracker)) {
 void new_hook(const void* ptr, size_t size) {
-    doris::thread_local_ctx.get()->consume_mem(tc_nallocx(size, 0));
+    doris::tls_ctx()->consume_mem(tc_nallocx(size, 0));
 }
 
 void delete_hook(const void* ptr) {
-    
doris::thread_local_ctx.get()->release_mem(tc_malloc_size(const_cast<void*>(ptr)));
+    doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr)));
 }
 
 void init_hook() {
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 871cd1ebf0..0b71101d97 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/thread_context.h"
 
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+
 namespace doris {
 
 DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, thread_local_ctx);
@@ -29,4 +32,128 @@ ThreadContext* ThreadContextPtr::get() {
     return thread_local_ctx;
 }
 
+// Attaches the current thread context (as returned by tls_ctx()) to a task of the given
+// type, forwarding task_id, fragment_instance_id and mem_tracker unchanged.
+// task_id is expected to be non-empty; this is checked with DCHECK only.
+AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const 
std::string& task_id,
+                                   const TUniqueId& fragment_instance_id,
+                                   const std::shared_ptr<doris::MemTracker>& 
mem_tracker) {
+    DCHECK(task_id != "");
+    tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker);
+}
+
+// Attaches the current thread context to a task of the given type that has no task id:
+// an empty task id and a default-constructed TUniqueId are passed to attach().
+// Outside BE_TEST builds mem_tracker is expected to be non-null (DCHECK).
+AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type,
+                                   const std::shared_ptr<doris::MemTracker>& 
mem_tracker) {
+#ifndef BE_TEST
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(type, "", TUniqueId(), mem_tracker);
+}
+
+// Same as the (TaskType, mem_tracker) overload, but the task type is derived from a
+// TQueryType via query_to_task_type().
+// Outside BE_TEST builds mem_tracker is expected to be non-null (DCHECK).
+AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
+                                   const std::shared_ptr<doris::MemTracker>& 
mem_tracker) {
+#ifndef BE_TEST
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), 
mem_tracker);
+}
+
+// Attaches the current thread context to a fully identified task: the task type is
+// derived from query_type via query_to_task_type(), and task_id, fragment_instance_id
+// and mem_tracker are forwarded to attach().
+// Outside BE_TEST builds all three are expected to be set (non-empty task_id,
+// non-default fragment_instance_id, non-null mem_tracker); checked with DCHECK.
+AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
+                                   const std::shared_ptr<doris::MemTracker>& 
mem_tracker,
+                                   const std::string& task_id,
+                                   const TUniqueId& fragment_instance_id) {
+#ifndef BE_TEST
+    DCHECK(task_id != "");
+    DCHECK(fragment_instance_id != TUniqueId());
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(query_to_task_type(query_type), task_id, 
fragment_instance_id, mem_tracker);
+}
+
+// Attaches the current thread context using the identity stored in runtime_state:
+// the task type comes from runtime_state->query_type(), the task id is
+// print_id(runtime_state->query_id()), and the fragment instance id is
+// runtime_state->fragment_instance_id().
+// runtime_state is dereferenced unconditionally, so it must not be null.
+// Outside BE_TEST builds the ids and mem_tracker are expected to be set (DCHECK).
+AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state,
+                                   const std::shared_ptr<doris::MemTracker>& 
mem_tracker) {
+#ifndef BE_TEST
+    DCHECK(print_id(runtime_state->query_id()) != "");
+    DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(query_to_task_type(runtime_state->query_type()),
+                      print_id(runtime_state->query_id()), 
runtime_state->fragment_instance_id(),
+                      mem_tracker);
+}
+
+// Detaches the current thread context from its task and bumps the
+// attach_task_thread_count metric by one.
+AttachTaskThread::~AttachTaskThread() {
+    tls_ctx()->detach();
+    DorisMetrics::instance()->attach_task_thread_count->increment(1);
+}
+
+// Switches the current thread's mem tracker to mem_tracker and remembers the previous
+// tracker id in _old_tracker_id so the destructor can restore it.
+// Does nothing unless config::memory_verbose_track is enabled, and the switch itself is
+// compiled out in BE_TEST builds. In builds without NDEBUG, switch_count is incremented
+// to record that a switch is in progress.
+// Existed selects update_tracker<true> (tracker already added to this thread) or
+// update_tracker<false> (tracker is added on demand).
+template <bool Existed>
+SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker(
+        const std::shared_ptr<doris::MemTracker>& mem_tracker, bool in_task) {
+    if (config::memory_verbose_track) {
+#ifndef BE_TEST
+        DCHECK(mem_tracker);
+        // The thread tracker must be switched after the attach task, 
otherwise switching
+        // in the main thread will cause the cached tracker not be cleaned up 
in time.
+        DCHECK(in_task == false || 
tls_ctx()->_thread_mem_tracker_mgr->is_attach_task());
+        if (Existed) {
+            _old_tracker_id = 
tls_ctx()->_thread_mem_tracker_mgr->update_tracker<true>(mem_tracker);
+        } else {
+            _old_tracker_id =
+                    
tls_ctx()->_thread_mem_tracker_mgr->update_tracker<false>(mem_tracker);
+        }
+#endif
+#ifndef NDEBUG
+        tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1;
+#endif
+    }
+}
+
+// Restores the tracker id saved by the constructor (compiled out in BE_TEST builds).
+// Does nothing unless config::memory_verbose_track is enabled. In builds without NDEBUG
+// it also decrements switch_count and bumps the switch_thread_mem_tracker_count metric.
+template <bool Existed>
+SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() {
+    if (config::memory_verbose_track) {
+#ifndef NDEBUG
+        tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1;
+        
DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
+#endif
+#ifndef BE_TEST
+        tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
+#endif
+    }
+}
+
+// Installs a new consume-error callback configuration on the current thread's tracker
+// manager and saves the previous one in _old_tracker_cb for the destructor to restore.
+// action_type is expected to be non-empty (DCHECK).
+SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(
+        const std::string& action_type, bool cancel_work, ERRCALLBACK 
err_call_back_func) {
+    DCHECK(action_type != std::string());
+    _old_tracker_cb = 
tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(
+            action_type, cancel_work, err_call_back_func);
+}
+
+// Restores the consume-error callback configuration saved by the constructor and bumps
+// the switch_thread_mem_tracker_err_cb_count metric by one.
+SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() {
+    tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
+    
DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
+}
+
+// Binds `tls` to the ThreadContext stored under btls_key for the current bthread.
+// If none has been stored yet, a new ThreadContext is created and registered with
+// bthread_setspecific (a failure aborts via CHECK_EQ). In both cases the context's
+// tracker manager is (re)initialized with init_bthread().
+SwitchBthread::SwitchBthread() {
+    tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+    // First call to bthread_getspecific (and before any bthread_setspecific) 
returns NULL
+    if (tls == nullptr) {
+        // Create thread-local data on demand.
+        // NOTE(review): who frees this allocation is not visible here; presumably the
+        // destructor registered with btls_key does -- confirm.
+        tls = new ThreadContext;
+        tls->_thread_mem_tracker_mgr->init_bthread();
+        // set the data so that next time bthread_getspecific in the thread 
returns the data.
+        CHECK_EQ(0, bthread_setspecific(btls_key, tls));
+    } else {
+        tls->_thread_mem_tracker_mgr->init_bthread();
+    }
+}
+
+// Flushes the memory still cached in the bthread context's tracker manager via
+// clear_untracked_mems(). The context itself is not released here.
+// In builds without NDEBUG the switch_bthread_count metric is bumped by one.
+SwitchBthread::~SwitchBthread() {
+    DCHECK(tls != nullptr);
+    tls->_thread_mem_tracker_mgr->clear_untracked_mems();
+#ifndef NDEBUG
+        DorisMetrics::instance()->switch_bthread_count->increment(1);
+#endif
+}
+
+template class SwitchThreadMemTracker<true>;
+template class SwitchThreadMemTracker<false>;
+
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 4d9d60078c..8ab72be634 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -17,15 +17,17 @@
 
 #pragma once
 
+#include <service/brpc_conflict.h>
+// After brpc_conflict.h
+#include <bthread/bthread.h>
+
 #include <string>
 #include <thread>
 
 #include "common/logging.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/runtime_state.h"
+#include "gen_cpp/PaloInternalService_types.h" // for TQueryType
 #include "runtime/thread_mem_tracker_mgr.h"
 #include "runtime/threadlocal.h"
-#include "util/doris_metrics.h"
 
 // Attach to task when thread starts
 #define SCOPED_ATTACH_TASK_THREAD(type, ...) \
@@ -34,34 +36,50 @@
 // may be different from the order of execution of instructions, which will 
cause the position of
 // the memory track to be unexpected.
 #define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \
-    auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true)
+    auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true)
 #define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \
-    auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false)
+    auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false)
 // Switch thread mem tracker during task execution.
 // After the non-query thread switches the mem tracker, if the thread will not 
switch the mem
 // tracker again in the short term, can consider manually clear_untracked_mems.
 // The query thread will automatically clear_untracked_mems when detach_task.
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = 
SwitchThreadMemTracker<false>(mem_tracker, false)
+    auto VARNAME_LINENUM(switch_tracker) = 
doris::SwitchThreadMemTracker<false>(mem_tracker, false)
 #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = 
SwitchThreadMemTracker<false>(mem_tracker, true);
+    auto VARNAME_LINENUM(switch_tracker) = 
doris::SwitchThreadMemTracker<false>(mem_tracker, true);
+#define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \
+    auto VARNAME_LINENUM(switch_tracker) = 
doris::SwitchThreadMemTracker<true>(mem_tracker, false)
 #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = 
SwitchThreadMemTracker<true>(mem_tracker, true)
+    auto VARNAME_LINENUM(switch_tracker) = 
doris::SwitchThreadMemTracker<true>(mem_tracker, true)
 // After the non-query thread switches the mem tracker, if the thread will not 
switch the mem
 // tracker again in the short term, can consider manually clear_untracked_mems.
 // The query thread will automatically clear_untracked_mems when detach_task.
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = 
SwitchThreadMemTrackerEndClear(mem_tracker)
+    auto VARNAME_LINENUM(switch_tracker) = 
doris::SwitchThreadMemTrackerEndClear(mem_tracker)
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \
     auto VARNAME_LINENUM(witch_tracker_cb) =                            \
-            SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
+            doris::SwitchThreadMemTrackerErrCallBack(action_type, 
##__VA_ARGS__)
+// Creates a scoped doris::SwitchBthread guard for the enclosing scope.
+// The type is fully qualified, like the other SCOPED_* macros in this header, so the
+// macro also works when expanded outside `namespace doris`.
+#define SCOPED_SWITCH_BTHREAD() auto VARNAME_LINENUM(switch_bthread) = doris::SwitchBthread()
+// Before switching the same tracker multiple times, add tracker as early as 
possible,
+// and then call `SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER` to 
reduce one map find.
+// For example, in the exec_node open phase `add tracker`, it is no longer 
necessary to determine
+// whether the tracker exists in TLS when switching the tracker in the 
exec_node get_next phase.
+// TODO(zxy): Adding the same tracker twice is currently prohibited, because it would
+// 1. waste time, and 2. reset `_untracked_mems[mem_tracker->id()]` to 0, losing memory
+// that has been cached but not yet reported to the tracker.
+// Some call sites may need to add a tracker repeatedly; optimize this later.
 #define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    thread_local_ctx.get()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
+    doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
+// Report `size` bytes as consumed / released on the current thread's mem tracker by
+// calling noncache_consume() directly.
+// `size` is parenthesized before negation so that an expression argument such as
+// `a - b` is negated as a whole instead of expanding to `-a - b`.
+#define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \
+    doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size)
+#define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \
+    doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-(size))
 
 namespace doris {
 
 class TUniqueId;
 
+extern bthread_key_t btls_key;
+
 // The thread context saves some info about a working thread.
 // 2 required info:
 //   1. thread_id:   Current thread id, Auto generated.
@@ -80,19 +98,20 @@ public:
         STORAGE = 4
         // to be added ...
     };
-    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", 
"LOAD", "COMPACTION", "STORAGE"};
+    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", 
"LOAD", "COMPACTION",
+                                                     "STORAGE"};
 
 public:
-    ThreadContext() : _thread_id(std::this_thread::get_id()), 
_type(TaskType::UNKNOWN) {
+    ThreadContext() : _type(TaskType::UNKNOWN) {
         _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr());
-        std::stringstream ss;
-        ss << _thread_id;
-        _thread_id_str = ss.str();
+        _thread_mem_tracker_mgr->init();
+        start_thread_mem_tracker = true;
+        _thread_id = get_thread_id();
     }
 
     void attach(const TaskType& type, const std::string& task_id,
                 const TUniqueId& fragment_instance_id,
-                const std::shared_ptr<MemTracker>& mem_tracker) {
+                const std::shared_ptr<doris::MemTracker>& mem_tracker) {
         DCHECK(_type == TaskType::UNKNOWN && _task_id == "")
                 << ",old tracker label: " << mem_tracker->label()
                 << ",new tracker label: " << 
_thread_mem_tracker_mgr->mem_tracker()->label();
@@ -111,10 +130,15 @@ public:
     }
 
     const std::string& task_id() const { return _task_id; }
-    const std::thread::id& thread_id() const { return _thread_id; }
-    const std::string& thread_id_str() const { return _thread_id_str; }
+    const std::string& thread_id_str() const { return _thread_id; }
     const TUniqueId& fragment_instance_id() const { return 
_fragment_instance_id; }
 
+    // Returns the id of the calling thread (std::this_thread::get_id()) formatted
+    // through a string stream.
+    std::string get_thread_id() {
+        std::stringstream ss;
+        ss << std::this_thread::get_id();
+        return ss.str();
+    }
+
     void consume_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(size);
@@ -136,8 +160,7 @@ public:
     std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr;
 
 private:
-    std::thread::id _thread_id;
-    std::string _thread_id_str;
+    std::string _thread_id;
     TaskType _type;
     std::string _task_id;
     TUniqueId _fragment_instance_id;
@@ -178,55 +201,33 @@ private:
 
 inline thread_local ThreadContextPtr thread_local_ctx;
 
+// Returns the ThreadContext to use for the current execution context: the context
+// stored under btls_key for the running bthread if one has been set, otherwise the
+// pthread-local thread_local_ctx.
+// Declared `inline` rather than `static` because it is defined in a header: `static`
+// would give every translation unit its own internal copy (and an unused-function
+// warning wherever the header is included but the function is not called).
+inline ThreadContext* tls_ctx() {
+    auto* bthread_ctx = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+    return bthread_ctx != nullptr ? bthread_ctx : thread_local_ctx.get();
+}
+
 class AttachTaskThread {
 public:
     explicit AttachTaskThread(const ThreadContext::TaskType& type, const 
std::string& task_id,
                               const TUniqueId& fragment_instance_id = 
TUniqueId(),
-                              const std::shared_ptr<MemTracker>& mem_tracker = 
nullptr) {
-        DCHECK(task_id != "");
-        thread_local_ctx.get()->attach(type, task_id, fragment_instance_id, 
mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& 
mem_tracker = nullptr);
 
     explicit AttachTaskThread(const ThreadContext::TaskType& type,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& 
mem_tracker);
 
     explicit AttachTaskThread(const TQueryType::type& query_type,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(query_to_task_type(query_type), "", 
TUniqueId(),
-                                       mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& 
mem_tracker);
 
-    explicit AttachTaskThread(const TQueryType::type& query_type, const 
std::string& task_id,
-                              const TUniqueId& fragment_instance_id,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(task_id != "");
-        DCHECK(fragment_instance_id != TUniqueId());
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id,
-                                       fragment_instance_id, mem_tracker);
-    }
+    explicit AttachTaskThread(const TQueryType::type& query_type,
+                              const std::shared_ptr<doris::MemTracker>& 
mem_tracker,
+                              const std::string& task_id, const TUniqueId& 
fragment_instance_id);
 
     explicit AttachTaskThread(const RuntimeState* runtime_state,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(print_id(runtime_state->query_id()) != "");
-        DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
-        DCHECK(mem_tracker);
-#endif
-        
thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()),
-                                       print_id(runtime_state->query_id()),
-                                       runtime_state->fragment_instance_id(), 
mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& 
mem_tracker);
 
     const ThreadContext::TaskType query_to_task_type(const TQueryType::type& 
query_type) {
         switch (query_type) {
@@ -240,10 +241,7 @@ public:
         }
     }
 
-    ~AttachTaskThread() {
-        thread_local_ctx.get()->detach();
-        DorisMetrics::instance()->attach_task_thread_count->increment(1);
-    }
+    ~AttachTaskThread();
 };
 
 class StopThreadMemTracker {
@@ -263,36 +261,10 @@ private:
 template <bool Existed>
 class SwitchThreadMemTracker {
 public:
-    explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& 
mem_tracker,
-                                    bool in_task = true) {
-        if (config::memory_verbose_track) {
-#ifndef BE_TEST
-            DCHECK(mem_tracker);
-            // The thread tracker must be switched after the attach task, 
otherwise switching
-            // in the main thread will cause the cached tracker not be cleaned 
up in time.
-            DCHECK(in_task == false ||
-                   
thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
-            if (Existed) {
-                _old_tracker_id =
-                        
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<true>(
-                                mem_tracker);
-            } else {
-                _old_tracker_id =
-                        
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<false>(
-                                mem_tracker);
-            }
-#endif
-        }
-    }
+    explicit SwitchThreadMemTracker(const std::shared_ptr<doris::MemTracker>& 
mem_tracker,
+                                    bool in_task = true);
 
-    ~SwitchThreadMemTracker() {
-        if (config::memory_verbose_track) {
-#ifndef BE_TEST
-            
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
-            
DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
-#endif
-        }
-    }
+    ~SwitchThreadMemTracker();
 
 protected:
     int64_t _old_tracker_id = 0;
@@ -300,11 +272,11 @@ protected:
 
 class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> {
 public:
-    explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<MemTracker>& 
mem_tracker)
+    explicit SwitchThreadMemTrackerEndClear(const 
std::shared_ptr<doris::MemTracker>& mem_tracker)
             : SwitchThreadMemTracker<false>(mem_tracker, false) {}
 
     ~SwitchThreadMemTrackerEndClear() {
-        
thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
+        tls_ctx()->_thread_mem_tracker_mgr->clear_untracked_mems();
     }
 };
 
@@ -312,19 +284,22 @@ class SwitchThreadMemTrackerErrCallBack {
 public:
     explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type,
                                                bool cancel_work = true,
-                                               ERRCALLBACK err_call_back_func 
= nullptr) {
-        DCHECK(action_type != std::string());
-        _old_tracker_cb = 
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(
-                action_type, cancel_work, err_call_back_func);
-    }
+                                               ERRCALLBACK err_call_back_func 
= nullptr);
 
-    ~SwitchThreadMemTrackerErrCallBack() {
-        
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
-        
DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
-    }
+    ~SwitchThreadMemTrackerErrCallBack();
 
 private:
     ConsumeErrCallBackInfo _old_tracker_cb;
 };
 
+// Scoped guard used through SCOPED_SWITCH_BTHREAD(): the constructor and destructor
+// (defined in thread_context.cpp) set up and flush the bthread-local ThreadContext.
+class SwitchBthread {
+public:
+    explicit SwitchBthread();
+
+    ~SwitchBthread();
+
+private:
+    // ThreadContext stored under btls_key; assigned by the constructor.
+    ThreadContext* tls;
+};
+
 } // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/thread_mem_tracker_mgr.cpp
index 06fd521faf..e55a4620f0 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -17,6 +17,8 @@
 
 #include "runtime/thread_mem_tracker_mgr.h"
 
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/mem_tracker_task_pool.h"
 #include "service/backend_options.h"
 
@@ -25,6 +27,7 @@ namespace doris {
 void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const 
std::string& task_id,
                                       const TUniqueId& fragment_instance_id,
                                       const std::shared_ptr<MemTracker>& 
mem_tracker) {
+    DCHECK(switch_count == 0) << print_debug_string();
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
     _consume_err_cb.cancel_msg = cancel_msg;
@@ -44,26 +47,15 @@ void ThreadMemTrackerMgr::attach_task(const std::string& 
cancel_msg, const std::
 }
 
 void ThreadMemTrackerMgr::detach_task() {
+    DCHECK(switch_count == 0) << print_debug_string();
     _task_id = "";
     _fragment_instance_id = TUniqueId();
     _consume_err_cb.init();
     clear_untracked_mems();
-    _tracker_id = 0;
-    // The following memory changes for the two map operations of 
_untracked_mems and _mem_trackers
-    // will be re-recorded in _untracked_mem.
-    _untracked_mems.clear();
-    _untracked_mems[0] = 0;
-    _mem_trackers.clear();
-    _mem_trackers[0] = MemTracker::get_process_tracker();
-    _mem_tracker_labels.clear();
-    _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
+    init();
 }
 
 void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& 
cancel_details) {
-    _temp_task_mem_tracker =
-            
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
-                    _task_id);
-    DCHECK(_temp_task_mem_tracker);
     if (_fragment_instance_id != TUniqueId()) {
         ExecEnv::GetInstance()->fragment_mgr()->cancel(
                 _fragment_instance_id, 
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h 
b/be/src/runtime/thread_mem_tracker_mgr.h
index 4ca2adba3e..404837a73a 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -20,8 +20,6 @@
 #include <fmt/format.h>
 #include <parallel_hashmap/phmap.h>
 
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
 #include "runtime/mem_tracker.h"
 
 namespace doris {
@@ -61,33 +59,20 @@ inline thread_local bool start_thread_mem_tracker = false;
 // need to manually call consume after stop_mem_tracker, and then 
start_mem_tracker.
 class ThreadMemTrackerMgr {
 public:
-    ThreadMemTrackerMgr() {
-        _mem_trackers[0] = MemTracker::get_process_tracker();
-        _untracked_mems[0] = 0;
-        _tracker_id = 0;
-        _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
-        start_thread_mem_tracker = true;
-    }
+    ThreadMemTrackerMgr() {}
+
     ~ThreadMemTrackerMgr() {
         clear_untracked_mems();
         start_thread_mem_tracker = false;
     }
 
-    void clear_untracked_mems() {
-        for (const auto& untracked_mem : _untracked_mems) {
-            if (untracked_mem.second != 0) {
-                DCHECK(_mem_trackers[untracked_mem.first])
-                        << ", label: " << 
_mem_tracker_labels[untracked_mem.first];
-                if (_mem_trackers[untracked_mem.first]) {
-                    
_mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
-                } else {
-                    
MemTracker::get_process_tracker()->consume(untracked_mem.second);
-                }
-            }
-        }
-        mem_tracker()->consume(_untracked_mem);
-        _untracked_mem = 0;
-    }
+    // After thread initialization, calling `init` again must call 
`clear_untracked_mems` first
+    // to avoid memory tracking loss.
+    void init();
+
+    void init_bthread();
+
+    void clear_untracked_mems();
 
     // After attach, the current thread TCMalloc Hook starts to 
consume/release task mem_tracker
     void attach_task(const std::string& cancel_msg, const std::string& task_id,
@@ -96,21 +81,18 @@ public:
 
     void detach_task();
 
-    // Must be fast enough!
-    // Thread update_tracker may be called very frequently, adding a memory 
copy will be slow.
+    // Must be fast enough! Thread update_tracker may be called very 
frequently.
+    // So for performance, add tracker as early as possible, and then call 
update_tracker<Existed>.
     template <bool Existed>
     int64_t update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
     void update_tracker_id(int64_t tracker_id);
 
-    void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
-        _mem_trackers[mem_tracker->id()] = mem_tracker;
-        DCHECK(_mem_trackers[mem_tracker->id()]);
-        _untracked_mems[mem_tracker->id()] = 0;
-        _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
-    }
+    // Before switching the same tracker multiple times, add tracker as early 
as possible,
+    // update_tracker<true> can reduce one map find.
+    void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
 
-    ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg,
-                                                        bool cancel_task, 
ERRCALLBACK cb_func) {
+    ConsumeErrCallBackInfo update_consume_err_cb(const std::string& 
cancel_msg, bool cancel_task,
+                                                 ERRCALLBACK cb_func) {
         _temp_consume_err_cb = _consume_err_cb;
         _consume_err_cb.cancel_msg = cancel_msg;
         _consume_err_cb.cancel_task = cancel_task;
@@ -127,17 +109,34 @@ public:
     // must increase the control to avoid entering infinite recursion, 
otherwise it may cause crash or stuck,
     void cache_consume(int64_t size);
 
-    void noncache_consume();
+    void noncache_consume(int64_t size);
 
     bool is_attach_task() { return _task_id != ""; }
 
-    std::shared_ptr<MemTracker> mem_tracker() {
-        DCHECK(_mem_trackers[_tracker_id]) << ", label: " << 
_mem_tracker_labels[_tracker_id];
-        if (_mem_trackers[_tracker_id]) {
-            return _mem_trackers[_tracker_id];
-        } else {
-            return MemTracker::get_process_tracker();
+    std::shared_ptr<MemTracker> mem_tracker();
+
+    int64_t switch_count = 0;
+
+    // Builds a one-line dump of this manager's state (current tracker id, cached
+    // untracked bytes, task id and the three per-tracker maps) for DCHECK messages.
+    // It is streamed from DCHECKs that fire precisely when a tracker entry is null, so
+    // null entries are printed as "nullptr" instead of being dereferenced.
+    std::string print_debug_string() {
+        fmt::memory_buffer mem_trackers_buf;
+        for (const auto& [key, value] : _mem_trackers) {
+            if (value) {
+                fmt::format_to(mem_trackers_buf, "{}_{},", std::to_string(key), value->log_usage(1));
+            } else {
+                fmt::format_to(mem_trackers_buf, "{}_{},", std::to_string(key), "nullptr");
+            }
         }
+        fmt::memory_buffer untracked_mems_buf;
+        for (const auto& [key, value] : _untracked_mems) {
+            fmt::format_to(untracked_mems_buf, "{}_{},", std::to_string(key),
+                           std::to_string(value));
+        }
+        fmt::memory_buffer mem_tracker_labels_buf;
+        for (const auto& [key, value] : _mem_tracker_labels) {
+            fmt::format_to(mem_tracker_labels_buf, "{}_{},", std::to_string(key), value);
+        }
+        return fmt::format(
+                "ThreadMemTrackerMgr debug string, _tracker_id:{}, _untracked_mem:{}, _task_id:{}, "
+                "_mem_trackers:<{}>, _untracked_mems:<{}>, _mem_tracker_labels:<{}>",
+                std::to_string(_tracker_id), std::to_string(_untracked_mem), _task_id,
+                fmt::to_string(mem_trackers_buf), fmt::to_string(untracked_mems_buf),
+                fmt::to_string(mem_tracker_labels_buf));
     }
 
 private:
@@ -175,39 +174,71 @@ private:
     ConsumeErrCallBackInfo _consume_err_cb;
 };
 
+// Resets the manager to its initial state: the current tracker id is 0, and the three
+// per-tracker maps are cleared and re-seeded with a single entry for id 0 (the process
+// tracker, zero untracked bytes, and the process tracker's label).
+// Cached per-tracker byte counts are discarded by the clear() calls, which is why
+// callers flush with clear_untracked_mems() first.
+inline void ThreadMemTrackerMgr::init() {
+    _tracker_id = 0;
+    _mem_trackers.clear();
+    _mem_trackers[0] = MemTracker::get_process_tracker();
+    _untracked_mems.clear();
+    _untracked_mems[0] = 0;
+    _mem_tracker_labels.clear();
+    _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
+}
+
+// Initialization used for bthread contexts: resets via init(), then registers the brpc
+// server tracker under id 1 (tracker, zero untracked bytes, label) and makes id 1 the
+// current tracker.
+inline void ThreadMemTrackerMgr::init_bthread() {
+    init();
+    _mem_trackers[1] = MemTracker::get_brpc_server_tracker();
+    _untracked_mems[1] = 0;
+    _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label();
+    _tracker_id = 1;
+}
+
+// Flushes all cached byte counts into their trackers: every non-zero per-tracker entry
+// of _untracked_mems is consumed on the matching tracker, and _untracked_mem is
+// consumed on the current tracker.
+// Each cached count is reset to 0 as it is flushed, so calling this again without an
+// intervening init() (for example a scoped flush followed by detach_task() or the
+// destructor) does not report the same bytes a second time.
+inline void ThreadMemTrackerMgr::clear_untracked_mems() {
+    for (auto& [tracker_id, untracked_bytes] : _untracked_mems) {
+        if (untracked_bytes != 0) {
+            DCHECK(_mem_trackers[tracker_id]) << print_debug_string();
+            // Zero the cache entry before consuming so the bytes are handed over exactly once.
+            const int64_t flushed_bytes = untracked_bytes;
+            untracked_bytes = 0;
+            _mem_trackers[tracker_id]->consume(flushed_bytes);
+        }
+    }
+    mem_tracker()->consume(_untracked_mem);
+    _untracked_mem = 0;
+}
+
 template <bool Existed>
 inline int64_t ThreadMemTrackerMgr::update_tracker(const 
std::shared_ptr<MemTracker>& mem_tracker) {
-    DCHECK(mem_tracker);
+    DCHECK(mem_tracker) << print_debug_string();
     _temp_tracker_id = mem_tracker->id();
     if (_temp_tracker_id == _tracker_id) {
         return _tracker_id;
     }
     if (Existed) {
-        DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end());
+        DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()) << 
print_debug_string();
     } else {
+        // If the tracker has already been added, avoid `_untracked_mems[x] = 
0;` again causing the memory track to be lost.
         if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) {
             _mem_trackers[_temp_tracker_id] = mem_tracker;
-            DCHECK(_mem_trackers[_temp_tracker_id]);
+            DCHECK(_mem_trackers[_temp_tracker_id]) << print_debug_string();
             _untracked_mems[_temp_tracker_id] = 0;
             _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
         }
     }
 
+    DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << 
print_debug_string();
+    DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
     _untracked_mems[_tracker_id] += _untracked_mem;
     _untracked_mem = 0;
     std::swap(_tracker_id, _temp_tracker_id);
-    DCHECK(_mem_trackers[_tracker_id]) << ", label: " << 
_mem_tracker_labels[_tracker_id];
+    DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
     return _temp_tracker_id; // old tracker_id
 }
 
 inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) {
+    DCHECK(switch_count >= 0) << print_debug_string();
     if (tracker_id != _tracker_id) {
         _untracked_mems[_tracker_id] += _untracked_mem;
         _untracked_mem = 0;
         _tracker_id = tracker_id;
-        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end())
-                << ", label: " << _mem_tracker_labels[_tracker_id];
-        DCHECK(_mem_trackers[_tracker_id]);
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << 
print_debug_string();
+        DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
     }
 }
 
@@ -218,7 +249,7 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t 
size) {
     // it will cause tracker->consumption to be temporarily less than 0.
     if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
-        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << 
print_debug_string();
         // Allocating memory in the Hook command causes the TCMalloc Hook to 
be entered again, infinite recursion.
         // Needs to ensure that all memory allocated in 
mem_tracker.consume/try_consume is freed in time to avoid tracking misses.
         start_thread_mem_tracker = false;
@@ -227,21 +258,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t 
size) {
             _untracked_mem += _untracked_mems[_tracker_id];
             _untracked_mems[_tracker_id] = 0;
         }
-        noncache_consume();
+        noncache_consume(_untracked_mem);
+        _untracked_mem = 0;
         start_thread_mem_tracker = true;
     }
 }
 
-inline void ThreadMemTrackerMgr::noncache_consume() {
-    DCHECK(_mem_trackers[_tracker_id]) << ", label: " << 
_mem_tracker_labels[_tracker_id];
-    Status st = mem_tracker()->try_consume(_untracked_mem);
+inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { // Consumes `size` on the current tracker; no longer reads or resets _untracked_mem (cache_consume resets it after calling this).
+    Status st = mem_tracker()->try_consume(size);
     if (!st) {
         // The memory has been allocated, so when TryConsume fails, need to 
continue to complete
         // the consume to ensure the accuracy of the statistics.
-        mem_tracker()->consume(_untracked_mem);
-        exceeded(_untracked_mem, st);
+        mem_tracker()->consume(size);
+        exceeded(size, st); // Hand the failed try_consume status and the byte count to exceeded().
     }
-    _untracked_mem = 0;
+}
+
+inline void ThreadMemTrackerMgr::add_tracker(const 
std::shared_ptr<MemTracker>& mem_tracker) { // Registers a tracker this manager has not seen before; every map below is keyed by mem_tracker->id().
+    DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << 
print_debug_string(); // The id must not be registered yet.
+    _mem_trackers[mem_tracker->id()] = mem_tracker;
+    DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); // The stored tracker must be non-null.
+    _untracked_mems[mem_tracker->id()] = 0; // A newly added tracker starts with no cached, unreported bytes.
+    _mem_tracker_labels[mem_tracker->id()] = mem_tracker->label(); // Keyed by the new tracker's own id: _temp_tracker_id is not assigned in this function, so using it would file the label under a stale id.
+}
+
+inline std::shared_ptr<MemTracker> ThreadMemTrackerMgr::mem_tracker() { // Returns the tracker stored in _mem_trackers under the current _tracker_id.
+    // Whether the key _tracker_id exists in _mem_trackers.
+    DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << 
print_debug_string();
+    // If the key _tracker_id exists in _mem_trackers, check whether the value 
is null.
+    DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
+    return _mem_trackers[_tracker_id];
 }
 
 } // namespace doris
diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h
index 031a9d6697..6e1b348ac7 100644
--- a/be/src/service/brpc.h
+++ b/be/src/service/brpc.h
@@ -17,33 +17,10 @@
 
 #pragma once
 
-// This file is used to fixed macro conflict between butil and gutil
 // all header need by brpc is contain in this file.
-// include this file instead of include <brpc/xxx.h>
-// and this file must put the first include in source file
+// include this file instead of include <brpc/xxx.h>.
 
-#include "gutil/macros.h"
-// Macros in the guti/macros.h, use butil's define
-#ifdef DISALLOW_IMPLICIT_CONSTRUCTORS
-#undef DISALLOW_IMPLICIT_CONSTRUCTORS
-#endif
-
-#ifdef arraysize
-#undef arraysize
-#endif
-
-#undef OVERRIDE
-#undef FINAL
-
-// use be/src/gutil/integral_types.h override butil/basictypes.h
-#include "gutil/integral_types.h"
-#ifdef BASE_INTEGRAL_TYPES_H_
-#define BUTIL_BASICTYPES_H_
-#endif
-
-#ifdef DEBUG_MODE
-#undef DEBUG_MODE
-#endif
+#include <service/brpc_conflict.h>
 
 #include <brpc/channel.h>
 #include <brpc/closure_guard.h>
@@ -51,6 +28,8 @@
 #include <brpc/protocol.h>
 #include <brpc/reloadable_flags.h>
 #include <brpc/server.h>
+#include <bthread/bthread.h>
+#include <bthread/types.h>
 #include <butil/containers/flat_map.h>
 #include <butil/containers/flat_map_inl.h>
 #include <butil/endpoint.h>
diff --git a/be/src/service/brpc.h b/be/src/service/brpc_conflict.h
similarity index 75%
copy from be/src/service/brpc.h
copy to be/src/service/brpc_conflict.h
index 031a9d6697..35ef1b815c 100644
--- a/be/src/service/brpc.h
+++ b/be/src/service/brpc_conflict.h
@@ -18,8 +18,6 @@
 #pragma once
 
 // This file is used to fixed macro conflict between butil and gutil
-// all header need by brpc is contain in this file.
-// include this file instead of include <brpc/xxx.h>
 // and this file must put the first include in source file
 
 #include "gutil/macros.h"
@@ -32,6 +30,10 @@
 #undef arraysize
 #endif
 
+#ifdef ARRAY_SIZE
+#undef ARRAY_SIZE
+#endif
+
 #undef OVERRIDE
 #undef FINAL
 
@@ -44,15 +46,3 @@
 #ifdef DEBUG_MODE
 #undef DEBUG_MODE
 #endif
-
-#include <brpc/channel.h>
-#include <brpc/closure_guard.h>
-#include <brpc/controller.h>
-#include <brpc/protocol.h>
-#include <brpc/reloadable_flags.h>
-#include <brpc/server.h>
-#include <butil/containers/flat_map.h>
-#include <butil/containers/flat_map_inl.h>
-#include <butil/endpoint.h>
-#include <butil/fd_utility.h>
-#include <butil/macros.h>
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 4cb6b8f7ee..5080b1c218 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -29,6 +29,7 @@
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/routine_load/routine_load_task_executor.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
 #include "util/md5.h"
@@ -42,16 +43,24 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, 
MetricUnit::NOUNIT);
 
+bthread_key_t btls_key;
+
+static void thread_context_deleter(void* d) { // Cleanup callback handed to bthread_key_create() for btls_key.
+    delete static_cast<ThreadContext*>(d); // Treats `d` as a ThreadContext* and destroys it.
+}
+
 template <typename T>
 PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), 
_tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
     REGISTER_HOOK_METRIC(add_batch_task_queue_size,
                          [this]() { return 
_tablet_worker_pool.get_queue_size(); });
+    CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); // Creates the bthread-local key with thread_context_deleter as its cleanup callback. NOTE(review): btls_key is one global, yet this runs for every constructed instance of every PInternalServiceImpl<T> instantiation; a second construction would overwrite the key -- confirm only one instance exists or guard the creation.
 }
 
 template <typename T>
 PInternalServiceImpl<T>::~PInternalServiceImpl() {
     DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+    CHECK_EQ(0, bthread_key_delete(btls_key)); // Deletes the global bthread-local key. NOTE(review): if more than one service instance is ever destroyed, a later destructor would presumably delete an already-deleted key and trip this CHECK -- confirm against how the services are instantiated.
 }
 
 template <typename T>
@@ -59,6 +68,7 @@ void 
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
                                             const PTransmitDataParams* request,
                                             PTransmitDataResult* response,
                                             google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -84,6 +94,7 @@ void 
PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
                                                  const 
PTabletWriterOpenRequest* request,
                                                  PTabletWriterOpenResult* 
response,
                                                  google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << 
request->index_id()
              << ", txn_id=" << request->txn_id();
     brpc::ClosureGuard closure_guard(done);
@@ -101,6 +112,7 @@ void 
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
                                                  const 
PExecPlanFragmentRequest* request,
                                                  PExecPlanFragmentResult* 
response,
                                                  google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
@@ -116,6 +128,7 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
                                                       const 
PTabletWriterAddBatchRequest* request,
                                                       
PTabletWriterAddBatchResult* response,
                                                       
google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
              << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
@@ -150,6 +163,7 @@ void 
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
                                                    const 
PTabletWriterCancelRequest* request,
                                                    PTabletWriterCancelResult* 
response,
                                                    google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" 
<< request->index_id()
              << ", sender_id=" << request->sender_id();
     brpc::ClosureGuard closure_guard(done);
@@ -177,6 +191,7 @@ void 
PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
                                                    const 
PCancelPlanFragmentRequest* request,
                                                    PCancelPlanFragmentResult* 
result,
                                                    google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId tid;
     tid.__set_hi(request->finst_id().hi());
@@ -201,6 +216,7 @@ template <typename T>
 void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* 
cntl_base,
                                          const PFetchDataRequest* request, 
PFetchDataResult* result,
                                          google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
@@ -210,6 +226,7 @@ template <typename T>
 void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* 
controller,
                                        const PProxyRequest* request, 
PProxyResult* response,
                                        google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     // PProxyRequest is defined in gensrc/proto/internal_service.proto
     // Currently it supports 2 kinds of requests:
@@ -272,6 +289,7 @@ void 
PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* cont
                                            const PUpdateCacheRequest* request,
                                            PCacheResponse* response,
                                            google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->update(request, response);
 }
@@ -281,6 +299,7 @@ void 
PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* contr
                                           const PFetchCacheRequest* request,
                                           PFetchCacheResult* result,
                                           google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->fetch(request, result);
 }
@@ -290,6 +309,7 @@ void 
PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr
                                           const PClearCacheRequest* request,
                                           PCacheResponse* response,
                                           google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->clear(request, response);
 }
@@ -299,6 +319,7 @@ void 
PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
                                            const ::doris::PMergeFilterRequest* 
request,
                                            ::doris::PMergeFilterResponse* 
response,
                                            ::google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto buf = 
static_cast<brpc::Controller*>(controller)->request_attachment();
     Status st = _exec_env->fragment_mgr()->merge_filter(request, 
buf.to_string().data());
@@ -313,6 +334,7 @@ void 
PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
                                            const 
::doris::PPublishFilterRequest* request,
                                            ::doris::PPublishFilterResponse* 
response,
                                            ::google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
     UniqueId unique_id(request->query_id());
@@ -329,6 +351,7 @@ template <typename T>
 void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* 
controller,
                                         const PSendDataRequest* request, 
PSendDataResult* response,
                                         google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -352,6 +375,7 @@ template <typename T>
 void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* 
controller,
                                      const PCommitRequest* request, 
PCommitResult* response,
                                      google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -370,6 +394,7 @@ template <typename T>
 void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* 
controller,
                                        const PRollbackRequest* request, 
PRollbackResult* response,
                                        google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -389,6 +414,7 @@ void 
PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
                                                  const PConstantExprRequest* 
request,
                                                  PConstantExprResult* response,
                                                  google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
 
@@ -425,6 +451,7 @@ void 
PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
                                              const PTransmitDataParams* 
request,
                                              PTransmitDataResult* response,
                                              google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -450,6 +477,7 @@ void 
PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
                                                 const PCheckRPCChannelRequest* 
request,
                                                 PCheckRPCChannelResponse* 
response,
                                                 google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
     if (request->data().size() != request->size()) {
@@ -477,6 +505,7 @@ void 
PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
                                                 const PResetRPCChannelRequest* 
request,
                                                 PResetRPCChannelResponse* 
response,
                                                 google::protobuf::Closure* 
done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
     if (request->all()) {
@@ -511,6 +540,7 @@ void 
PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
                                          const PHandShakeRequest* request,
                                          PHandShakeResponse* response,
                                          google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     if (request->has_hello()) {
         response->set_hello(request->hello());
diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 7526a7728a..dabf87ee0f 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -25,7 +25,6 @@
 
 #include "common/compiler_util.h"
 #include "gutil/bits.h"
-#include "gutil/port.h"
 #include "util/cpu_info.h"
 #ifdef __aarch64__
 #include "sse2neon.h"
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 0163815c76..8bd9b05847 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -137,6 +137,7 @@ 
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MIC
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, 
MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count, 
MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count, 
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_bthread_count, MetricUnit::NOUNIT);
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, 
MetricUnit::NOUNIT);
@@ -286,6 +287,7 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
attach_task_thread_count);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
switch_thread_mem_tracker_count);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
switch_thread_mem_tracker_err_cb_count);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_bthread_count);
 
     _server_metric_entity->register_hook(_s_hook_name, 
std::bind(&DorisMetrics::_update, this));
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index aa59d1770b..602eb78a7e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -130,6 +130,8 @@ public:
     IntCounter* attach_task_thread_count;
     IntCounter* switch_thread_mem_tracker_count;
     IntCounter* switch_thread_mem_tracker_err_cb_count;
+    // brpc server response count
+    IntCounter* switch_bthread_count;
 
     IntGauge* memory_pool_bytes_total;
     IntGauge* process_thread_num;
diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp
index 3fd4a3bba6..a971acce86 100644
--- a/be/src/util/file_utils.cpp
+++ b/be/src/util/file_utils.cpp
@@ -34,6 +34,7 @@
 #include "gutil/strings/strip.h"
 #include "gutil/strings/substitute.h"
 #include "olap/file_helper.h"
+#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 
 namespace doris {
@@ -196,11 +197,13 @@ Status FileUtils::md5sum(const std::string& file, 
std::string* md5sum) {
         return Status::InternalError("failed to stat file");
     }
     size_t file_len = statbuf.st_size;
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(file_len);
     void* buf = mmap(0, file_len, PROT_READ, MAP_SHARED, fd, 0);
 
     unsigned char result[MD5_DIGEST_LENGTH];
     MD5((unsigned char*)buf, file_len, result);
     munmap(buf, file_len);
+    RELEASE_THREAD_LOCAL_MEM_TRACKER(file_len);
 
     std::stringstream ss;
     for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) {
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2a50dabf5c..216864dbb8 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -29,6 +29,7 @@
 #include <exception>
 
 #include "common/status.h"
+#include "runtime/thread_context.h"
 
 #ifdef NDEBUG
 #define ALLOCATOR_ASLR 0
@@ -137,15 +138,18 @@ public:
         } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) {
             /// Resize mmap'd memory region.
             // CurrentMemoryTracker::realloc(old_size, new_size);
+            CONSUME_THREAD_LOCAL_MEM_TRACKER(new_size - old_size);
 
             // On apple and freebsd self-implemented mremap used 
(common/mremap.h)
             buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, 
PROT_READ | PROT_WRITE,
                                     mmap_flags, -1, 0);
-            if (MAP_FAILED == buf)
+            if (MAP_FAILED == buf){
+                RELEASE_THREAD_LOCAL_MEM_TRACKER(new_size - old_size);
                 doris::vectorized::throwFromErrno("Allocator: Cannot mremap 
memory chunk from " +
                                                           
std::to_string(old_size) + " to " +
                                                           
std::to_string(new_size) + ".",
                                                   
doris::TStatusCode::VEC_CANNOT_MREMAP);
+            }
 
             /// No need for zero-fill, because mmap guarantees it.
         } else if (new_size < MMAP_THRESHOLD) {
@@ -197,10 +201,13 @@ private:
                                 alignment, size),
                         doris::TStatusCode::VEC_BAD_ARGUMENTS);
 
+            CONSUME_THREAD_LOCAL_MEM_TRACKER(size);
             buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, 
mmap_flags, -1, 0);
-            if (MAP_FAILED == buf)
+            if (MAP_FAILED == buf) {
+                RELEASE_THREAD_LOCAL_MEM_TRACKER(size);
                 doris::vectorized::throwFromErrno(fmt::format("Allocator: 
Cannot mmap {}.", size),
                                                   
doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY);
+            }
 
             /// No need for zero-fill, because mmap guarantees it.
         } else {
@@ -231,9 +238,12 @@ private:
 
     void free_no_track(void* buf, size_t size) {
         if (size >= MMAP_THRESHOLD) {
-            if (0 != munmap(buf, size))
+            if (0 != munmap(buf, size)) {
                 doris::vectorized::throwFromErrno(fmt::format("Allocator: 
Cannot munmap {}.", size),
                                                   
doris::TStatusCode::VEC_CANNOT_MUNMAP);
+            } else {
+                RELEASE_THREAD_LOCAL_MEM_TRACKER(size);
+            }
         } else {
             ::free(buf);
         }
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index ea4e61e7fe..57cbcff921 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -66,6 +66,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
 Status VExchangeNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
 
     if (_is_merging) {
@@ -84,7 +85,6 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* 
row_batch, bool* e
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     SCOPED_TIMER(runtime_profile()->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
-    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     auto status = _stream_recvr->get_next(block, eos);
     if (block != nullptr) {
         if (_num_rows_returned + block->rows() < _limit) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to