This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new dd11d5c0a5 [enhancement](memory) Support try catch bad alloc (#14135)
dd11d5c0a5 is described below

commit dd11d5c0a59be9d2b5aa8e565dcf3152869e9da3
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Sun Nov 13 11:22:56 2022 +0800

    [enhancement](memory) Support try catch bad alloc (#14135)
---
 be/src/common/config.h                           |   2 +-
 be/src/exec/hash_table.cpp                       |   3 +-
 be/src/exec/partitioned_aggregation_node.cc      |  11 +-
 be/src/exec/partitioned_hash_table.cc            |   3 +-
 be/src/http/default_path_handlers.cpp            |   3 +-
 be/src/http/ev_http_server.cpp                   |   1 -
 be/src/olap/snapshot_manager.cpp                 |   6 +-
 be/src/olap/snapshot_manager.h                   |   4 +-
 be/src/olap/storage_engine.cpp                   |   6 +-
 be/src/olap/storage_engine.h                     |   4 +-
 be/src/olap/tablet_manager.cpp                   |  14 +--
 be/src/olap/tablet_manager.h                     |   2 +-
 be/src/runtime/buffered_block_mgr2.cc            |  19 +---
 be/src/runtime/disk_io_mgr.cc                    |   1 -
 be/src/runtime/exec_env_init.cpp                 |   3 +-
 be/src/runtime/mem_pool.cpp                      |   4 +-
 be/src/runtime/memory/jemalloc_hook.cpp          |  48 ++++----
 be/src/runtime/memory/mem_tracker.cpp            |   9 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp    |  15 ++-
 be/src/runtime/memory/mem_tracker_limiter.h      |   4 +-
 be/src/runtime/memory/system_allocator.cpp       |   3 +-
 be/src/runtime/memory/tcmalloc_hook.h            |   4 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp |   6 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h   |  49 ++++++---
 be/src/runtime/runtime_state.cpp                 |   2 +-
 be/src/runtime/sorted_run_merger.cc              |   2 +-
 be/src/runtime/thread_context.cpp                |  17 ++-
 be/src/runtime/thread_context.h                  | 133 ++++++++++++++++-------
 be/src/vec/common/allocator.h                    |  45 ++++----
 be/src/vec/common/sort/sorter.cpp                |   6 +-
 be/src/vec/exec/join/vhash_join_node.cpp         |  18 +--
 be/src/vec/exec/vaggregation_node.cpp            |   2 +-
 be/test/testutil/run_all_tests.cpp               |   2 +-
 33 files changed, 254 insertions(+), 197 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2061172b6a..aa193300a7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -635,7 +635,7 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
 CONF_Bool(enable_tcmalloc_hook, "true");
 // Print more detailed logs, more detailed records, etc.
-CONF_Bool(memory_debug, "false");
+CONF_mBool(memory_debug, "false");
 // The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size
 // smaller than this value will continue to accumulate. specified as number of bytes.
diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp
index b50b03460e..e0fbd8dd7b 100644
--- a/be/src/exec/hash_table.cpp
+++ b/be/src/exec/hash_table.cpp
@@ -175,8 +175,7 @@ Status HashTable::resize_buckets(int64_t num_buckets) {
     int64_t old_num_buckets = _num_buckets;
     int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
-    Status st = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
-            delta_bytes);
+    Status st = thread_context()->thread_mem_tracker()->check_limit(delta_bytes);
     if (!st) {
         LOG_EVERY_N(WARNING, 100) << "resize bucket failed: " << st.to_string();
         return st;
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 49c02fa5a8..03cecb794a 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -910,15 +910,12 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
                    << "to allocate $1 bytes for intermediate tuple. 
" << "Backend: " << BackendOptions::get_localhost() << ", " << "fragment: " << print_id(state_->fragment_instance_id()) << " " - << "Used: " - << thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->consumption() - << ", Limit: " - << thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->limit() << ". " + << "Used: " << thread_context()->thread_mem_tracker()->consumption() + << ", Limit: " << thread_context()->thread_mem_tracker()->limit() << ". " << "You can change the limit by session variable exec_mem_limit."; string details = Substitute(str.str(), _id, tuple_data_size); - *status = thread_context() - ->_thread_mem_tracker_mgr->limiter_mem_tracker() - ->fragment_mem_limit_exceeded(state_, details, tuple_data_size); + *status = thread_context()->thread_mem_tracker()->fragment_mem_limit_exceeded( + state_, details, tuple_data_size); return nullptr; } memset(tuple_data, 0, fixed_size); diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc index f9598b510b..1b819a1c39 100644 --- a/be/src/exec/partitioned_hash_table.cc +++ b/be/src/exec/partitioned_hash_table.cc @@ -307,8 +307,7 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_)); int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_); - if (UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - mem_usage))) { + if (UNLIKELY(!thread_context()->thread_mem_tracker()->check_limit(mem_usage))) { capacity_ = 0; string details = Substitute( "PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes", mem_usage); diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 7edc3ec7f0..850f6e51b5 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -144,7 +144,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr } else { (*output) << "<h4>*Note: (see documentation for details)</h4>\n"; (*output) << "<h4> 1.`/mem_tracker?type=global` to view the memory statistics of each " - "type</h4>\n"; + "type, `global`life cycle is the same as the process, e.g. 
each Cache, "
+                     "StorageEngine, each Manager.</h4>\n";
         (*output) << "<h4> 2.`/mem_tracker` counts virtual memory, which is equal to `Actual "
                      "memory used` in `/memz`</h4>\n";
         (*output) << "<h4> 3.`process` is equal to the sum of all types of memory, "
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index 96a8709ace..3e7f55de04 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -102,7 +102,6 @@ void EvHttpServer::start() {
     _event_bases.resize(_num_workers);
     for (int i = 0; i < _num_workers; ++i) {
         CHECK(_workers->submit_func([this, i]() {
-            thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
             std::shared_ptr<event_base> base(event_base_new(),
                                              [](event_base* base) { event_base_free(base); });
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 904ef62baf..4cbb0d394a 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -65,7 +65,7 @@ SnapshotManager* SnapshotManager::instance() {
 Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* snapshot_path,
                                       bool* allow_incremental_clone) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     Status res = Status::OK();
     if (snapshot_path == nullptr) {
         LOG(WARNING) << "output parameter cannot be null";
@@ -93,7 +93,7 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
 Status SnapshotManager::release_snapshot(const string& snapshot_path) {
     // If the requested snapshot_path is under the root/snapshot folder, it is considered legal and can be deleted
     // Otherwise it is considered an illegal request and an error result is returned
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     auto stores = StorageEngine::instance()->get_stores();
     for (auto store : stores) {
         std::string abs_path;
@@ -117,7 +117,7 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) {
 Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id,
                                            int64_t replica_id, const int32_t& schema_hash) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     Status res = Status::OK();
     // check clone dir existed
     if (!FileUtils::check_exist(clone_dir)) {
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index d4c71405b9..a7857c3ffb 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -64,7 +64,7 @@ public:
 private:
     SnapshotManager() : _snapshot_base_id(0) {
-        _mem_tracker = std::make_unique<MemTracker>("SnapshotManager");
+        _mem_tracker = std::make_shared<MemTracker>("SnapshotManager");
     }
     Status _calc_snapshot_id_path(const TabletSharedPtr& tablet, int64_t timeout_s,
@@ -98,7 +98,7 @@ private:
     std::mutex _snapshot_mutex;
     uint64_t _snapshot_base_id;
-    std::unique_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
 }; // SnapshotManager
 } // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 5eab6fa5f3..bd1095a1db 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -107,7 +107,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _effective_cluster_id(-1),
           _is_all_cluster_id_exist(true),
           _mem_tracker(std::make_shared<MemTracker>("StorageEngine")),
-          _segcompaction_mem_tracker(std::make_unique<MemTracker>("SegCompaction")),
+          _segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
           _segment_meta_mem_tracker(std::make_unique<MemTracker>("SegmentMeta")),
_stop_background_threads_latch(1), _tablet_manager(new TabletManager(config::tablet_map_shard_size)), @@ -152,7 +152,7 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) { std::vector<std::thread> threads; for (auto data_dir : data_dirs) { threads.emplace_back([this, data_dir] { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); auto res = data_dir->load(); if (!res.ok()) { LOG(WARNING) << "io error when init load tables. res=" << res @@ -198,7 +198,7 @@ Status StorageEngine::_init_store_map() { _tablet_manager.get(), _txn_manager.get()); tmp_stores.emplace_back(store); threads.emplace_back([this, store, &error_msg_lock, &error_msg]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); auto st = store->init(); if (!st.ok()) { { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 46607b241e..20069b6b40 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -180,7 +180,7 @@ public: Status get_compaction_status_json(std::string* result); MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); } - MemTracker* segcompaction_mem_tracker() { return _segcompaction_mem_tracker.get(); } + std::shared_ptr<MemTracker> segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } // check cumulative compaction config void check_cumulative_compaction_config(); @@ -326,7 +326,7 @@ private: // StorageEngine oneself std::shared_ptr<MemTracker> _mem_tracker; // Count the memory consumption of segment compaction tasks. - std::unique_ptr<MemTracker> _segcompaction_mem_tracker; + std::shared_ptr<MemTracker> _segcompaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. // The memory consumed by querying is tracked in segment iterator. 
std::unique_ptr<MemTracker> _segment_meta_mem_tracker; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 40e700736d..c877a8d0ec 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -73,7 +73,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE mem_consumption, Labels({{"type", "tablet_meta"}})); TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) - : _mem_tracker(std::make_unique<MemTracker>("TabletManager")), + : _mem_tracker(std::make_shared<MemTracker>("TabletManager")), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); @@ -224,7 +224,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { } Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); DorisMetrics::instance()->create_tablet_requests_total->increment(1); int64_t tablet_id = request.tablet_id; @@ -433,7 +433,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop task"; return Status::Aborted("aborted"); } - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition); } @@ -493,7 +493,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl Status TabletManager::drop_tablets_on_error_root_path( const std::vector<TabletInfo>& tablet_info_vec) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); Status res = Status::OK(); if (tablet_info_vec.empty()) { // This is a high probability event return res; @@ -908,7 +908,7 @@ Status TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet> } Status TabletManager::start_trash_sweep() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); { std::vector<TabletSharedPtr> all_tablets; // we use this vector to save all tablet ptr for saving lock time. 
@@ -1027,7 +1027,7 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) { void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks // create tablet and load tablet task should check whether the dir exists tablets_shard& shard = _get_tablets_shard(tablet_id); @@ -1089,7 +1089,7 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id, } void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); std::vector<TabletSharedPtr> related_tablets; { for (auto& tablets_shard : _tablets_shards) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 1154a22d32..1bd62381cf 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -201,7 +201,7 @@ private: }; // trace the memory use by meta of tablet - std::unique_ptr<MemTracker> _mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index ffa3c52b9b..0a17ad0b3a 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -250,9 +250,7 @@ int64_t BufferedBlockMgr2::available_buffers(Client* client) const { int64_t BufferedBlockMgr2::remaining_unreserved_buffers() const { int64_t num_buffers = _free_io_buffers.size() + _unpinned_blocks.size() + _non_local_outstanding_writes; - num_buffers += - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity() / - max_block_size(); + num_buffers += thread_context()->thread_mem_tracker()->spare_capacity() / max_block_size(); num_buffers -= _unfullfilled_reserved_buffers; return num_buffers; } @@ -358,9 +356,7 @@ Status BufferedBlockMgr2::get_new_block(Client* client, Block* unpin_block, Bloc if (len > 0 && len < _max_block_size) { DCHECK(unpin_block == nullptr); - Status st = - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - len); + Status st = thread_context()->thread_mem_tracker()->check_limit(len); WARN_IF_ERROR(st, "get_new_block failed"); if (st) { client->_tracker->consume(len); @@ -986,8 +982,7 @@ Status BufferedBlockMgr2::find_buffer(unique_lock<mutex>& lock, BufferDescriptor // First, try to allocate a new buffer. 
if (_free_io_buffers.size() < _block_write_threshold && - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - _max_block_size)) { + thread_context()->thread_mem_tracker()->check_limit(_max_block_size)) { _mem_tracker->consume(_max_block_size); uint8_t* new_buffer = new uint8_t[_max_block_size]; *buffer_desc = _obj_pool.add(new BufferDescriptor(new_buffer, _max_block_size)); @@ -1155,11 +1150,9 @@ string BufferedBlockMgr2::debug_internal() const { << " Unfullfilled reserved buffers: " << _unfullfilled_reserved_buffers << endl << " BUffer Block Mgr Used memory: " << _mem_tracker->consumption() << " Instance remaining memory: " - << thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity() - << " (#blocks=" - << (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity() / - _max_block_size) - << ")" << endl + << thread_context()->thread_mem_tracker()->spare_capacity() << " (#blocks=" + << (thread_context()->thread_mem_tracker()->spare_capacity() / _max_block_size) << ")" + << endl << " Block write threshold: " << _block_write_threshold; return ss.str(); } diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index 2f330f250c..702ee127a2 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -955,7 +955,6 @@ void DiskIoMgr::work_loop(DiskQueue* disk_queue) { // 3. Perform the read or write as specified. // Cancellation checking needs to happen in both steps 1 and 3. - thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); while (!_shut_down) { RequestContext* worker_context = nullptr; ; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bec666859c..e4cd3c7bab 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -179,8 +179,7 @@ Status ExecEnv::_init_mem_env() { _orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan"); _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); - thread_context()->_thread_mem_tracker_mgr->init(); - thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); + thread_context()->thread_mem_tracker_mgr->init(); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) if (doris::config::enable_tcmalloc_hook) { diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 6d2760b389..c289e29acc 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -133,9 +133,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) { } chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - if (check_limits && - !thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - chunk_size)) { + if (check_limits && !thread_context()->thread_mem_tracker()->check_limit(chunk_size)) { return Status::MemoryAllocFailed("MemPool find new chunk {} bytes faild, exceed limit", chunk_size); } diff --git a/be/src/runtime/memory/jemalloc_hook.cpp b/be/src/runtime/memory/jemalloc_hook.cpp index 9a664cae17..5bba0877a6 100644 --- a/be/src/runtime/memory/jemalloc_hook.cpp +++ b/be/src/runtime/memory/jemalloc_hook.cpp @@ -28,16 +28,16 @@ extern "C" { void* doris_malloc(size_t size) __THROW { - MEM_MALLOC_HOOK(je_nallocx(size, 0)); + TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0), nullptr); void* ptr = je_malloc(size); if (UNLIKELY(ptr == nullptr)) { - 
MEM_FREE_HOOK(je_nallocx(size, 0)); + TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0)); } return ptr; } void doris_free(void* p) __THROW { - MEM_FREE_HOOK(je_malloc_usable_size(p)); + RELEASE_MEM_TRACKER(je_malloc_usable_size(p)); je_free(p); } @@ -50,10 +50,10 @@ void* doris_realloc(void* p, size_t size) __THROW { int64_t old_size = je_malloc_usable_size(p); #endif - MEM_MALLOC_HOOK(je_nallocx(size, 0) - old_size); + TRY_CONSUME_MEM_TRACKER(je_nallocx(size, 0) - old_size, nullptr); void* ptr = je_realloc(p, size); if (UNLIKELY(ptr == nullptr)) { - MEM_FREE_HOOK(je_nallocx(size, 0) - old_size); + TRY_RELEASE_MEM_TRACKER(je_nallocx(size, 0) - old_size); } return ptr; } @@ -63,72 +63,72 @@ void* doris_calloc(size_t n, size_t size) __THROW { return nullptr; } - MEM_MALLOC_HOOK(n * size); + TRY_CONSUME_MEM_TRACKER(n * size, nullptr); void* ptr = je_calloc(n, size); if (UNLIKELY(ptr == nullptr)) { - MEM_FREE_HOOK(n * size); + TRY_RELEASE_MEM_TRACKER(n * size); } else { - MEM_FREE_HOOK(je_malloc_usable_size(ptr) - n * size); + CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - n * size); } return ptr; } void doris_cfree(void* ptr) __THROW { - MEM_FREE_HOOK(je_malloc_usable_size(ptr)); + RELEASE_MEM_TRACKER(je_malloc_usable_size(ptr)); je_free(ptr); } void* doris_memalign(size_t align, size_t size) __THROW { - MEM_MALLOC_HOOK(size); + TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_aligned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { - MEM_FREE_HOOK(size); + TRY_RELEASE_MEM_TRACKER(size); } else { - MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } return ptr; } void* doris_aligned_alloc(size_t align, size_t size) __THROW { - MEM_MALLOC_HOOK(size); + TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_aligned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { - MEM_FREE_HOOK(size); + TRY_RELEASE_MEM_TRACKER(size); } else { - MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } return ptr; } void* doris_valloc(size_t size) __THROW { - MEM_MALLOC_HOOK(size); + TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_valloc(size); if (UNLIKELY(ptr == nullptr)) { - MEM_FREE_HOOK(size); + TRY_RELEASE_MEM_TRACKER(size); } else { - MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } return ptr; } void* doris_pvalloc(size_t size) __THROW { - MEM_MALLOC_HOOK(size); + TRY_CONSUME_MEM_TRACKER(size, nullptr); void* ptr = je_valloc(size); if (UNLIKELY(ptr == nullptr)) { - MEM_FREE_HOOK(size); + TRY_RELEASE_MEM_TRACKER(size); } else { - MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + CONSUME_MEM_TRACKER(je_malloc_usable_size(ptr) - size); } return ptr; } int doris_posix_memalign(void** r, size_t align, size_t size) __THROW { - MEM_MALLOC_HOOK(size); + TRY_CONSUME_MEM_TRACKER(size, ENOMEM); int ret = je_posix_memalign(r, align, size); if (UNLIKELY(ret != 0)) { - MEM_FREE_HOOK(size); + TRY_RELEASE_MEM_TRACKER(size); } else { - MEM_MALLOC_HOOK(je_malloc_usable_size(*r) - size); + CONSUME_MEM_TRACKER(je_malloc_usable_size(*r) - size); } return ret; } diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index bf7e308ff3..4248ae99f2 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -62,10 +62,9 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTra _parent_label = parent->label(); _parent_group_num = 
parent->group_num(); } else { - DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr); - _parent_label = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label(); - _parent_group_num = - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num(); + DCHECK(thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr); + _parent_label = thread_context()->thread_mem_tracker()->label(); + _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); } { std::lock_guard<std::mutex> l(mem_tracker_pool[_parent_group_num].group_lock); @@ -106,7 +105,7 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.type, print_bytes(snapshot.cur_consumption), + snapshot.label, snapshot.parent_label, print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 1fca6009ee..e2ec8bf028 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -170,7 +170,7 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string detail = msg; detail += "\n " + MemTrackerLimiter::process_mem_log_str(); if (_enable_print_log_usage) { - detail += log_usage(); + detail += "\n " + log_usage(); std::string child_trackers_usage; std::vector<MemTracker::Snapshot> snapshots; MemTracker::make_group_snapshot(&snapshots, _group_num, _label); @@ -195,22 +195,26 @@ void MemTrackerLimiter::print_log_process_usage(const std::string& msg) { MemTrackerLimiter::make_process_snapshots(&snapshots); MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); for (const auto& snapshot : snapshots) { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); + if (snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); + } else { + detail += "\n " + MemTracker::log_usage(snapshot); + } } LOG(WARNING) << detail; + // LOG(WARNING) << boost::stacktrace::to_string(boost::stacktrace::stacktrace()); // TODO } std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, - const std::string& limit_exceeded_errmsg_prefix) { + const std::string& limit_exceeded_errmsg) { DCHECK(_limit != -1); STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); std::string detail = fmt::format( "Memory limit exceeded:<consuming tracker:<{}>, {}>, executing msg:<{}>. backend {} " "process memory used {}, limit {}. 
If query tracker exceed, `set " "exec_mem_limit=8G` to change limit, details mem usage see be.INFO.", - _label, limit_exceeded_errmsg_prefix, msg, BackendOptions::get_localhost(), + _label, limit_exceeded_errmsg, msg, BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str()); - print_log_usage(detail); return detail; } @@ -218,6 +222,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const int64_t failed_alloc_size) { auto failed_msg = mem_limit_exceeded(msg, tracker_limit_exceeded_errmsg_str(failed_alloc_size, this)); + print_log_usage(failed_msg); state->log_error(failed_msg); return Status::MemoryLimitExceeded(failed_msg); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 2182e84f00..26489075da 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -135,7 +135,7 @@ public: // Log the memory usage when memory limit is exceeded. std::string mem_limit_exceeded(const std::string& msg, - const std::string& limit_exceeded_errmsg_prefix); + const std::string& limit_exceeded_errmsg); Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg, int64_t failed_allocation_size = 0); @@ -237,8 +237,6 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms _consumption->add(bytes); // No limit at this tracker. } else { if (!_consumption->try_add(bytes, _limit)) { - // Failed for this mem tracker. Roll back the ones that succeeded. - _consumption->add(-bytes); failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this); return false; } diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp index f8dd402ad1..c0721a046e 100644 --- a/be/src/runtime/memory/system_allocator.cpp +++ b/be/src/runtime/memory/system_allocator.cpp @@ -45,8 +45,9 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t length) { char buf[64]; auto err = fmt::format("fail to allocate mem via posix_memalign, res={}, errmsg={}.", res, strerror_r(res, buf, 64)); - MemTrackerLimiter::print_log_process_usage(err); LOG(ERROR) << err; + if (enable_thread_cache_bad_alloc) throw std::bad_alloc {}; + MemTrackerLimiter::print_log_process_usage(err); return nullptr; } return (uint8_t*)ptr; diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h index 6ec9352ad3..afd15d0b32 100644 --- a/be/src/runtime/memory/tcmalloc_hook.h +++ b/be/src/runtime/memory/tcmalloc_hook.h @@ -36,11 +36,11 @@ // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { void new_hook(const void* ptr, size_t size) { - MEM_MALLOC_HOOK(tc_nallocx(size, 0)); + CONSUME_MEM_TRACKER(tc_nallocx(size, 0)); } void delete_hook(const void* ptr) { - MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr))); + RELEASE_MEM_TRACKER(tc_malloc_size(const_cast<void*>(ptr))); } void init_hook() { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 4273860468..ac853d7c87 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -49,14 +49,12 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details } } -void ThreadMemTrackerMgr::exceeded(const std::string& failed_msg) { +void ThreadMemTrackerMgr::exceeded() { if (_cb_func != nullptr) { _cb_func(); } - auto 
cancel_msg = _limiter_tracker_raw->mem_limit_exceeded( - fmt::format("execute:<{}>", last_consumer_tracker()), failed_msg); if (is_attach_query()) { - exceeded_cancel_task(cancel_msg); + exceeded_cancel_task(_exceed_mem_limit_msg); } _check_limit = false; // Make sure it will only be canceled once } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 1ab3e7bce4..27c2f892e1 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -78,6 +78,7 @@ public: // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, void consume(int64_t size); + bool try_consume(int64_t size); template <bool CheckLimit> void flush_untracked_mem(); @@ -95,7 +96,8 @@ public: bool check_limit() { return _check_limit; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } - void set_check_attach(bool check_attach) { _check_attach = check_attach; } + std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; } + void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; } std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; @@ -113,7 +115,12 @@ private: // If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled void exceeded_cancel_task(const std::string& cancel_details); - void exceeded(const std::string& failed_msg); + void exceeded(); + + void save_exceed_mem_limit_msg() { + _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded( + fmt::format("execute:<{}>", last_consumer_tracker()), _bad_consume_msg); + } private: // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized @@ -125,7 +132,8 @@ private: bool _count_scope_mem = false; int64_t _scope_mem = 0; - std::string failed_msg = std::string(); + std::string _bad_consume_msg = std::string(); + std::string _exceed_mem_limit_msg = std::string(); std::shared_ptr<MemTrackerLimiter> _limiter_tracker; MemTrackerLimiter* _limiter_tracker_raw = nullptr; @@ -135,7 +143,6 @@ private: bool _check_limit = false; // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; - bool _check_attach = true; TUniqueId _fragment_instance_id = TUniqueId(); ExceedCallBack _cb_func = nullptr; }; @@ -191,21 +198,13 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { old_untracked_mem = _untracked_mem; if (_count_scope_mem) _scope_mem += _untracked_mem; if (CheckLimit) { -#ifndef BE_TEST - // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. - // If _check_attach is true and it is not in the brpc server (the protobuf will be operated when bthread is started), - // it will check whether the tracker label is equal to the default "Process" when flushing. - // If you do not want this check, set_check_attach=true - // TODO(zxy) The current p0 test cannot guarantee that all threads are checked, - // so disable it and try to open it when memory tracking is not on time. 
- // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || - // _limiter_tracker_raw->label() != "Process"); -#endif - if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) { + if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _bad_consume_msg)) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. _limiter_tracker_raw->consume(old_untracked_mem); - exceeded(failed_msg); + save_exceed_mem_limit_msg(); + _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg); + exceeded(); } } else { _limiter_tracker_raw->consume(old_untracked_mem); @@ -217,4 +216,22 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { _stop_consume = false; } +inline bool ThreadMemTrackerMgr::try_consume(int64_t size) { + if (!_stop_consume) { + // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering + // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop. + _stop_consume = true; + if (!_limiter_tracker_raw->try_consume(size, _bad_consume_msg)) { + save_exceed_mem_limit_msg(); + _stop_consume = false; + return false; + } + _stop_consume = false; + return true; + } else { + _untracked_mem += size; + return true; + } +} + } // namespace doris diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 07b04f5f09..b18671a9b8 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -281,7 +281,7 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) { Status RuntimeState::check_query_state(const std::string& msg) { // TODO: it would be nice if this also checked for cancellation, but doing so breaks // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached. 
- if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->limit_exceeded()) { + if (thread_context()->thread_mem_tracker()->limit_exceeded()) { RETURN_LIMIT_EXCEEDED(this, msg); } return query_status(); diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc index 32bdad9a6d..31ae06f47a 100644 --- a/be/src/runtime/sorted_run_merger.cc +++ b/be/src/runtime/sorted_run_merger.cc @@ -129,7 +129,7 @@ public: *done = false; _pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, - this, thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()); + this, thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); RETURN_IF_ERROR(next(nullptr, done)); return Status::OK(); diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index cd593f05ad..6caafdc5db 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -31,11 +31,11 @@ ThreadContextPtr::ThreadContextPtr() { ScopeMemCount::ScopeMemCount(int64_t* scope_mem) { _scope_mem = scope_mem; - thread_context()->_thread_mem_tracker_mgr->start_count_scope_mem(); + thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); } ScopeMemCount::~ScopeMemCount() { - *_scope_mem = thread_context()->_thread_mem_tracker_mgr->stop_count_scope_mem(); + *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); } AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, @@ -58,30 +58,29 @@ AttachTask::~AttachTask() { SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter( const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { - _old_mem_tracker = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->_thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); } SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { - thread_context()->_thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { - _need_pop = thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); + _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer( const std::shared_ptr<MemTracker>& mem_tracker) : _mem_tracker(mem_tracker) { - _need_pop = - thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); + _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); } AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { #ifndef NDEBUG DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1); #endif // NDEBUG - if (_need_pop) thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker(); + if (_need_pop) thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); } } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index d07cbe7056..68f09ce085 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -27,6 +27,7 @@ #include "gutil/macros.h" #include 
"runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" +#include "util/defer_op.h" // Used to observe the memory usage of the specified code segment #ifdef USE_MEM_TRACKER @@ -66,16 +67,13 @@ auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() // If the thread MemTrackerLimiter exceeds the limit, an error status is returned. // Usually used after SCOPED_ATTACH_TASK, during query execution. -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ - return doris::thread_context() \ - ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ - ->fragment_mem_limit_exceeded( \ - state, \ - fmt::format("exec node:<{}>, {}", \ - doris::thread_context() \ - ->_thread_mem_tracker_mgr->last_consumer_tracker(), \ - msg), \ - ##__VA_ARGS__); +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ + return doris::thread_context()->thread_mem_tracker()->fragment_mem_limit_exceeded( \ + state, \ + fmt::format("exec node:<{}>, {}", \ + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), \ + msg), \ + ##__VA_ARGS__); #else #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0 @@ -125,6 +123,7 @@ public: }; inline thread_local ThreadContextPtr thread_context_ptr; +inline thread_local bool enable_thread_cache_bad_alloc = false; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. @@ -142,8 +141,8 @@ inline thread_local bthread_t bthread_id; class ThreadContext { public: ThreadContext() { - _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr()); - if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init(); + thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr()); + if (ExecEnv::GetInstance()->initialized()) thread_mem_tracker_mgr->init(); } ~ThreadContext() { thread_context_ptr.init = false; } @@ -154,18 +153,18 @@ public: // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. DCHECK(mem_tracker); // Orphan is thread default tracker. - DCHECK(_thread_mem_tracker_mgr->limiter_mem_tracker()->label() == "Orphan") + DCHECK(thread_mem_tracker()->label() == "Orphan") << ", attach mem tracker label: " << mem_tracker->label(); #endif _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id); + thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id); } void detach_task() { _task_id = ""; _fragment_instance_id = TUniqueId(); - _thread_mem_tracker_mgr->detach_limiter_tracker(); + thread_mem_tracker_mgr->detach_limiter_tracker(); } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } @@ -176,13 +175,16 @@ public: return ss.str(); } - // After _thread_mem_tracker_mgr is initialized, the current thread TCMalloc Hook starts to + // After thread_mem_tracker_mgr is initialized, the current thread TCMalloc Hook starts to // consume/release mem_tracker. // Note that the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. At this time, when the memory // is released somewhere, the TCMalloc hook is triggered to cause the crash. 
- std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr; + std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; + MemTrackerLimiter* thread_mem_tracker() { + return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + } private: std::string _task_id = ""; @@ -271,12 +273,12 @@ private: class StopCheckThreadMemTrackerLimit { public: explicit StopCheckThreadMemTrackerLimit() { - _pre = thread_context()->_thread_mem_tracker_mgr->check_limit(); - thread_context()->_thread_mem_tracker_mgr->set_check_limit(false); + _pre = thread_context()->thread_mem_tracker_mgr->check_limit(); + thread_context()->thread_mem_tracker_mgr->set_check_limit(false); } ~StopCheckThreadMemTrackerLimit() { - thread_context()->_thread_mem_tracker_mgr->set_check_limit(_pre); + thread_context()->thread_mem_tracker_mgr->set_check_limit(_pre); } private: @@ -287,45 +289,102 @@ private: #ifdef USE_MEM_TRACKER // For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. #define CONSUME_THREAD_MEM_TRACKER(size) \ - doris::thread_context()->_thread_mem_tracker_mgr->consume(size) + doris::thread_context()->thread_mem_tracker_mgr->consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ - doris::thread_context()->_thread_mem_tracker_mgr->consume(-size) + doris::thread_context()->thread_mem_tracker_mgr->consume(-size) // used to fix the tracking accuracy of caches. -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ size, tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ tracker->transfer_to( \ - size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()) + size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()) + +// Consider catching other memory errors, such as memset failure, etc. +#define RETURN_IF_CATCH_BAD_ALLOC(stmt) \ + do { \ + doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \ + if (doris::enable_thread_cache_bad_alloc) { \ + try { \ + { stmt; } \ + } catch (std::bad_alloc const& e) { \ + doris::thread_context()->thread_mem_tracker()->print_log_usage( \ + doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \ + return Status::MemoryLimitExceeded(fmt::format( \ + "PreCatch {}, {}", e.what(), \ + doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \ + } \ + } else { \ + try { \ + doris::enable_thread_cache_bad_alloc = true; \ + Defer defer {[&]() { doris::enable_thread_cache_bad_alloc = false; }}; \ + { stmt; } \ + } catch (std::bad_alloc const& e) { \ + doris::thread_context()->thread_mem_tracker()->print_log_usage( \ + doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \ + return Status::MemoryLimitExceeded(fmt::format( \ + "PreCatch {}, {}", e.what(), \ + doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \ + } \ + } \ + } while (0) // Mem Hook to consume thread mem tracker // TODO: In the original design, the MemTracker consume method is called before the memory is allocated. // If the consume succeeds, the memory is actually allocated, otherwise an exception is thrown. // But the statistics of memory through TCMalloc new/delete Hook are after the memory is actually allocated, // which is different from the previous behavior. 
-#define MEM_MALLOC_HOOK(size) \ +#define CONSUME_MEM_TRACKER(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->thread_mem_tracker_mgr->consume(size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + } \ + } while (0) +// NOTE, The LOG cannot be printed in the mem hook. If the LOG statement triggers the mem hook LOG, +// the nested LOG may cause an unknown crash. +#define TRY_CONSUME_MEM_TRACKER(size, fail_ret) \ + do { \ + if (doris::thread_context_ptr.init) { \ + if (doris::enable_thread_cache_bad_alloc) { \ + if (!doris::thread_context()->thread_mem_tracker_mgr->try_consume(size)) { \ + return fail_ret; \ + } \ + } else { \ + doris::thread_context()->thread_mem_tracker_mgr->consume(size); \ + } \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + } \ + } while (0) +#define RELEASE_MEM_TRACKER(size) \ do { \ if (doris::thread_context_ptr.init) { \ - doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \ + doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \ } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ } \ } while (0) -#define MEM_FREE_HOOK(size) \ - do { \ - if (doris::thread_context_ptr.init) { \ - doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \ - } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ - } \ +#define TRY_RELEASE_MEM_TRACKER(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + if (!doris::enable_thread_cache_bad_alloc) { \ + doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \ + } \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ + } \ } while (0) #else #define CONSUME_THREAD_MEM_TRACKER(size) (void)0 #define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 -#define MEM_MALLOC_HOOK(size) (void)0 -#define MEM_FREE_HOOK(size) (void)0 +#define CONSUME_MEM_TRACKER(size) (void)0 +#define TRY_CONSUME_MEM_TRACKER(size) (void)0 +#define RELEASE_MEM_TRACKER(size) (void)0 +#define TRY_RELEASE_MEM_TRACKER(size) (void)0 #endif } // namespace doris diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 36a11fc6f9..c1c7e81297 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -102,6 +102,14 @@ static constexpr size_t CHUNK_THRESHOLD = 1024; static constexpr size_t MMAP_MIN_ALIGNMENT = 4096; static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; +#define RETURN_BAD_ALLOC(err) \ + do { \ + LOG(ERROR) << err; \ + if (doris::enable_thread_cache_bad_alloc) throw std::bad_alloc {}; \ + doris::MemTrackerLimiter::print_log_process_usage(err); \ + return nullptr; \ + } while (0) + /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator @@ -131,20 +139,14 @@ public: buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); if (MAP_FAILED == buf) { RELEASE_THREAD_MEM_TRACKER(size); - auto err = fmt::format("Allocator: Cannot mmap {}.", size); - doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno(err, - doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY); + RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot mmap {}.", size)); } /// No need for zero-fill, because mmap guarantees it. 
} else if (!doris::config::disable_chunk_allocator_in_vec && size >= CHUNK_THRESHOLD) { doris::Chunk chunk; if (!doris::ChunkAllocator::instance()->allocate_align(size, &chunk)) { - auto err = fmt::format("Allocator: Cannot allocate chunk {}.", size); - doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno(err, - doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY); + RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot allocate chunk {}.", size)); } buf = chunk.data; if constexpr (clear_memory) memset(buf, 0, chunk.size); @@ -156,20 +158,15 @@ public: buf = ::malloc(size); if (nullptr == buf) { - auto err = fmt::format("Allocator: Cannot malloc {}.", size); - doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno( - err, doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY); + RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot malloc {}.", size)); } } else { buf = nullptr; int res = posix_memalign(&buf, alignment, size); if (0 != res) { - auto err = fmt::format("Cannot allocate memory (posix_memalign) {}.", size); - doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno( - err, doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY, res); + RETURN_BAD_ALLOC( + fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } if constexpr (clear_memory) memset(buf, 0, size); @@ -183,8 +180,9 @@ public: if (size >= MMAP_THRESHOLD) { if (0 != munmap(buf, size)) { auto err = fmt::format("Allocator: Cannot munmap {}.", size); + LOG(ERROR) << err; + if (doris::enable_thread_cache_bad_alloc) throw std::bad_alloc {}; doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno(err, doris::TStatusCode::VEC_CANNOT_MUNMAP); } else { RELEASE_THREAD_MEM_TRACKER(size); } @@ -210,11 +208,8 @@ public: /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = ::realloc(buf, new_size); if (nullptr == new_buf) { - auto err = - fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size); - doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno(err, - doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY); + RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, + new_size)); } buf = new_buf; @@ -230,10 +225,8 @@ public: mmap_flags, -1, 0); if (MAP_FAILED == buf) { RELEASE_THREAD_MEM_TRACKER(new_size - old_size); - auto err = fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.", - old_size, new_size); - doris::MemTrackerLimiter::print_log_process_usage(err); - doris::vectorized::throwFromErrno(err, doris::TStatusCode::VEC_CANNOT_MREMAP); + RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.", + old_size, new_size)); } /// No need for zero-fill, because mmap guarantees it. 
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 94ad261165..f41ceeb5b7 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -17,6 +17,8 @@ #include "vec/common/sort/sorter.h" +#include "runtime/thread_context.h" + namespace doris::vectorized { void MergeSorterState::build_merge_tree(SortDescription& sort_description) { @@ -136,8 +138,8 @@ Status FullSorter::append_block(Block* block) { auto sz = block->rows(); for (int i = 0; i < data.size(); ++i) { DCHECK(data[i].type->equals(*(arrival_data[i].type))); - data[i].column->assume_mutable()->insert_range_from( - *arrival_data[i].column->convert_to_full_column_if_const().get(), 0, sz); + RETURN_IF_CATCH_BAD_ALLOC(data[i].column->assume_mutable()->insert_range_from( + *arrival_data[i].column->convert_to_full_column_if_const().get(), 0, sz)); } block->clear_column_data(); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 8bea283f07..5a7d364401 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -81,7 +81,7 @@ struct ProcessHashTableBuild { _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {} template <bool ignore_null, bool short_circuit_for_null> - void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { + Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { using KeyGetter = typename HashTableContext::State; using Mapped = typename HashTableContext::Mapped; int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); @@ -99,7 +99,7 @@ struct ProcessHashTableBuild { // only not build_unique, we need expanse hash table before insert data if (!_join_node->_build_unique) { // _rows contains null row, which will cause hash table resize to be large. 
- hash_table_ctx.hash_table.expanse_for_add_elem(_rows); + RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem(_rows)); } hash_table_ctx.hash_table.reset_resize_timer(); @@ -125,7 +125,7 @@ struct ProcessHashTableBuild { if ((*null_map)[k]) { DCHECK(has_null_key); *has_null_key = true; - return; + return Status::OK(); } } if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) { @@ -187,6 +187,7 @@ struct ProcessHashTableBuild { COUNTER_UPDATE(_join_node->_build_table_expanse_timer, hash_table_ctx.hash_table.get_resize_timer_value()); + return Status::OK(); } private: @@ -1279,7 +1280,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { if (block.rows() != 0) { SCOPED_TIMER(_build_side_merge_block_timer); - mutable_block.merge(block); + RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block)); } if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) { @@ -1464,19 +1465,20 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin // Get the key column that needs to be built Status st = _extract_join_column<true>(block, null_map_val, raw_ptrs, res_col_ids); - std::visit( + st = std::visit( Overload { [&](std::monostate& arg, auto has_null_value, - auto short_circuit_for_null_in_build_side) { + auto short_circuit_for_null_in_build_side) -> Status { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); + return Status::OK(); }, [&](auto&& arg, auto has_null_value, - auto short_circuit_for_null_in_build_side) { + auto short_circuit_for_null_in_build_side) -> Status { using HashTableCtxType = std::decay_t<decltype(arg)>; ProcessHashTableBuild<HashTableCtxType> hash_table_build_process( rows, block, raw_ptrs, this, state->batch_size(), offset); - hash_table_build_process + return hash_table_build_process .template run<has_null_value, short_circuit_for_null_in_build_side>( arg, has_null_value || short_circuit_for_null_in_build_side diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 54a7659471..4d6965ed4d 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -1054,7 +1054,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i _agg_data._aggregated_method_variant); if (!ret_flag) { - _emplace_into_hash_table(_places.data(), key_columns, rows); + RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i], diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 570f428720..99a338a5aa 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -32,7 +32,7 @@ int main(int argc, char** argv) { std::make_shared<doris::MemTrackerLimiter>(doris::MemTrackerLimiter::Type::GLOBAL, "Orphan"); doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker); - doris::thread_context()->_thread_mem_tracker_mgr->init(); + doris::thread_context()->thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10); doris::SegmentLoader::create_global_instance(1000); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
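
The central pattern this patch wires through the BE can be summarized apart from the diff: allocation paths throw std::bad_alloc when a memory limit would be exceeded, and query-level call sites catch the exception and turn it into a MemoryLimitExceeded status, which is what RETURN_IF_CATCH_BAD_ALLOC does around insert_range_from, expanse_for_add_elem, merge, and _emplace_into_hash_table above. The standalone sketch below only illustrates that control flow; SimpleStatus, tracked_alloc, and run_catching_bad_alloc are illustrative stand-ins under stated assumptions, not Doris APIs.

// Minimal, self-contained sketch of the "try catch bad alloc" pattern, assuming a
// pretend per-query byte budget. All names here are hypothetical.
#include <cstddef>
#include <iostream>
#include <new>
#include <string>
#include <utility>
#include <vector>

struct SimpleStatus {
    bool ok = true;
    std::string msg;
    static SimpleStatus OK() { return {}; }
    static SimpleStatus MemoryLimitExceeded(std::string m) { return {false, std::move(m)}; }
};

// Stand-in for a tracked allocation: throws instead of silently over-allocating,
// mirroring how the allocator now throws when the bad-alloc mode is enabled.
void* tracked_alloc(size_t bytes, size_t limit, size_t& used) {
    if (used + bytes > limit) throw std::bad_alloc {};
    used += bytes;
    return ::operator new(bytes);
}

// Call-site wrapper playing the role of RETURN_IF_CATCH_BAD_ALLOC: run the statement
// and convert std::bad_alloc into a recoverable status for the caller.
template <typename Fn>
SimpleStatus run_catching_bad_alloc(Fn&& fn) {
    try {
        fn();
        return SimpleStatus::OK();
    } catch (const std::bad_alloc& e) {
        return SimpleStatus::MemoryLimitExceeded(std::string("PreCatch ") + e.what());
    }
}

int main() {
    size_t used = 0;
    const size_t limit = 1 << 20; // pretend 1 MiB budget
    std::vector<void*> blocks;

    SimpleStatus st = run_catching_bad_alloc([&] {
        blocks.push_back(tracked_alloc(900 * 1024, limit, used)); // fits
        blocks.push_back(tracked_alloc(900 * 1024, limit, used)); // exceeds, throws
    });
    std::cout << (st.ok ? "ok" : st.msg) << "\n";
    for (void* p : blocks) ::operator delete(p);
    return 0;
}

The point of the wrapper is that the query fails with an error status the caller can report, instead of the process aborting inside an allocation deep in an operator.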
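On the hook side, the same idea appears as TRY_CONSUME_MEM_TRACKER: the malloc hook asks the thread's tracker to try_consume the requested bytes first and, when the bad-alloc mode is active, fails the allocation (nullptr or ENOMEM) rather than over-consuming past the limit. Below is a minimal sketch of that ordering; ThreadTracker and hooked_malloc are hypothetical stand-ins for the Doris trackers and jemalloc hooks, not the real implementations.

// Sketch of account-first, allocate-second, roll-back-on-failure, assuming a
// per-thread budget. All names are illustrative.
#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <iostream>

struct ThreadTracker {
    std::atomic<long long> used {0};
    long long limit;
    explicit ThreadTracker(long long l) : limit(l) {}
    bool try_consume(long long bytes) {
        if (used.load() + bytes > limit) return false; // would exceed: refuse, caller decides
        used += bytes;
        return true;
    }
    void release(long long bytes) { used -= bytes; }
};

thread_local ThreadTracker tracker {64 * 1024}; // pretend 64 KiB per-thread budget

// Plays the role of a malloc hook using TRY_CONSUME_MEM_TRACKER(size, nullptr):
// account first, allocate second, and undo the accounting if the allocation fails.
void* hooked_malloc(size_t size) {
    if (!tracker.try_consume(static_cast<long long>(size))) return nullptr;
    void* p = std::malloc(size);
    if (p == nullptr) tracker.release(static_cast<long long>(size));
    return p;
}

int main() {
    void* a = hooked_malloc(16 * 1024);  // fits in the budget
    void* b = hooked_malloc(128 * 1024); // refused by the tracker
    std::cout << (a != nullptr) << " " << (b != nullptr) << "\n";
    std::free(a);
    return 0;
}

The roll-back branch mirrors TRY_RELEASE_MEM_TRACKER in the patch: tracker accounting is only kept when the underlying allocation actually succeeds.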