This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 26bc462e1c [feature-wip] (memory tracker) (step5) Fix track bthread, fix track vectorized query (#9145) 26bc462e1c is described below commit 26bc462e1c46dd751de15d3b477d59015a948476 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Apr 27 20:34:02 2022 +0800 [feature-wip] (memory tracker) (step5) Fix track bthread, fix track vectorized query (#9145) 1. Fix bthread tracking - Bthread is a high-performance M:N thread library used by brpc. In Doris, a brpc server response runs on one bthread, which may execute on multiple pthreads over its lifetime. Currently, MemTracker consumption relies on pthread-local variables (TLS). - Because the bthread can resume on a different pthread, switching the pthread TLS MemTracker inside a brpc server response left the MemTracker in an inconsistent state. Storing the MemTracker in bthread-local storage instead of pthread TLS for brpc server responses fixes this. Ref: https://github.com/apache/incubator-brpc/blob/731730da85f6af5c25012b4c83ab5bb371320cf8/docs/en/server.md#bthread-local 2. Fix vectorized query tracking - Added mmap tracking, since the vectorized execution engine allocates memory with mmap in many places. - Refactored ThreadContext to avoid dependency conflicts and make it easier to debug. - Fixed some bugs. 
--- be/src/common/utils.h | 2 + be/src/exec/exchange_node.cpp | 2 +- be/src/exec/olap_scanner.cpp | 2 +- be/src/exec/tablet_sink.cpp | 3 +- be/src/exec/tablet_sink.h | 4 +- be/src/olap/byte_buffer.cpp | 11 +- be/src/olap/lru_cache.cpp | 3 +- be/src/olap/out_stream.cpp | 2 +- be/src/olap/out_stream.h | 1 - be/src/olap/rowset/column_writer.cpp | 3 +- be/src/olap/rowset/column_writer.h | 1 - be/src/olap/rowset/segment_v2/segment.cpp | 5 +- be/src/runtime/bufferpool/system_allocator.cc | 6 + be/src/runtime/disk_io_mgr.cc | 8 +- be/src/runtime/fold_constant_executor.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 3 +- be/src/runtime/mem_pool.cpp | 2 +- be/src/runtime/mem_tracker.cpp | 20 ++- be/src/runtime/mem_tracker.h | 4 + be/src/runtime/memory/chunk_allocator.cpp | 16 +-- be/src/runtime/memory/system_allocator.cpp | 5 + be/src/runtime/plan_fragment_executor.cpp | 8 +- be/src/runtime/row_batch.cpp | 4 +- be/src/runtime/runtime_filter_mgr.cpp | 4 +- be/src/runtime/sorted_run_merger.cc | 2 +- be/src/runtime/tcmalloc_hook.h | 4 +- be/src/runtime/thread_context.cpp | 127 ++++++++++++++++++ be/src/runtime/thread_context.h | 181 +++++++++++--------------- be/src/runtime/thread_mem_tracker_mgr.cpp | 18 +-- be/src/runtime/thread_mem_tracker_mgr.h | 158 ++++++++++++++-------- be/src/service/brpc.h | 29 +---- be/src/service/{brpc.h => brpc_conflict.h} | 18 +-- be/src/service/internal_service.cpp | 30 +++++ be/src/util/bit_util.h | 1 - be/src/util/doris_metrics.cpp | 2 + be/src/util/doris_metrics.h | 2 + be/src/util/file_utils.cpp | 3 + be/src/vec/common/allocator.h | 16 ++- be/src/vec/exec/vexchange_node.cpp | 2 +- 39 files changed, 452 insertions(+), 262 deletions(-) diff --git a/be/src/common/utils.h b/be/src/common/utils.h index c0ca459423..91eb4427c9 100644 --- a/be/src/common/utils.h +++ b/be/src/common/utils.h @@ -21,7 +21,9 @@ namespace doris { +#ifndef ARRAY_SIZE #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) +#endif struct AuthInfo { std::string user; diff --git 
a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 53473f90a2..f1c221b65c 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -80,6 +80,7 @@ Status ExchangeNode::prepare(RuntimeState* state) { Status ExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(ExecNode::open(state)); if (_is_merging) { RETURN_IF_ERROR(_sort_exec_exprs.open(state)); @@ -215,7 +216,6 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next.")); - ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); while ((_num_rows_skipped < _offset)) { _num_rows_skipped += output_batch->num_rows(); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 53f9af57a8..68a522236c 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -52,7 +52,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool _version(-1), _mem_tracker(MemTracker::create_tracker( tracker->limit(), - tracker->label() + ":OlapScanner:" + thread_local_ctx.get()->thread_id_str(), + tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(), tracker)) {} Status OlapScanner::prepare( diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index e8b35c16d1..9928b7e26f 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -50,7 +50,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int _tuple_data_buffer_ptr = &_tuple_data_buffer; } _node_channel_tracker = - MemTracker::create_tracker(-1, "NodeChannel" + thread_local_ctx.get()->thread_id_str()); + MemTracker::create_tracker(-1, 
"NodeChannel" + tls_ctx()->thread_id_str()); } NodeChannel::~NodeChannel() noexcept { @@ -654,6 +654,7 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) { void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); const auto& it = _tablets_by_channel.find(node_id); if (it == _tablets_by_channel.end()) { return; diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 2fa75587e7..1a902e834b 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -33,6 +33,7 @@ #include "exec/tablet_info.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" +#include "runtime/thread_context.h" #include "util/bitmap.h" #include "util/countdown_latch.h" #include "util/ref_count_closure.h" @@ -325,6 +326,7 @@ public: void for_each_node_channel( const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); for (auto& it : _node_channels) { func(it.second); } @@ -365,7 +367,7 @@ private: std::unordered_map<int64_t, std::string> _failed_channels_msgs; Status _intolerable_failure_status = Status::OK(); - std::shared_ptr<MemTracker> _index_channel_tracker; // TODO(zxy) use after + std::shared_ptr<MemTracker> _index_channel_tracker; }; // Write data to Olap Table. 
diff --git a/be/src/olap/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp index a3099e4e25..822b18cc50 100644 --- a/be/src/olap/byte_buffer.cpp +++ b/be/src/olap/byte_buffer.cpp @@ -20,6 +20,7 @@ #include <sys/mman.h> #include "olap/utils.h" +#include "runtime/thread_context.h" namespace doris { @@ -42,6 +43,8 @@ void StorageByteBuffer::BufDeleter::operator()(char* p) { if (0 != munmap(p, _mmap_length)) { LOG(FATAL) << "fail to munmap: mem=" << p << ", len=" << _mmap_length << ", errno=" << Errno::no() << ", errno_str=" << Errno::str(); + } else { + RELEASE_THREAD_LOCAL_MEM_TRACKER(_mmap_length); } } else { delete[] p; @@ -93,10 +96,12 @@ StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* refere StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, int fd, uint64_t offset) { + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); char* memory = (char*)::mmap(start, length, prot, flags, fd, offset); if (MAP_FAILED == memory) { OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str()); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -108,6 +113,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int pro if (nullptr == buf) { deleter(memory); OLAP_LOG_WARNING("fail to allocate StorageByteBuffer."); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -128,10 +134,12 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset size_t length = handler->length(); int fd = handler->fd(); + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset); if (MAP_FAILED == memory) { OLAP_LOG_WARNING("fail to mmap. 
[errno='%d' errno_str='%s']", Errno::no(), Errno::str()); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -143,6 +151,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset if (nullptr == buf) { deleter(memory); OLAP_LOG_WARNING("fail to allocate StorageByteBuffer."); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -173,7 +182,7 @@ Status StorageByteBuffer::put(uint64_t index, char src) { } Status StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset, - uint64_t length) { + uint64_t length) { //没有足够的空间可以写 if (length > remaining()) { return Status::OLAPInternalError(OLAP_ERR_BUFFER_OVERFLOW); diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 0f5ebaa395..1a045b24a3 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -475,8 +475,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t CachePriority priority) { // The memory of the parameter value should be recorded in the tls mem tracker, // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. 
- thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), - charge); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge); SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const uint32_t hash = _hash_slice(key); return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority); diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp index 475d0ca98f..b5d19b841a 100644 --- a/be/src/olap/out_stream.cpp +++ b/be/src/olap/out_stream.cpp @@ -25,7 +25,7 @@ namespace doris { OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_buffer_size) - : _compress_kind(compress_kind), _stream_buffer_size(stream_buffer_size) { + : _stream_buffer_size(stream_buffer_size) { switch (compress_kind) { case COMPRESS_NONE: _compressor = nullptr; diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h index a67a6c5204..b1954ce6cf 100644 --- a/be/src/olap/out_stream.h +++ b/be/src/olap/out_stream.h @@ -141,7 +141,6 @@ public: private: std::map<StreamName, OutStream*> _streams; // All created streams - CompressKind _compress_kind; Compressor _compressor; uint32_t _stream_buffer_size; diff --git a/be/src/olap/rowset/column_writer.cpp b/be/src/olap/rowset/column_writer.cpp index d750db1ec1..6128ab4f6f 100644 --- a/be/src/olap/rowset/column_writer.cpp +++ b/be/src/olap/rowset/column_writer.cpp @@ -488,8 +488,7 @@ void ByteColumnWriter::record_position() { IntegerColumnWriter::IntegerColumnWriter(uint32_t column_id, uint32_t unique_column_id, OutStreamFactory* stream_factory, bool is_singed) - : _column_id(column_id), - _unique_column_id(unique_column_id), + : _unique_column_id(unique_column_id), _stream_factory(stream_factory), _writer(nullptr), _is_signed(is_singed) {} diff --git a/be/src/olap/rowset/column_writer.h b/be/src/olap/rowset/column_writer.h index 9fe2d60f2a..a40a3780ba 100644 --- a/be/src/olap/rowset/column_writer.h +++ 
b/be/src/olap/rowset/column_writer.h @@ -179,7 +179,6 @@ public: Status flush() { return _writer->flush(); } private: - uint32_t _column_id; uint32_t _unique_column_id; OutStreamFactory* _stream_factory; RunLengthIntegerWriter* _writer; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index b06083e996..7970d0da5b 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -51,10 +51,9 @@ Segment::Segment(const FilePathDesc& path_desc, uint32_t segment_id, const TabletSchema* tablet_schema) : _path_desc(path_desc), _segment_id(segment_id), _tablet_schema(tablet_schema) { #ifndef BE_TEST - _mem_tracker = MemTracker::create_virtual_tracker( - -1, "Segment", StorageEngine::instance()->tablet_mem_tracker()); + _mem_tracker = StorageEngine::instance()->tablet_mem_tracker(); #else - _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment"); + _mem_tracker = MemTracker::get_process_tracker(); #endif } diff --git a/be/src/runtime/bufferpool/system_allocator.cc b/be/src/runtime/bufferpool/system_allocator.cc index a2dfc394b1..3fa69e981e 100644 --- a/be/src/runtime/bufferpool/system_allocator.cc +++ b/be/src/runtime/bufferpool/system_allocator.cc @@ -22,6 +22,7 @@ #include "common/config.h" #include "gutil/strings/substitute.h" +#include "runtime/thread_context.h" #include "util/bit_util.h" #include "util/error_util.h" @@ -75,9 +76,11 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { // Map an extra huge page so we can fix up the alignment if needed. 
map_len += HUGE_PAGE_SIZE; } + CONSUME_THREAD_LOCAL_MEM_TRACKER(map_len); uint8_t* mem = reinterpret_cast<uint8_t*>( mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0)); if (mem == MAP_FAILED) { + RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len); return Status::BufferAllocFailed("mmap failed"); } @@ -89,10 +92,12 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { if (misalignment != 0) { uintptr_t fixup = HUGE_PAGE_SIZE - misalignment; munmap(mem, fixup); + RELEASE_THREAD_LOCAL_MEM_TRACKER(fixup); mem += fixup; map_len -= fixup; } munmap(mem + len, map_len - len); + RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len - len); DCHECK_EQ(reinterpret_cast<uintptr_t>(mem) % HUGE_PAGE_SIZE, 0) << mem; // Mark the buffer as a candidate for promotion to huge pages. The Linux Transparent // Huge Pages implementation will try to back the memory with a huge page if it is @@ -142,6 +147,7 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) { void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) { if (config::mmap_buffers) { int rc = munmap(buffer.data(), buffer.len()); + RELEASE_THREAD_LOCAL_MEM_TRACKER(buffer.len()); DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno; } else { bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index ebedb58057..1f65851809 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -220,7 +220,7 @@ void DiskIoMgr::BufferDescriptor::reset(RequestContext* reader, ScanRange* range _eosr = false; _status = Status::OK(); // Consume in the tls mem tracker when the buffer is allocated. 
- _buffer_mem_tracker = thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(); + _buffer_mem_tracker = tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(); } void DiskIoMgr::BufferDescriptor::return_buffer() { @@ -739,7 +739,7 @@ char* DiskIoMgr::get_free_buffer(int64_t* buffer_size) { buffer = new char[*buffer_size]; } else { // This means the buffer's memory ownership is transferred from DiskIoMgr to tls tracker. - _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size); + _mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size); buffer = _free_buffers[idx].front(); _free_buffers[idx].pop_front(); } @@ -767,7 +767,7 @@ void DiskIoMgr::gc_io_buffers(int64_t bytes_to_free) { // The deleted buffer is released in the tls mem tracker, the deleted buffer belongs to DiskIoMgr, // so the freed memory should be recorded in the DiskIoMgr mem tracker. So if the tls mem tracker // and the DiskIoMgr tracker are different, transfer memory ownership. - _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed); + _mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed); } void DiskIoMgr::return_free_buffer(BufferDescriptor* desc) { @@ -793,7 +793,7 @@ void DiskIoMgr::return_free_buffer(char* buffer, int64_t buffer_size, MemTracker // The deleted buffer is released in the tls mem tracker. When the buffer was allocated, // it was consumed in BufferDescriptor->buffer_mem_tracker, so if the tls mem tracker and // the tracker in the parameters are different, transfer memory ownership. 
- tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size); + tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size); } } diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 4a51b91932..5cdfcb084e 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -44,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id; Status FoldConstantExecutor::fold_constant_expr( const TFoldConstantParams& params, PConstantExprResult* response) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const auto& expr_map = params.expr_map; auto expr_result_map = response->mutable_expr_result_map(); @@ -53,7 +54,6 @@ Status FoldConstantExecutor::fold_constant_expr( if (UNLIKELY(!status.ok())) { return status; } - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (const auto& m : expr_map) { PExprResultMap pexpr_result_map; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 34ac356909..d12bc85f26 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -467,8 +467,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi .instance_id(exec_state->fragment_instance_id()) .tag("pthread_id", std::to_string((uintptr_t)pthread_self())); #ifndef BE_TEST - SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state()->query_type(), - print_id(exec_state->query_id()), exec_state->fragment_instance_id(), + SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state(), exec_state->executor()->runtime_state()->instance_mem_tracker()); #endif exec_state->execute(); diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index a2b9bf3424..adc86f0e67 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -65,7 +65,7 @@ MemPool::MemPool() total_allocated_bytes_(0), 
total_reserved_bytes_(0), peak_allocated_bytes_(0), - _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get()) {} + _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {} MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index 86f7c0370b..49f8862b0b 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -60,6 +60,19 @@ MemTracker* MemTracker::get_raw_process_tracker() { return raw_process_tracker; } +// Track memory for all brpc server responses. +static std::shared_ptr<MemTracker> brpc_server_tracker; +static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT; + +void MemTracker::create_brpc_server_tracker() { + brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), MemTrackerLevel::OVERVIEW); +} + +std::shared_ptr<MemTracker> MemTracker::get_brpc_server_tracker() { + GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker); + return brpc_server_tracker; +} + void MemTracker::list_process_trackers(std::vector<std::shared_ptr<MemTracker>>* trackers) { trackers->clear(); std::deque<std::shared_ptr<MemTracker>> to_process; @@ -88,7 +101,8 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const const std::shared_ptr<MemTracker>& parent, MemTrackerLevel level, RuntimeProfile* profile) { - std::shared_ptr<MemTracker> reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker(); + std::shared_ptr<MemTracker> reset_parent = + parent ? 
parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); std::shared_ptr<MemTracker> tracker( @@ -102,7 +116,8 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const std::shared_ptr<MemTracker> MemTracker::create_virtual_tracker( int64_t byte_limit, const std::string& label, const std::shared_ptr<MemTracker>& parent, MemTrackerLevel level) { - std::shared_ptr<MemTracker> reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker(); + std::shared_ptr<MemTracker> reset_parent = + parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); std::shared_ptr<MemTracker> tracker( @@ -121,6 +136,7 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label, RuntimeProfile* profile) : _limit(byte_limit), _label(label), + // Not 100% sure the id is unique. This is generated because it is faster than converting to int after hash. _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()), _parent(parent), _level(level) { diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 3d0e3f7271..74a7b4bef6 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -97,6 +97,8 @@ public: // Gets a shared_ptr to the "process" tracker, creating it if necessary. static std::shared_ptr<MemTracker> get_process_tracker(); static MemTracker* get_raw_process_tracker(); + // Gets a shared_ptr to the "brpc server" tracker, creating it if necessary. + static std::shared_ptr<MemTracker> get_brpc_server_tracker(); Status check_sys_mem_info(int64_t bytes) { if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { @@ -464,6 +466,8 @@ private: // Creates the process tracker. static void create_process_tracker(); + // Creates the brpc server tracker. + static void create_brpc_server_tracker(); // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. 
int64_t _limit; diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 2d2a9a5c01..7f8259c034 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -17,6 +17,8 @@ #include "runtime/memory/chunk_allocator.h" +#include <sanitizer/asan_interface.h> + #include <list> #include <mutex> @@ -134,8 +136,7 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit) Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, bool check_limits) { MemTracker* reset_tracker = - tracker ? tracker - : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(); + tracker ? tracker : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(); // In advance, transfer the memory ownership of allocate from ChunkAllocator::tracker to the parameter tracker. // Next, if the allocate is successful, it will exit normally; // if the allocate fails, return this part of the memory to the parameter tracker. @@ -178,7 +179,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, chunk->data = SystemAllocator::allocate(size); // The allocated chunk is consumed in the tls mem tracker, we want to consume in the ChunkAllocator tracker, // transfer memory ownership. TODO(zxy) replace with switch tls tracker - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); } chunk_pool_system_alloc_count->increment(1); chunk_pool_system_alloc_cost_ns->increment(cost_ns); @@ -208,9 +209,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { // it was consumed in the parameter tracker, so if the tls mem tracker and the parameter // tracker are different, transfer memory ownership. 
if (tracker) - tracker->transfer_to( - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), - chunk.size); + tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), + chunk.size); } chunk_pool_system_free_count->increment(1); chunk_pool_system_free_cost_ns->increment(cost_ns); @@ -223,8 +223,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { if (tracker) { tracker->transfer_to(_mem_tracker.get(), chunk.size); } else { - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to( - _mem_tracker.get(), chunk.size); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), + chunk.size); } _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size); } diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp index 374cec5557..6ed5906f00 100644 --- a/be/src/runtime/memory/system_allocator.cpp +++ b/be/src/runtime/memory/system_allocator.cpp @@ -23,6 +23,7 @@ #include "common/config.h" #include "common/logging.h" +#include "runtime/thread_context.h" namespace doris { @@ -43,6 +44,8 @@ void SystemAllocator::free(uint8_t* ptr, size_t length) { char buf[64]; LOG(ERROR) << "fail to free memory via munmap, errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64); + } else { + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); } } else { ::free(ptr); @@ -63,12 +66,14 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t length) { } uint8_t* SystemAllocator::allocate_via_mmap(size_t length) { + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); auto ptr = (uint8_t*)mmap(nullptr, length, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); if (ptr == MAP_FAILED) { char buf[64]; LOG(ERROR) << "fail to allocate memory via mmap, errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } return ptr; diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp index 6cdbce39d2..2745215bfb 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -89,9 +89,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _runtime_state->set_query_fragments_ctx(fragments_ctx); RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id)); - SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), print_id(_runtime_state->query_id()), - _runtime_state->fragment_instance_id(), - _runtime_state->instance_mem_tracker()); + SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker()); _runtime_state->set_be_number(request.backend_num); if (request.__isset.backend_id) { _runtime_state->set_backend_id(request.backend_id); @@ -442,9 +440,7 @@ void PlanFragmentExecutor::_collect_node_statistics() { } void PlanFragmentExecutor::report_profile() { - SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), print_id(_runtime_state->query_id()), - _runtime_state->fragment_instance_id(), - _runtime_state->instance_mem_tracker()); + SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker()); VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); DCHECK(_report_status_cb); diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 614f7888f1..06306df512 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -44,7 +44,7 @@ const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024; const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2; RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity) - : _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()), + : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()), _has_in_flight_row(false), _num_rows(0), _num_uncommitted_rows(0), @@ -70,7 +70,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int 
capacity) // to allocated string data in special mempool // (change via python script that runs over Data_types.cc) RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch) - : _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()), + : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()), _has_in_flight_row(false), _num_rows(input_batch.num_rows()), _num_uncommitted_rows(0), diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 4eade51b46..008e9d2e58 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -156,8 +156,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( // LOG(INFO) << "entity filter id:" << filter_id; cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, _fragment_instance_id); cntVal->tracker = MemTracker::create_tracker( - -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id, - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()); + -1, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id, + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()); _filter_map.emplace(filter_id, cntVal); return Status::OK(); } diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc index 11042a4d9d..5bf518178d 100644 --- a/be/src/runtime/sorted_run_merger.cc +++ b/be/src/runtime/sorted_run_merger.cc @@ -129,7 +129,7 @@ public: *done = false; _pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, - this, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()); + this, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()); RETURN_IF_ERROR(next(nullptr, done)); return Status::OK(); diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 9ba55fde8e..548b886299 100644 --- 
a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -36,11 +36,11 @@ // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { void new_hook(const void* ptr, size_t size) { - doris::thread_local_ctx.get()->consume_mem(tc_nallocx(size, 0)); + doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); } void delete_hook(const void* ptr) { - doris::thread_local_ctx.get()->release_mem(tc_malloc_size(const_cast<void*>(ptr))); + doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr))); } void init_hook() { diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 871cd1ebf0..0b71101d97 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -17,6 +17,9 @@ #include "runtime/thread_context.h" +#include "runtime/runtime_state.h" +#include "util/doris_metrics.h" + namespace doris { DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, thread_local_ctx); @@ -29,4 +32,128 @@ ThreadContext* ThreadContextPtr::get() { return thread_local_ctx; } +AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id, + const TUniqueId& fragment_instance_id, + const std::shared_ptr<doris::MemTracker>& mem_tracker) { + DCHECK(task_id != ""); + tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, + const std::shared_ptr<doris::MemTracker>& mem_tracker) { +#ifndef BE_TEST + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(type, "", TUniqueId(), mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, + const std::shared_ptr<doris::MemTracker>& mem_tracker) { +#ifndef BE_TEST + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const TQueryType::type& 
query_type, + const std::shared_ptr<doris::MemTracker>& mem_tracker, + const std::string& task_id, + const TUniqueId& fragment_instance_id) { +#ifndef BE_TEST + DCHECK(task_id != ""); + DCHECK(fragment_instance_id != TUniqueId()); + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state, + const std::shared_ptr<doris::MemTracker>& mem_tracker) { +#ifndef BE_TEST + DCHECK(print_id(runtime_state->query_id()) != ""); + DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(query_to_task_type(runtime_state->query_type()), + print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(), + mem_tracker); +} + +AttachTaskThread::~AttachTaskThread() { + tls_ctx()->detach(); + DorisMetrics::instance()->attach_task_thread_count->increment(1); +} + +template <bool Existed> +SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker( + const std::shared_ptr<doris::MemTracker>& mem_tracker, bool in_task) { + if (config::memory_verbose_track) { +#ifndef BE_TEST + DCHECK(mem_tracker); + // The thread tracker must be switched after the attach task, otherwise switching + // in the main thread will cause the cached tracker not be cleaned up in time. 
+ DCHECK(in_task == false || tls_ctx()->_thread_mem_tracker_mgr->is_attach_task()); + if (Existed) { + _old_tracker_id = tls_ctx()->_thread_mem_tracker_mgr->update_tracker<true>(mem_tracker); + } else { + _old_tracker_id = + tls_ctx()->_thread_mem_tracker_mgr->update_tracker<false>(mem_tracker); + } +#endif +#ifndef NDEBUG + tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1; +#endif + } +} + +template <bool Existed> +SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() { + if (config::memory_verbose_track) { +#ifndef NDEBUG + tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1; + DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1); +#endif +#ifndef BE_TEST + tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id); +#endif + } +} + +SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( + const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) { + DCHECK(action_type != std::string()); + _old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb( + action_type, cancel_work, err_call_back_func); +} + +SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() { + tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); + DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); +} + +SwitchBthread::SwitchBthread() { + tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + if (tls == nullptr) { + // Create thread-local data on demand. + tls = new ThreadContext; + tls->_thread_mem_tracker_mgr->init_bthread(); + // set the data so that next time bthread_getspecific in the thread returns the data. 
+ CHECK_EQ(0, bthread_setspecific(btls_key, tls)); + } else { + tls->_thread_mem_tracker_mgr->init_bthread(); + } +} + +SwitchBthread::~SwitchBthread() { + DCHECK(tls != nullptr); + tls->_thread_mem_tracker_mgr->clear_untracked_mems(); +#ifndef NDEBUG + DorisMetrics::instance()->switch_bthread_count->increment(1); +#endif +} + +template class SwitchThreadMemTracker<true>; +template class SwitchThreadMemTracker<false>; + } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 4d9d60078c..8ab72be634 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -17,15 +17,17 @@ #pragma once +#include <service/brpc_conflict.h> +// After brpc_conflict.h +#include <bthread/bthread.h> + #include <string> #include <thread> #include "common/logging.h" -#include "gen_cpp/Types_types.h" -#include "runtime/runtime_state.h" +#include "gen_cpp/PaloInternalService_types.h" // for TQueryType #include "runtime/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" -#include "util/doris_metrics.h" // Attach to task when thread starts #define SCOPED_ATTACH_TASK_THREAD(type, ...) \ @@ -34,34 +36,50 @@ // may be different from the order of execution of instructions, which will cause the position of // the memory track to be unexpected. #define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \ - auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true) + auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true) #define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \ - auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false) + auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false) // Switch thread mem tracker during task execution. // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. 
#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, false) + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<false>(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, true); + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<false>(mem_tracker, true); +#define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<true>(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<true>(mem_tracker, true) + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<true>(mem_tracker, true) // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTrackerEndClear(mem_tracker) + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTrackerEndClear(mem_tracker) #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \ auto VARNAME_LINENUM(witch_tracker_cb) = \ - SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__) + doris::SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__) +#define SCOPED_SWITCH_BTHREAD() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() +// Before switching the same tracker multiple times, add tracker as early as possible, +// and then call `SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER` to reduce one map find. 
+// For example, in the exec_node open phase `add tracker`, it is no longer necessary to determine +// whether the tracker exists in TLS when switching the tracker in the exec_node get_next phase. +// TODO(zxy): Duplicate add tracker is currently prohibited, because it will, +// 1. waste time 2. `_untracked_mems[mem_tracker->id()] = 0` will cause the memory track to be lost. +// Some places may have to repeat the add tracker. optimize after. #define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ - thread_local_ctx.get()->_thread_mem_tracker_mgr->add_tracker(mem_tracker) + doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker) +#define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \ + doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size) +#define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \ + doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-size) namespace doris { class TUniqueId; +extern bthread_key_t btls_key; + // The thread context saves some info about a working thread. // 2 requried info: // 1. thread_id: Current thread id, Auto generated. @@ -80,19 +98,20 @@ public: STORAGE = 4 // to be added ... 
}; - inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", "STORAGE"}; + inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", + "STORAGE"}; public: - ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) { + ThreadContext() : _type(TaskType::UNKNOWN) { _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr()); - std::stringstream ss; - ss << _thread_id; - _thread_id_str = ss.str(); + _thread_mem_tracker_mgr->init(); + start_thread_mem_tracker = true; + _thread_id = get_thread_id(); } void attach(const TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id, - const std::shared_ptr<MemTracker>& mem_tracker) { + const std::shared_ptr<doris::MemTracker>& mem_tracker) { DCHECK(_type == TaskType::UNKNOWN && _task_id == "") << ",old tracker label: " << mem_tracker->label() << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); @@ -111,10 +130,15 @@ public: } const std::string& task_id() const { return _task_id; } - const std::thread::id& thread_id() const { return _thread_id; } - const std::string& thread_id_str() const { return _thread_id_str; } + const std::string& thread_id_str() const { return _thread_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + std::string get_thread_id() { + std::stringstream ss; + ss << std::this_thread::get_id(); + return ss.str(); + } + void consume_mem(int64_t size) { if (start_thread_mem_tracker) { _thread_mem_tracker_mgr->cache_consume(size); @@ -136,8 +160,7 @@ public: std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr; private: - std::thread::id _thread_id; - std::string _thread_id_str; + std::string _thread_id; TaskType _type; std::string _task_id; TUniqueId _fragment_instance_id; @@ -178,55 +201,33 @@ private: inline thread_local ThreadContextPtr thread_local_ctx; +static ThreadContext* tls_ctx() { + ThreadContext* tls = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + if (tls != nullptr) { + return tls; + } else { + return thread_local_ctx.get(); + } +} + class AttachTaskThread { public: explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id = TUniqueId(), - const std::shared_ptr<MemTracker>& mem_tracker = nullptr) { - DCHECK(task_id != ""); - thread_local_ctx.get()->attach(type, task_id, fragment_instance_id, mem_tracker); - } + const std::shared_ptr<doris::MemTracker>& mem_tracker = nullptr); explicit AttachTaskThread(const ThreadContext::TaskType& type, - const std::shared_ptr<MemTracker>& mem_tracker) { -#ifndef BE_TEST - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker); - } + const std::shared_ptr<doris::MemTracker>& mem_tracker); explicit AttachTaskThread(const TQueryType::type& query_type, - const std::shared_ptr<MemTracker>& mem_tracker) { -#ifndef BE_TEST - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(), - mem_tracker); - } + const std::shared_ptr<doris::MemTracker>& mem_tracker); - explicit AttachTaskThread(const TQueryType::type& query_type, const std::string& task_id, - const TUniqueId& fragment_instance_id, - const std::shared_ptr<MemTracker>& mem_tracker) { -#ifndef BE_TEST - DCHECK(task_id != ""); - DCHECK(fragment_instance_id != TUniqueId()); - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id, - fragment_instance_id, mem_tracker); - } + explicit AttachTaskThread(const TQueryType::type& query_type, + const std::shared_ptr<doris::MemTracker>& mem_tracker, + const std::string& task_id, const TUniqueId& fragment_instance_id); explicit AttachTaskThread(const RuntimeState* runtime_state, - const std::shared_ptr<MemTracker>& mem_tracker) { -#ifndef BE_TEST - DCHECK(print_id(runtime_state->query_id()) != ""); - 
DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()), - print_id(runtime_state->query_id()), - runtime_state->fragment_instance_id(), mem_tracker); - } + const std::shared_ptr<doris::MemTracker>& mem_tracker); const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) { switch (query_type) { @@ -240,10 +241,7 @@ public: } } - ~AttachTaskThread() { - thread_local_ctx.get()->detach(); - DorisMetrics::instance()->attach_task_thread_count->increment(1); - } + ~AttachTaskThread(); }; class StopThreadMemTracker { @@ -263,36 +261,10 @@ private: template <bool Existed> class SwitchThreadMemTracker { public: - explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& mem_tracker, - bool in_task = true) { - if (config::memory_verbose_track) { -#ifndef BE_TEST - DCHECK(mem_tracker); - // The thread tracker must be switched after the attach task, otherwise switching - // in the main thread will cause the cached tracker not be cleaned up in time. 
- DCHECK(in_task == false || - thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task()); - if (Existed) { - _old_tracker_id = - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<true>( - mem_tracker); - } else { - _old_tracker_id = - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<false>( - mem_tracker); - } -#endif - } - } + explicit SwitchThreadMemTracker(const std::shared_ptr<doris::MemTracker>& mem_tracker, + bool in_task = true); - ~SwitchThreadMemTracker() { - if (config::memory_verbose_track) { -#ifndef BE_TEST - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id); - DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1); -#endif - } - } + ~SwitchThreadMemTracker(); protected: int64_t _old_tracker_id = 0; @@ -300,11 +272,11 @@ protected: class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> { public: - explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<MemTracker>& mem_tracker) + explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<doris::MemTracker>& mem_tracker) : SwitchThreadMemTracker<false>(mem_tracker, false) {} ~SwitchThreadMemTrackerEndClear() { - thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems(); + tls_ctx()->_thread_mem_tracker_mgr->clear_untracked_mems(); } }; @@ -312,19 +284,22 @@ class SwitchThreadMemTrackerErrCallBack { public: explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type, bool cancel_work = true, - ERRCALLBACK err_call_back_func = nullptr) { - DCHECK(action_type != std::string()); - _old_tracker_cb = thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb( - action_type, cancel_work, err_call_back_func); - } + ERRCALLBACK err_call_back_func = nullptr); - ~SwitchThreadMemTrackerErrCallBack() { - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); - 
DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); - } + ~SwitchThreadMemTrackerErrCallBack(); private: ConsumeErrCallBackInfo _old_tracker_cb; }; +class SwitchBthread { +public: + explicit SwitchBthread(); + + ~SwitchBthread(); + +private: + ThreadContext* tls; +}; + } // namespace doris diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp index 06fd521faf..e55a4620f0 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -17,6 +17,8 @@ #include "runtime/thread_mem_tracker_mgr.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/mem_tracker_task_pool.h" #include "service/backend_options.h" @@ -25,6 +27,7 @@ namespace doris { void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTracker>& mem_tracker) { + DCHECK(switch_count == 0) << print_debug_string(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; _consume_err_cb.cancel_msg = cancel_msg; @@ -44,26 +47,15 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: } void ThreadMemTrackerMgr::detach_task() { + DCHECK(switch_count == 0) << print_debug_string(); _task_id = ""; _fragment_instance_id = TUniqueId(); _consume_err_cb.init(); clear_untracked_mems(); - _tracker_id = 0; - // The following memory changes for the two map operations of _untracked_mems and _mem_trackers - // will be re-recorded in _untracked_mem. 
- _untracked_mems.clear(); - _untracked_mems[0] = 0; - _mem_trackers.clear(); - _mem_trackers[0] = MemTracker::get_process_tracker(); - _mem_tracker_labels.clear(); - _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); + init(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - _temp_task_mem_tracker = - ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( - _task_id); - DCHECK(_temp_task_mem_tracker); if (_fragment_instance_id != TUniqueId()) { ExecEnv::GetInstance()->fragment_mgr()->cancel( _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 4ca2adba3e..404837a73a 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -20,8 +20,6 @@ #include <fmt/format.h> #include <parallel_hashmap/phmap.h> -#include "runtime/exec_env.h" -#include "runtime/fragment_mgr.h" #include "runtime/mem_tracker.h" namespace doris { @@ -61,33 +59,20 @@ inline thread_local bool start_thread_mem_tracker = false; // need to manually call cosume after stop_mem_tracker, and then start_mem_tracker. 
class ThreadMemTrackerMgr { public: - ThreadMemTrackerMgr() { - _mem_trackers[0] = MemTracker::get_process_tracker(); - _untracked_mems[0] = 0; - _tracker_id = 0; - _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); - start_thread_mem_tracker = true; - } + ThreadMemTrackerMgr() {} + ~ThreadMemTrackerMgr() { clear_untracked_mems(); start_thread_mem_tracker = false; } - void clear_untracked_mems() { - for (const auto& untracked_mem : _untracked_mems) { - if (untracked_mem.second != 0) { - DCHECK(_mem_trackers[untracked_mem.first]) - << ", label: " << _mem_tracker_labels[untracked_mem.first]; - if (_mem_trackers[untracked_mem.first]) { - _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); - } else { - MemTracker::get_process_tracker()->consume(untracked_mem.second); - } - } - } - mem_tracker()->consume(_untracked_mem); - _untracked_mem = 0; - } + // After thread initialization, calling `init` again must call `clear_untracked_mems` first + // to avoid memory tracking loss. + void init(); + + void init_bthread(); + + void clear_untracked_mems(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_task(const std::string& cancel_msg, const std::string& task_id, @@ -96,21 +81,18 @@ public: void detach_task(); - // Must be fast enough! - // Thread update_tracker may be called very frequently, adding a memory copy will be slow. + // Must be fast enough! Thread update_tracker may be called very frequently. + // So for performance, add tracker as early as possible, and then call update_tracker<Existed>. 
template <bool Existed> int64_t update_tracker(const std::shared_ptr<MemTracker>& mem_tracker); void update_tracker_id(int64_t tracker_id); - void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { - _mem_trackers[mem_tracker->id()] = mem_tracker; - DCHECK(_mem_trackers[mem_tracker->id()]); - _untracked_mems[mem_tracker->id()] = 0; - _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); - } + // Before switching the same tracker multiple times, add tracker as early as possible, + // update_tracker<true> can reduce one map find. + void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker); - ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, - bool cancel_task, ERRCALLBACK cb_func) { + ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, bool cancel_task, + ERRCALLBACK cb_func) { _temp_consume_err_cb = _consume_err_cb; _consume_err_cb.cancel_msg = cancel_msg; _consume_err_cb.cancel_task = cancel_task; @@ -127,17 +109,34 @@ public: // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, void cache_consume(int64_t size); - void noncache_consume(); + void noncache_consume(int64_t size); bool is_attach_task() { return _task_id != ""; } - std::shared_ptr<MemTracker> mem_tracker() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - if (_mem_trackers[_tracker_id]) { - return _mem_trackers[_tracker_id]; - } else { - return MemTracker::get_process_tracker(); + std::shared_ptr<MemTracker> mem_tracker(); + + int64_t switch_count = 0; + + std::string print_debug_string() { + fmt::memory_buffer mem_trackers_buf; + for (const auto& [key, value] : _mem_trackers) { + fmt::format_to(mem_trackers_buf, "{}_{},", std::to_string(key), value->log_usage(1)); } + fmt::memory_buffer untracked_mems_buf; + for (const auto& [key, value] : _untracked_mems) { + fmt::format_to(untracked_mems_buf, "{}_{},", std::to_string(key), + 
std::to_string(value)); + } + fmt::memory_buffer mem_tracker_labels_buf; + for (const auto& [key, value] : _mem_tracker_labels) { + fmt::format_to(mem_tracker_labels_buf, "{}_{},", std::to_string(key), value); + } + return fmt::format( + "ThreadMemTrackerMgr debug string, _tracker_id:{}, _untracked_mem:{}, _task_id:{}, " + "_mem_trackers:<{}>, _untracked_mems:<{}>, _mem_tracker_labels:<{}>", + std::to_string(_tracker_id), std::to_string(_untracked_mem), _task_id, + fmt::to_string(mem_trackers_buf), fmt::to_string(untracked_mems_buf), + fmt::to_string(mem_tracker_labels_buf)); } private: @@ -175,39 +174,71 @@ private: ConsumeErrCallBackInfo _consume_err_cb; }; +inline void ThreadMemTrackerMgr::init() { + _tracker_id = 0; + _mem_trackers.clear(); + _mem_trackers[0] = MemTracker::get_process_tracker(); + _untracked_mems.clear(); + _untracked_mems[0] = 0; + _mem_tracker_labels.clear(); + _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); +} + +inline void ThreadMemTrackerMgr::init_bthread() { + init(); + _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); + _untracked_mems[1] = 0; + _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label(); + _tracker_id = 1; +} + +inline void ThreadMemTrackerMgr::clear_untracked_mems() { + for (const auto& untracked_mem : _untracked_mems) { + if (untracked_mem.second != 0) { + DCHECK(_mem_trackers[untracked_mem.first]) << print_debug_string(); + _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); + } + } + mem_tracker()->consume(_untracked_mem); + _untracked_mem = 0; +} + template <bool Existed> inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { - DCHECK(mem_tracker); + DCHECK(mem_tracker) << print_debug_string(); _temp_tracker_id = mem_tracker->id(); if (_temp_tracker_id == _tracker_id) { return _tracker_id; } if (Existed) { - DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()); + 
DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()) << print_debug_string(); } else { + // If the tracker has already been added, avoid `_untracked_mems[x] = 0;` again causing the memory track to be lost. if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) { _mem_trackers[_temp_tracker_id] = mem_tracker; - DCHECK(_mem_trackers[_temp_tracker_id]); + DCHECK(_mem_trackers[_temp_tracker_id]) << print_debug_string(); _untracked_mems[_temp_tracker_id] = 0; _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); } } + DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << print_debug_string(); + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); _untracked_mems[_tracker_id] += _untracked_mem; _untracked_mem = 0; std::swap(_tracker_id, _temp_tracker_id); - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); return _temp_tracker_id; // old tracker_id } inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) { + DCHECK(switch_count >= 0) << print_debug_string(); if (tracker_id != _tracker_id) { _untracked_mems[_tracker_id] += _untracked_mem; _untracked_mem = 0; _tracker_id = tracker_id; - DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) - << ", label: " << _mem_tracker_labels[_tracker_id]; - DCHECK(_mem_trackers[_tracker_id]); + DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string(); + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); } } @@ -218,7 +249,7 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { // it will cause tracker->consumption to be temporarily less than 0. 
if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) { - DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()); + DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string(); // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion. // Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses. start_thread_mem_tracker = false; @@ -227,21 +258,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(); + noncache_consume(_untracked_mem); + _untracked_mem = 0; start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - Status st = mem_tracker()->try_consume(_untracked_mem); +inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { + Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. 
- mem_tracker()->consume(_untracked_mem); - exceeded(_untracked_mem, st); + mem_tracker()->consume(size); + exceeded(size, st); } - _untracked_mem = 0; +} + +inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { + DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string(); + _mem_trackers[mem_tracker->id()] = mem_tracker; + DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); + _untracked_mems[mem_tracker->id()] = 0; + _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); +} + +inline std::shared_ptr<MemTracker> ThreadMemTrackerMgr::mem_tracker() { + // Whether the key _tracker_id exists in _mem_trackers. + DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << print_debug_string(); + // If the key _tracker_id exists in _mem_trackers, check whether the value is null. + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); + return _mem_trackers[_tracker_id]; } } // namespace doris diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h index 031a9d6697..6e1b348ac7 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc.h @@ -17,33 +17,10 @@ #pragma once -// This file is used to fixed macro conflict between butil and gutil // all header need by brpc is contain in this file. -// include this file instead of include <brpc/xxx.h> -// and this file must put the first include in source file +// include this file instead of include <brpc/xxx.h>. 
-#include "gutil/macros.h" -// Macros in the guti/macros.h, use butil's define -#ifdef DISALLOW_IMPLICIT_CONSTRUCTORS -#undef DISALLOW_IMPLICIT_CONSTRUCTORS -#endif - -#ifdef arraysize -#undef arraysize -#endif - -#undef OVERRIDE -#undef FINAL - -// use be/src/gutil/integral_types.h override butil/basictypes.h -#include "gutil/integral_types.h" -#ifdef BASE_INTEGRAL_TYPES_H_ -#define BUTIL_BASICTYPES_H_ -#endif - -#ifdef DEBUG_MODE -#undef DEBUG_MODE -#endif +#include <service/brpc_conflict.h> #include <brpc/channel.h> #include <brpc/closure_guard.h> @@ -51,6 +28,8 @@ #include <brpc/protocol.h> #include <brpc/reloadable_flags.h> #include <brpc/server.h> +#include <bthread/bthread.h> +#include <bthread/types.h> #include <butil/containers/flat_map.h> #include <butil/containers/flat_map_inl.h> #include <butil/endpoint.h> diff --git a/be/src/service/brpc.h b/be/src/service/brpc_conflict.h similarity index 75% copy from be/src/service/brpc.h copy to be/src/service/brpc_conflict.h index 031a9d6697..35ef1b815c 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc_conflict.h @@ -18,8 +18,6 @@ #pragma once // This file is used to fixed macro conflict between butil and gutil -// all header need by brpc is contain in this file. 
-// include this file instead of include <brpc/xxx.h> // and this file must put the first include in source file #include "gutil/macros.h" @@ -32,6 +30,10 @@ #undef arraysize #endif +#ifdef ARRAY_SIZE +#undef ARRAY_SIZE +#endif + #undef OVERRIDE #undef FINAL @@ -44,15 +46,3 @@ #ifdef DEBUG_MODE #undef DEBUG_MODE #endif - -#include <brpc/channel.h> -#include <brpc/closure_guard.h> -#include <brpc/controller.h> -#include <brpc/protocol.h> -#include <brpc/reloadable_flags.h> -#include <brpc/server.h> -#include <butil/containers/flat_map.h> -#include <butil/containers/flat_map_inl.h> -#include <butil/endpoint.h> -#include <butil/fd_utility.h> -#include <butil/macros.h> diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 4cb6b8f7ee..5080b1c218 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -29,6 +29,7 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/routine_load/routine_load_task_executor.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" #include "util/md5.h" @@ -42,16 +43,24 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT); +bthread_key_t btls_key; + +static void thread_context_deleter(void* d) { + delete static_cast<ThreadContext*>(d); +} + template <typename T> PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) { REGISTER_HOOK_METRIC(add_batch_task_queue_size, [this]() { return _tablet_worker_pool.get_queue_size(); }); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); } template <typename T> PInternalServiceImpl<T>::~PInternalServiceImpl() { DEREGISTER_HOOK_METRIC(add_batch_task_queue_size); + CHECK_EQ(0, bthread_key_delete(btls_key)); } template <typename T> @@ -59,6 +68,7 @@ void 
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); @@ -84,6 +94,7 @@ void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -101,6 +112,7 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? 
request->compact() : false; @@ -116,6 +128,7 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer add batch, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); @@ -150,6 +163,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -177,6 +191,7 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId tid; tid.__set_hi(request->finst_id().hi()); @@ -201,6 +216,7 @@ template <typename T> void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_base, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); @@ -210,6 +226,7 @@ template <typename T> void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); // PProxyRequest is 
defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: @@ -272,6 +289,7 @@ void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* cont const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); } @@ -281,6 +299,7 @@ void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* contr const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); } @@ -290,6 +309,7 @@ void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); } @@ -299,6 +319,7 @@ void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto buf = static_cast<brpc::Controller*>(controller)->request_attachment(); Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data()); @@ -313,6 +334,7 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); UniqueId unique_id(request->query_id()); @@ -329,6 +351,7 @@ template <typename T> void 
PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -352,6 +375,7 @@ template <typename T> void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -370,6 +394,7 @@ template <typename T> void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -389,6 +414,7 @@ void PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); @@ -425,6 +451,7 @@ void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); @@ -450,6 +477,7 @@ void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController* const PCheckRPCChannelRequest* request, 
PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->data().size() != request->size()) { @@ -477,6 +505,7 @@ void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController* const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { @@ -511,6 +540,7 @@ void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { response->set_hello(request->hello()); diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h index 7526a7728a..dabf87ee0f 100644 --- a/be/src/util/bit_util.h +++ b/be/src/util/bit_util.h @@ -25,7 +25,6 @@ #include "common/compiler_util.h" #include "gutil/bits.h" -#include "gutil/port.h" #include "util/cpu_info.h" #ifdef __aarch64__ #include "sse2neon.h" diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 0163815c76..8bd9b05847 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -137,6 +137,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MIC DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_bthread_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES); 
DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT); @@ -286,6 +287,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, attach_task_thread_count); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_count); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_err_cb_count); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_bthread_count); _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this)); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index aa59d1770b..602eb78a7e 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -130,6 +130,8 @@ public: IntCounter* attach_task_thread_count; IntCounter* switch_thread_mem_tracker_count; IntCounter* switch_thread_mem_tracker_err_cb_count; + // brpc server response count + IntCounter* switch_bthread_count; IntGauge* memory_pool_bytes_total; IntGauge* process_thread_num; diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp index 3fd4a3bba6..a971acce86 100644 --- a/be/src/util/file_utils.cpp +++ b/be/src/util/file_utils.cpp @@ -34,6 +34,7 @@ #include "gutil/strings/strip.h" #include "gutil/strings/substitute.h" #include "olap/file_helper.h" +#include "runtime/thread_context.h" #include "util/defer_op.h" namespace doris { @@ -196,11 +197,13 @@ Status FileUtils::md5sum(const std::string& file, std::string* md5sum) { return Status::InternalError("failed to stat file"); } size_t file_len = statbuf.st_size; + CONSUME_THREAD_LOCAL_MEM_TRACKER(file_len); void* buf = mmap(0, file_len, PROT_READ, MAP_SHARED, fd, 0); unsigned char result[MD5_DIGEST_LENGTH]; MD5((unsigned char*)buf, file_len, result); munmap(buf, file_len); + RELEASE_THREAD_LOCAL_MEM_TRACKER(file_len); std::stringstream ss; for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) { diff --git a/be/src/vec/common/allocator.h 
b/be/src/vec/common/allocator.h index 2a50dabf5c..216864dbb8 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -29,6 +29,7 @@ #include <exception> #include "common/status.h" +#include "runtime/thread_context.h" #ifdef NDEBUG #define ALLOCATOR_ASLR 0 @@ -137,15 +138,18 @@ public: } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) { /// Resize mmap'd memory region. // CurrentMemoryTracker::realloc(old_size, new_size); + CONSUME_THREAD_LOCAL_MEM_TRACKER(new_size - old_size); // On apple and freebsd self-implemented mremap used (common/mremap.h) buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); - if (MAP_FAILED == buf) + if (MAP_FAILED == buf){ + RELEASE_THREAD_LOCAL_MEM_TRACKER(new_size - old_size); doris::vectorized::throwFromErrno("Allocator: Cannot mremap memory chunk from " + std::to_string(old_size) + " to " + std::to_string(new_size) + ".", doris::TStatusCode::VEC_CANNOT_MREMAP); + } /// No need for zero-fill, because mmap guarantees it. } else if (new_size < MMAP_THRESHOLD) { @@ -197,10 +201,13 @@ private: alignment, size), doris::TStatusCode::VEC_BAD_ARGUMENTS); + CONSUME_THREAD_LOCAL_MEM_TRACKER(size); buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); - if (MAP_FAILED == buf) + if (MAP_FAILED == buf) { + RELEASE_THREAD_LOCAL_MEM_TRACKER(size); doris::vectorized::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", size), doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY); + } /// No need for zero-fill, because mmap guarantees it. 
} else { @@ -231,9 +238,12 @@ private: void free_no_track(void* buf, size_t size) { if (size >= MMAP_THRESHOLD) { - if (0 != munmap(buf, size)) + if (0 != munmap(buf, size)) { doris::vectorized::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", size), doris::TStatusCode::VEC_CANNOT_MUNMAP); + } else { + RELEASE_THREAD_LOCAL_MEM_TRACKER(size); + } } else { ::free(buf); } diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index ea4e61e7fe..57cbcff921 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -66,6 +66,7 @@ Status VExchangeNode::prepare(RuntimeState* state) { Status VExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(ExecNode::open(state)); if (_is_merging) { @@ -84,7 +85,6 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { SCOPED_TIMER(runtime_profile()->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker()); - ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (_num_rows_returned + block->rows() < _limit) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org