This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new ca05d1ee01 [fix](memory tracker) Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile (#9661) ca05d1ee01 is described below commit ca05d1ee017f491e896ffdc41dea2b70f2a4de0b Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed May 25 08:56:17 2022 +0800 [fix](memory tracker) Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile (#9661) 1. Fix Lru Cache MemTracker consumption value is negative. 2. Fix compaction Cache MemTracker has no track. 3. Add USE_MEM_TRACKER compile option. 4. Make sure the malloc/free hook is not stopped at any time. --- be/CMakeLists.txt | 8 ++++++++ be/src/agent/task_worker_pool.cpp | 2 ++ be/src/exec/json_scanner.h | 4 ++++ be/src/gutil/strings/numbers.cc | 25 +++++++++++++++--------- be/src/olap/compaction.cpp | 13 ++++++++++--- be/src/olap/compaction.h | 2 ++ be/src/olap/lru_cache.cpp | 20 ++++++++----------- be/src/olap/lru_cache.h | 8 +++++++- be/src/olap/olap_server.cpp | 2 ++ be/src/olap/tablet.cpp | 9 +++++++++ be/src/olap/tablet.h | 2 ++ be/src/runtime/load_channel.cpp | 2 +- be/src/runtime/mem_tracker.cpp | 2 +- be/src/runtime/mem_tracker.h | 14 ++++++++++++++ be/src/runtime/result_sink.cpp | 7 +++---- be/src/runtime/thread_context.cpp | 34 ++++++++++++++++++++++++++++----- be/src/runtime/thread_context.h | 29 ++++++++++++++++++++++------ be/src/runtime/thread_mem_tracker_mgr.h | 28 ++++++++++++++++++++------- be/src/service/doris_main.cpp | 2 ++ be/src/vec/sink/result_sink.cpp | 3 +++ be/test/olap/lru_cache_test.cpp | 21 ++++++++++---------- build.sh | 5 +++++ run-be-ut.sh | 1 + 23 files changed, 183 insertions(+), 60 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 76edc3ba27..7c53258326 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -438,6 +438,14 @@ if (WITH_LZO) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_LZO") endif() +# Enable memory tracker, which allows BE to limit the memory of tasks 
such as query, load, +# and compaction,and observe the memory of BE through be_ip:http_port/MemTracker. +# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker, +# which will bring about a 2% performance improvement, which may be useful in performance POC. +if (USE_MEM_TRACKER) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DUSE_MEM_TRACKER") +endif() + if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new") endif() diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 0ef2325d2f..0a30aad20a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1609,6 +1609,8 @@ void TaskWorkerPool::_random_sleep(int second) { } void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() { + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION, + StorageEngine::instance()->compaction_mem_tracker()); while (_is_work) { TAgentTaskRequest agent_task_req; TCompactionReq compaction_req; diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index ab2f479e60..5dc61c18cb 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -64,6 +64,10 @@ public: // Get next tuple Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override; + Status get_next(vectorized::Block* block, bool* eof) override { + return Status::NotSupported("Not Implemented get block"); + } + // Close this scanner void close() override; diff --git a/be/src/gutil/strings/numbers.cc b/be/src/gutil/strings/numbers.cc index 568b0ce2a4..46be289fb8 100644 --- a/be/src/gutil/strings/numbers.cc +++ b/be/src/gutil/strings/numbers.cc @@ -1442,7 +1442,6 @@ char* SimpleItoaWithCommas(__int128_t i, char* buffer, int32_t buffer_size) { return p; } - // ---------------------------------------------------------------------- // ItoaKMGT() // Description: converts an integer to a string @@ 
-1480,7 +1479,7 @@ string ItoaKMGT(int64 i) { } string AccurateItoaKMGT(int64 i) { - const char *sign = ""; + const char* sign = ""; if (i < 0) { // We lose some accuracy if the caller passes LONG_LONG_MIN, but // that's OK as this function is only for human readability @@ -1489,31 +1488,39 @@ string AccurateItoaKMGT(int64 i) { i = -i; } - string ret = std::to_string(i) + " : " + StringPrintf("%s", sign); + string ret = std::to_string(i) + " = " + StringPrintf("%s", sign); int64 val; if ((val = (i >> 40)) > 1) { - ret += StringPrintf("%" PRId64 "%s", val, "T"); + ret += StringPrintf("%" PRId64 + "%s" + " + ", + val, "T"); i = i - (val << 40); } if ((val = (i >> 30)) > 1) { - ret += StringPrintf(" %" PRId64 "%s", val, "G"); + ret += StringPrintf("%" PRId64 + "%s" + " + ", + val, "G"); i = i - (val << 30); } if ((val = (i >> 20)) > 1) { - ret += StringPrintf(" %" PRId64 "%s", val, "M"); + ret += StringPrintf("%" PRId64 + "%s" + " + ", + val, "M"); i = i - (val << 20); } if ((val = (i >> 10)) > 1) { - ret += StringPrintf(" %" PRId64 "%s", val, "K"); + ret += StringPrintf("%" PRId64 "%s", val, "K"); i = i - (val << 10); } else { - ret += StringPrintf(" %" PRId64 "%s", i, "K"); + ret += StringPrintf("%" PRId64 "%s", i, "K"); } return ret; } - // DEPRECATED(wadetregaskis). // These are non-inline because some BUILD files turn on -Wformat-non-literal. 
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index fdc07fc725..14569881a7 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -26,11 +26,18 @@ using std::vector; namespace doris { Compaction::Compaction(TabletSharedPtr tablet, const std::string& label) - : _mem_tracker(MemTracker::create_tracker(-1, label, nullptr, MemTrackerLevel::INSTANCE)), - _tablet(tablet), + : _tablet(tablet), _input_rowsets_size(0), _input_row_num(0), - _state(CompactionState::INITED) {} + _state(CompactionState::INITED) { +#ifndef BE_TEST + _mem_tracker = MemTracker::create_tracker(-1, label, + StorageEngine::instance()->compaction_mem_tracker(), + MemTrackerLevel::INSTANCE); +#else + _mem_tracker = MemTracker::get_process_tracker(); +#endif +} Compaction::~Compaction() {} diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 27c33dcc5a..7306bbacea 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -53,6 +53,8 @@ public: Status execute_compact(); virtual Status execute_compact_impl() = 0; + std::shared_ptr<MemTracker>& get_mem_tracker() { return _mem_tracker; } + protected: virtual Status pick_rowsets_to_compact() = 0; virtual std::string compaction_name() const = 0; diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 584d7dccb9..90b2e4ddf5 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -283,7 +283,7 @@ void LRUCache::_evict_one_entry(LRUHandle* e) { Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), - CachePriority priority) { + CachePriority priority, MemTracker* tracker) { size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size)); e->value = value; @@ -296,7 +296,12 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, e->next = e->prev = nullptr; e->in_cache = true; 
e->priority = priority; + e->mem_tracker = tracker; memcpy(e->key_data, key.data(), key.size()); + // The memory of the parameter value should be recorded in the tls mem tracker, + // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. + if (tracker) + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(tracker, e->total_size); LRUHandle* to_remove_head = nullptr; { std::lock_guard<std::mutex> l(_mutex); @@ -433,7 +438,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, : _name(name), _last_id(1), _mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker); const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards; for (int s = 0; s < kNumShards; s++) { _shards[s] = new LRUCache(type); @@ -452,7 +456,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, } ShardedLRUCache::~ShardedLRUCache() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (int s = 0; s < kNumShards; s++) { delete _shards[s]; } @@ -463,12 +466,9 @@ ShardedLRUCache::~ShardedLRUCache() { Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), CachePriority priority) { - // The memory of the parameter value should be recorded in the tls mem tracker, - // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. 
- tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge); - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const uint32_t hash = _hash_slice(key); - return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority); + return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority, + _mem_tracker.get()); } Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { @@ -477,13 +477,11 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { } void ShardedLRUCache::release(Handle* handle) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); LRUHandle* h = reinterpret_cast<LRUHandle*>(handle); _shards[_shard(h->hash)]->release(handle); } void ShardedLRUCache::erase(const CacheKey& key) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const uint32_t hash = _hash_slice(key); _shards[_shard(hash)]->erase(key, hash); } @@ -502,7 +500,6 @@ uint64_t ShardedLRUCache::new_id() { } int64_t ShardedLRUCache::prune() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t num_prune = 0; for (int s = 0; s < kNumShards; s++) { num_prune += _shards[s]->prune(); @@ -511,7 +508,6 @@ int64_t ShardedLRUCache::prune() { } int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t num_prune = 0; for (int s = 0; s < kNumShards; s++) { num_prune += _shards[s]->prune_if(pred); diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index b6b1f95754..6fb87744f4 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -15,6 +15,7 @@ #include "olap/olap_common.h" #include "runtime/mem_tracker.h" +#include "runtime/thread_context.h" #include "util/metrics.h" #include "util/slice.h" @@ -236,6 +237,7 @@ typedef struct LRUHandle { uint32_t refs; uint32_t hash; // Hash of key(); used for fast sharding and comparisons CachePriority priority = CachePriority::NORMAL; + MemTracker* mem_tracker; char 
key_data[1]; // Beginning of key CacheKey key() const { @@ -250,6 +252,9 @@ typedef struct LRUHandle { void free() { (*deleter)(key(), value); + if (mem_tracker) + mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), + total_size); ::free(this); } @@ -308,7 +313,8 @@ public: // Like Cache methods, but with an extra "hash" parameter. Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), - CachePriority priority = CachePriority::NORMAL); + CachePriority priority = CachePriority::NORMAL, + MemTracker* tracker = nullptr); Cache::Handle* lookup(const CacheKey& key, uint32_t hash); void release(Cache::Handle* handle); void erase(const CacheKey& key, uint32_t hash); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index f7437693b1..f4732cbe04 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -564,6 +564,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits); if (st.ok() && permits > 0 && _permit_limiter.request(permits)) { auto st = _compaction_thread_pool->submit_func([=]() { + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION, + tablet->get_compaction_mem_tracker(compaction_type)); CgroupsMgr::apply_system_cgroup(); tablet->execute_compaction(compaction_type); _permit_limiter.release(permits); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index cb073a8558..668c9ce67f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1504,6 +1504,7 @@ Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load void Tablet::_init_context_common_fields(RowsetWriterContext& context) { context.rowset_id = StorageEngine::instance()->next_rowset_id(); context.tablet_uid = tablet_uid(); + context.tablet_id = tablet_id(); context.partition_id = 
partition_id(); context.tablet_schema_hash = schema_hash(); @@ -1522,4 +1523,12 @@ Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* r return RowsetFactory::create_rowset(&tablet_schema(), tablet_path_desc(), rowset_meta, rowset); } +std::shared_ptr<MemTracker>& Tablet::get_compaction_mem_tracker(CompactionType compaction_type) { + if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + return _cumulative_compaction->get_mem_tracker(); + } else { + return _base_compaction->get_mem_tracker(); + } +} + } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 1874afa4e8..3fb3144af1 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -253,6 +253,8 @@ public: return _cumulative_compaction_policy; } + std::shared_ptr<MemTracker>& get_compaction_mem_tracker(CompactionType compaction_type); + inline bool all_beta() const { std::shared_lock rdlock(_meta_lock); return _tablet_meta->all_beta(); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 85f2c5da3f..55038e2742 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -137,7 +137,7 @@ bool LoadChannel::is_finished() { } Status LoadChannel::cancel() { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard<std::mutex> l(_lock); for (auto& it : _tablets_channels) { it.second->cancel(); diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index bc2ae3efc1..359b0f5368 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -178,7 +178,7 @@ void MemTracker::init_virtual() { MemTracker::~MemTracker() { consume(_untracked_mem.exchange(0)); // before memory_leak_check // TCMalloc hook will be triggered during destructor memtracker, may cause crash. 
- if (_label == "Process") GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER(); + if (_label == "Process") STOP_THREAD_LOCAL_MEM_TRACKER(false); if (!_virtual && config::memory_leak_detection) MemTracker::memory_leak_check(this); if (!_virtual && parent()) { // Do not call release on the parent tracker to avoid repeated releases. diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 3d4eb740d2..c21d6d3db5 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -122,6 +122,7 @@ public: // Increases consumption of this tracker and its ancestors by 'bytes'. void consume(int64_t bytes) { +#ifdef USE_MEM_TRACKER if (bytes <= 0) { release(-bytes); return; @@ -129,6 +130,7 @@ public: for (auto& tracker : _all_trackers) { tracker->_consumption->add(bytes); } +#endif } // Increases consumption of this tracker and its ancestors by 'bytes' only if @@ -136,6 +138,7 @@ public: // no MemTrackers are updated. Returns true if the consumption was successfully updated. WARN_UNUSED_RESULT Status try_consume(int64_t bytes) { +#ifdef USE_MEM_TRACKER if (bytes <= 0) { release(-bytes); return Status::OK(); @@ -166,11 +169,13 @@ public: } // Everyone succeeded, return. DCHECK_EQ(i, -1); +#endif return Status::OK(); } // Decreases consumption of this tracker and its ancestors by 'bytes'. void release(int64_t bytes) { +#ifdef USE_MEM_TRACKER if (bytes < 0) { consume(-bytes); return; @@ -181,6 +186,7 @@ public: for (auto& tracker : _all_trackers) { tracker->_consumption->add(-bytes); } +#endif } static void batch_consume(int64_t bytes, @@ -247,22 +253,26 @@ public: // ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption // against the limit recorded in one of its ancestors already happened. 
void consume_local(int64_t bytes, MemTracker* end_tracker) { +#ifdef USE_MEM_TRACKER DCHECK(end_tracker); if (bytes == 0) return; for (auto& tracker : _all_trackers) { if (tracker == end_tracker) return; tracker->_consumption->add(bytes); } +#endif } // up to (but not including) end_tracker. void release_local(int64_t bytes, MemTracker* end_tracker) { +#ifdef USE_MEM_TRACKER DCHECK(end_tracker); if (bytes == 0) return; for (auto& tracker : _all_trackers) { if (tracker == end_tracker) return; tracker->_consumption->add(-bytes); } +#endif } // Transfer 'bytes' of consumption from this tracker to 'dst'. @@ -273,6 +283,7 @@ public: WARN_UNUSED_RESULT Status try_transfer_to(MemTracker* dst, int64_t bytes) { +#ifdef USE_MEM_TRACKER if (id() == dst->id()) return Status::OK(); // Must release first, then consume release_cache(bytes); @@ -281,14 +292,17 @@ public: consume_cache(bytes); return st; } +#endif return Status::OK(); } // Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated. void transfer_to(MemTracker* dst, int64_t bytes) { +#ifdef USE_MEM_TRACKER if (id() == dst->id()) return; release_cache(bytes); dst->consume_cache(bytes); +#endif } // Returns true if a valid limit of this tracker or one of its ancestors is exceeded. diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index cefe5d2181..7e67a8bfcb 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -88,10 +88,9 @@ Status ResultSink::open(RuntimeState* state) { } Status ResultSink::send(RuntimeState* state, RowBatch* batch) { - // The memory consumption in the process of sending the results is not recorded in the query memory. - // 1. Avoid the query being cancelled when the memory limit is reached after the query result comes out. - // 2. If record this memory, also need to record on the receiving end, need to consider the life cycle of MemTracker. 
- SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER(); + // The memory consumption in the process of sending the results is not check query memory limit. + // Avoid the query being cancelled when the memory limit is reached after the query result comes out. + STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER(); return _writer->append_row_batch(batch); } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index fade4fd51e..b08642bf12 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -36,7 +36,9 @@ AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const st const TUniqueId& fragment_instance_id, const std::shared_ptr<doris::MemTracker>& mem_tracker) { DCHECK(task_id != ""); +#ifdef USE_MEM_TRACKER tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker); +#endif } AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, @@ -44,7 +46,9 @@ AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, #ifndef BE_TEST DCHECK(mem_tracker); #endif +#ifdef USE_MEM_TRACKER tls_ctx()->attach(type, "", TUniqueId(), mem_tracker); +#endif } AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, @@ -52,7 +56,9 @@ AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, #ifndef BE_TEST DCHECK(mem_tracker); #endif +#ifdef USE_MEM_TRACKER tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker); +#endif } AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, @@ -64,7 +70,9 @@ AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, DCHECK(fragment_instance_id != TUniqueId()); DCHECK(mem_tracker); #endif +#ifdef USE_MEM_TRACKER tls_ctx()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker); +#endif } AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state, @@ -74,19 +82,24 @@ AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state, 
DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); DCHECK(mem_tracker); #endif +#ifdef USE_MEM_TRACKER tls_ctx()->attach(query_to_task_type(runtime_state->query_type()), print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(), mem_tracker); +#endif } AttachTaskThread::~AttachTaskThread() { +#ifdef USE_MEM_TRACKER tls_ctx()->detach(); DorisMetrics::instance()->attach_task_thread_count->increment(1); +#endif } template <bool Existed> SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker( const std::shared_ptr<doris::MemTracker>& mem_tracker, bool in_task) { +#ifdef USE_MEM_TRACKER if (config::memory_verbose_track) { #ifndef BE_TEST DCHECK(mem_tracker); @@ -100,41 +113,49 @@ SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker( _old_tracker_id = tls_ctx()->_thread_mem_tracker_mgr->update_tracker<false>(mem_tracker); } -#endif +#endif // BE_TEST #ifndef NDEBUG tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1; -#endif +#endif // NDEBUG } +#endif // USE_MEM_TRACKER } template <bool Existed> SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() { +#ifdef USE_MEM_TRACKER if (config::memory_verbose_track) { #ifndef NDEBUG tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1; DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1); -#endif +#endif // NDEBUG #ifndef BE_TEST tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id); -#endif +#endif // BE_TEST } +#endif // USE_MEM_TRACKER } SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) { +#ifdef USE_MEM_TRACKER DCHECK(action_type != std::string()); _old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb( action_type, cancel_work, err_call_back_func); +#endif } SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() { +#ifdef USE_MEM_TRACKER tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); 
#ifndef NDEBUG DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); #endif +#endif // USE_MEM_TRACKER } SwitchBthread::SwitchBthread() { +#ifdef USE_MEM_TRACKER tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL if (tls == nullptr) { @@ -148,16 +169,19 @@ SwitchBthread::SwitchBthread() { } tls->_thread_mem_tracker_mgr->init(); tls->set_type(ThreadContext::TaskType::BRPC); +#endif } SwitchBthread::~SwitchBthread() { +#ifdef USE_MEM_TRACKER DCHECK(tls != nullptr); tls->_thread_mem_tracker_mgr->clear_untracked_mems(); tls->_thread_mem_tracker_mgr->init(); tls->set_type(ThreadContext::TaskType::UNKNOWN); #ifndef NDEBUG DorisMetrics::instance()->switch_bthread_count->increment(1); -#endif +#endif // NDEBUG +#endif // USE_MEM_TRACKER } template class SwitchThreadMemTracker<true>; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 8dc8f5267e..0572c2b08d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -35,10 +35,8 @@ // Be careful to stop the thread mem tracker, because the actual order of malloc and free memory // may be different from the order of execution of instructions, which will cause the position of // the memory track to be unexpected. -#define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \ - auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true) -#define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \ - auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false) +#define STOP_THREAD_LOCAL_MEM_TRACKER(scope) \ + auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(scope) // Switch thread mem tracker during task execution. // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. 
@@ -80,9 +78,11 @@ #define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker) #define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \ - doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size) + doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_try_consume(size) #define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \ - doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-size) + doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_try_consume(-size) +#define STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER() \ + auto VARNAME_LINENUM(switch_bthread) = StopCheckLimitThreadMemTracker() namespace doris { @@ -281,7 +281,9 @@ public: ~SwitchThreadMemTracker(); protected: +#ifdef USE_MEM_TRACKER int64_t _old_tracker_id = 0; +#endif }; class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> { @@ -303,7 +305,9 @@ public: ~SwitchThreadMemTrackerErrCallBack(); private: +#ifdef USE_MEM_TRACKER ConsumeErrCallBackInfo _old_tracker_cb; +#endif }; class SwitchBthread { @@ -313,7 +317,20 @@ public: ~SwitchBthread(); private: +#ifdef USE_MEM_TRACKER ThreadContext* tls; +#endif +}; + +class StopCheckLimitThreadMemTracker { +public: + explicit StopCheckLimitThreadMemTracker() { + tls_ctx()->_thread_mem_tracker_mgr->update_check_limit(false); + } + + ~StopCheckLimitThreadMemTracker() { + tls_ctx()->_thread_mem_tracker_mgr->update_check_limit(true); + } }; } // namespace doris diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 84033ff4e7..754e231747 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -63,6 +63,10 @@ public: ~ThreadMemTrackerMgr() { clear_untracked_mems(); + _consume_err_cb.init(); + _mem_trackers.clear(); + _untracked_mems.clear(); + _mem_tracker_labels.clear(); start_thread_mem_tracker = false; } @@ -107,12 +111,14 @@ public: // must increase the control to avoid entering infinite recursion, 
otherwise it may cause crash or stuck, void cache_consume(int64_t size); - void noncache_consume(int64_t size); + void noncache_try_consume(int64_t size); bool is_attach_task() { return _task_id != ""; } std::shared_ptr<MemTracker> mem_tracker(); + void update_check_limit(bool check_limit) { _check_limit = check_limit; } + int64_t switch_count = 0; std::string print_debug_string() { @@ -163,6 +169,8 @@ private: // we can confirm the tracker label that was added through _mem_tracker_labels. // Because for performance, all map keys are tracker id. phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels; + // If true, call memtracker try_consume, otherwise call consume. + bool _check_limit; int64_t _tracker_id; // Avoid memory allocation in functions. @@ -184,6 +192,7 @@ inline void ThreadMemTrackerMgr::init() { _untracked_mems[0] = 0; _mem_tracker_labels.clear(); _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); + _check_limit = true; } inline void ThreadMemTrackerMgr::clear_untracked_mems() { @@ -244,21 +253,26 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) { DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string(); - // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion. - // Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses. - start_thread_mem_tracker = false; // When switching to the current tracker last time, the remaining untracked memory. if (_untracked_mems[_tracker_id] != 0) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(_untracked_mem); + // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, + // will enter infinite recursion. 
So the temporary memory allocated in mem_tracker.try_consume + // and mem_limit_exceeded will directly call consume. + if (_check_limit) { + _check_limit = false; + noncache_try_consume(_untracked_mem); + _check_limit = true; + } else { + mem_tracker()->consume(_untracked_mem); + } _untracked_mem = 0; - start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { +inline void ThreadMemTrackerMgr::noncache_try_consume(int64_t size) { Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index dd8804a56d..cf486b2c98 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -332,9 +332,11 @@ int main(int argc, char** argv) { fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n"); return -1; } +#ifdef USE_MEM_TRACKER if (doris::config::track_new_delete) { init_hook(); } +#endif // USE_MEM_TRACKER #endif std::vector<doris::StorePath> paths; diff --git a/be/src/vec/sink/result_sink.cpp b/be/src/vec/sink/result_sink.cpp index 9441fec8b9..ce3ac086cc 100644 --- a/be/src/vec/sink/result_sink.cpp +++ b/be/src/vec/sink/result_sink.cpp @@ -87,6 +87,9 @@ Status VResultSink::send(RuntimeState* state, RowBatch* batch) { } Status VResultSink::send(RuntimeState* state, Block* block) { + // The memory consumption in the process of sending the results is not check query memory limit. + // Avoid the query being cancelled when the memory limit is reached after the query result comes out. 
+ STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER(); return _writer->append_block(*block); } diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index a23a608f39..b7e18cc01e 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -226,45 +226,44 @@ static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value, TEST_F(CacheTest, Usage) { LRUCache cache(LRUCacheType::SIZE); - cache.set_capacity(1050); + cache.set_capacity(1040); // The lru usage is handle_size + charge. - // handle_size = sizeof(handle) - 1 + key size = 88 - 1 + 3 = 90 + // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98 CacheKey key1("100"); insert_LRUCache(cache, key1, 100, CachePriority::NORMAL); - ASSERT_EQ(190, cache.get_usage()); // 100 + 90 + ASSERT_EQ(198, cache.get_usage()); // 100 + 98 CacheKey key2("200"); insert_LRUCache(cache, key2, 200, CachePriority::DURABLE); - ASSERT_EQ(480, cache.get_usage()); // 190 + 290(d) + ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE CacheKey key3("300"); insert_LRUCache(cache, key3, 300, CachePriority::NORMAL); - ASSERT_EQ(870, cache.get_usage()); // 190 + 290(d) + 390 + ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398 CacheKey key4("400"); insert_LRUCache(cache, key4, 400, CachePriority::NORMAL); - ASSERT_EQ(780, cache.get_usage()); // 290(d) + 490 + ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398 CacheKey key5("500"); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - ASSERT_EQ(880, cache.get_usage()); // 290(d) + 590 + ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498 CacheKey key6("600"); insert_LRUCache(cache, key6, 600, CachePriority::NORMAL); - ASSERT_EQ(980, cache.get_usage()); // 290(d) + 690 + ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 598 CacheKey key7("950"); insert_LRUCache(cache, key7, 950, CachePriority::DURABLE); - ASSERT_EQ(1040, cache.get_usage()); // 1040(d) + ASSERT_EQ(0, 
cache.get_usage()); // evict 298 698, because 950 + 98 > 1040, so insert failed } TEST_F(CacheTest, Prune) { LRUCache cache(LRUCacheType::NUMBER); cache.set_capacity(5); - // The lru usage is handle_size + charge = 96 - 1 = 95 - // 95 + 3 means handle_size + key size + // The lru usage is 1, add one entry CacheKey key1("100"); insert_LRUCache(cache, key1, 100, CachePriority::NORMAL); EXPECT_EQ(1, cache.get_usage()); diff --git a/build.sh b/build.sh index e3b3243eb8..907f5510ff 100755 --- a/build.sh +++ b/build.sh @@ -214,6 +214,9 @@ fi if [[ -z ${STRIP_DEBUG_INFO} ]]; then STRIP_DEBUG_INFO=OFF fi +if [[ -z ${USE_MEM_TRACKER} ]]; then + USE_MEM_TRACKER=ON +fi if [[ -z ${USE_DWARF} ]]; then USE_DWARF=OFF @@ -238,6 +241,7 @@ echo "Get params: USE_LLD -- $USE_LLD USE_DWARF -- $USE_DWARF STRIP_DEBUG_INFO -- $STRIP_DEBUG_INFO + USE_MEM_TRACKER -- $USE_MEM_TRACKER " # Clean and build generated code @@ -300,6 +304,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then -DBUILD_JAVA_UDF=${BUILD_JAVA_UDF} \ -DSTRIP_DEBUG_INFO=${STRIP_DEBUG_INFO} \ -DUSE_DWARF=${USE_DWARF} \ + -DUSE_MEM_TRACKER=${USE_MEM_TRACKER} \ -DUSE_AVX2=${USE_AVX2} \ -DGLIBC_COMPATIBILITY=${GLIBC_COMPATIBILITY} ../ ${BUILD_SYSTEM} -j ${PARALLEL} diff --git a/run-be-ut.sh b/run-be-ut.sh index a8b0950cce..d075693454 100755 --- a/run-be-ut.sh +++ b/run-be-ut.sh @@ -138,6 +138,7 @@ ${CMAKE_CMD} -G "${GENERATOR}" \ -DWITH_MYSQL=OFF \ -DWITH_KERBEROS=OFF \ -DUSE_DWARF=${USE_DWARF} \ + -DUSE_MEM_TRACKER=ON \ ${CMAKE_USE_CCACHE} ../ ${BUILD_SYSTEM} -j ${PARALLEL} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org