This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 73d8f5901d fix mem tracker limiter (#11376) 73d8f5901d is described below commit 73d8f5901d09eece433dbd3d366fd2719d9d3ee7 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Aug 1 09:44:04 2022 +0800 fix mem tracker limiter (#11376) --- be/src/olap/base_compaction.cpp | 2 +- be/src/olap/compaction.cpp | 4 +- be/src/olap/compaction.h | 2 +- be/src/olap/cumulative_compaction.cpp | 2 +- be/src/olap/delta_writer.cpp | 20 ++++----- be/src/olap/delta_writer.h | 12 +++--- be/src/olap/memtable_flush_executor.cpp | 7 ++-- be/src/olap/memtable_flush_executor.h | 3 +- be/src/olap/olap_server.cpp | 4 +- be/src/olap/storage_engine.cpp | 16 ++++---- be/src/olap/storage_engine.h | 26 +++++++----- be/src/olap/task/engine_alter_tablet_task.cpp | 4 +- be/src/olap/task/engine_alter_tablet_task.h | 2 +- be/src/olap/task/engine_batch_load_task.cpp | 4 +- be/src/olap/task/engine_batch_load_task.h | 2 +- be/src/olap/task/engine_checksum_task.cpp | 4 +- be/src/olap/task/engine_checksum_task.h | 2 +- be/src/olap/task/engine_clone_task.cpp | 4 +- be/src/olap/task/engine_clone_task.h | 2 +- be/src/runtime/data_stream_mgr.cpp | 6 +-- be/src/runtime/data_stream_recvr.cc | 5 +-- be/src/runtime/data_stream_recvr.h | 5 +-- be/src/runtime/exec_env.h | 16 ++++---- be/src/runtime/exec_env_init.cpp | 12 +++--- be/src/runtime/load_channel.cpp | 6 +-- be/src/runtime/load_channel.h | 4 +- be/src/runtime/load_channel_mgr.cpp | 10 ++--- be/src/runtime/load_channel_mgr.h | 2 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 52 ++++++++++++------------ be/src/runtime/memory/mem_tracker_limiter.h | 18 ++++---- be/src/runtime/memory/mem_tracker_task_pool.cpp | 46 +++++++++------------ be/src/runtime/memory/mem_tracker_task_pool.h | 26 ++++++------ be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 11 +++-- be/src/runtime/memory/thread_mem_tracker_mgr.h | 6 +-- be/src/runtime/runtime_state.cpp | 5 ++- be/src/runtime/runtime_state.h | 8 ++-- be/src/runtime/sorted_run_merger.cc | 2 +- be/src/runtime/tablets_channel.cpp | 7 ++-- be/src/runtime/tablets_channel.h | 7 ++-- be/src/runtime/thread_context.cpp | 5 ++- be/src/runtime/thread_context.h | 5 ++- be/src/service/internal_service.cpp | 16 ++++---- be/src/vec/runtime/vdata_stream_mgr.cpp | 4 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +-- be/src/vec/runtime/vdata_stream_recvr.h | 6 +-- be/test/runtime/mem_limit_test.cpp | 12 +++--- be/test/testutil/run_all_tests.cpp | 3 +- 47 files changed, 220 insertions(+), 212 deletions(-) diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 307fa2fee0..2e8d1f82aa 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -63,7 +63,7 @@ Status BaseCompaction::execute_compact_impl() { return Status::OLAPInternalError(OLAP_ERR_BE_CLONE_OCCURRED); } - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::COMPACTION); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION); // 2. do base compaction, merge rowsets int64_t permits = get_compaction_permits(); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 9a0e50d591..7ac55ab15e 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -34,10 +34,10 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label) _input_row_num(0), _state(CompactionState::INITED) { #ifndef BE_TEST - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( -1, label, StorageEngine::instance()->compaction_mem_tracker()); #else - _mem_tracker = std::make_unique<MemTrackerLimiter>(-1, label); + _mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label); #endif } diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 7d7a43bfd3..d73bfa9dba 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -76,7 +76,7 @@ protected: protected: // the root tracker for this compaction - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; TabletSharedPtr _tablet; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 9fc9fd8b62..ba52fbfdcb 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -70,7 +70,7 @@ Status CumulativeCompaction::execute_compact_impl() { return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED); } - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::COMPACTION); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION); // 3. do cumulative compaction, merge rowsets int64_t permits = get_compaction_permits(); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index e50a8567b4..60a096af28 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -30,14 +30,14 @@ namespace doris { -Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, MemTrackerLimiter* parent_tracker, - bool is_vec) { +Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, + const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) { *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec); return Status::OK(); } DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, - MemTrackerLimiter* parent_tracker, bool is_vec) + const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) : _req(*req), _tablet(nullptr), _cur_rowset(nullptr), @@ -98,9 +98,9 @@ Status DeltaWriter::init() { << ", schema_hash=" << _req.schema_hash; return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND); } - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker); - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { //trigger quick compaction @@ -208,7 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD); _mem_table->insert(block, row_idxs); if (_mem_table->need_to_agg()) { @@ -226,7 +226,7 @@ Status DeltaWriter::_flush_memtable_async() { if (++_segment_counter > config::max_segment_num_per_rowset) { return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS); } - return _flush_token->submit(std::move(_mem_table), _mem_tracker.get()); + return _flush_token->submit(std::move(_mem_table), _mem_tracker); } Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { @@ -243,7 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD); if (mem_consumption() == _mem_table->memory_usage()) { // equal means there is no memtable in flush queue, just flush this memtable VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " @@ -297,7 +297,7 @@ Status DeltaWriter::close() { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD); RETURN_NOT_OK(_flush_memtable_async()); _mem_table.reset(); return Status::OK(); @@ -312,7 +312,7 @@ Status DeltaWriter::close_wait() { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD); // return error if previous flush failed RETURN_NOT_OK(_flush_token->wait()); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 1ce62de338..573b786402 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -56,7 +56,9 @@ struct WriteRequest { class DeltaWriter { public: static Status open(WriteRequest* req, DeltaWriter** writer, - MemTrackerLimiter* parent_tracker = nullptr, bool is_vec = false); + const std::shared_ptr<MemTrackerLimiter>& parent_tracker = + std::shared_ptr<MemTrackerLimiter>(), + bool is_vec = false); ~DeltaWriter(); @@ -101,8 +103,8 @@ public: int64_t get_mem_consumption_snapshot() const; private: - DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, MemTrackerLimiter* parent_tracker, - bool is_vec); + DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, + const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec); // push a full memtable to flush executor Status _flush_memtable_async(); @@ -133,8 +135,8 @@ private: StorageEngine* _storage_engine; std::unique_ptr<FlushToken> _flush_token; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; - MemTrackerLimiter* _parent_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _parent_tracker; // The counter of number of segment flushed already. int64_t _segment_counter = 0; diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index b9e622a9a7..53fc5ac2f7 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -29,7 +29,7 @@ namespace doris { class MemtableFlushTask final : public Runnable { public: MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable, - int64_t submit_task_time, MemTrackerLimiter* tracker) + int64_t submit_task_time, const std::shared_ptr<MemTrackerLimiter>& tracker) : _flush_token(flush_token), _memtable(std::move(memtable)), _submit_task_time(submit_task_time), @@ -47,7 +47,7 @@ private: FlushToken* _flush_token; std::unique_ptr<MemTable> _memtable; int64_t _submit_task_time; - MemTrackerLimiter* _tracker; + std::shared_ptr<MemTrackerLimiter> _tracker; }; std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { @@ -58,7 +58,8 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { return os; } -Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker) { +Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, + const std::shared_ptr<MemTrackerLimiter>& tracker) { ErrorCode s = _flush_status.load(); if (s != OLAP_SUCCESS) { return Status::OLAPInternalError(s); diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 6f986af3f5..4003126d58 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -57,7 +57,8 @@ public: explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) : _flush_token(std::move(flush_pool_token)), _flush_status(OLAP_SUCCESS) {} - Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker); + Status submit(std::unique_ptr<MemTable> mem_table, + const std::shared_ptr<MemTrackerLimiter>& tracker); // error has happpens, so we cancel this token // And remove all tasks in the queue. diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index ef53eeaee4..cb080a0e70 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -112,7 +112,7 @@ Status StorageEngine::start_bg_threads() { RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_scan_thread", [this, data_dir]() { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); this->_path_scan_thread_callback(data_dir); }, &path_scan_thread)); @@ -122,7 +122,7 @@ Status StorageEngine::start_bg_threads() { RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_gc_thread", [this, data_dir]() { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); this->_path_gc_thread_callback(data_dir); }, &path_gc_thread)); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 4e28ee7d8b..f239f2a585 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -113,16 +113,16 @@ StorageEngine::StorageEngine(const EngineOptions& options) _index_stream_lru_cache(nullptr), _file_cache(nullptr), _compaction_mem_tracker( - std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::AutoCompaction")), + std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::AutoCompaction")), _segment_meta_mem_tracker(std::make_unique<MemTracker>("StorageEngine::SegmentMeta")), _schema_change_mem_tracker( - std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::SchemaChange")), - _clone_mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Clone")), + std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::SchemaChange")), + _clone_mem_tracker(std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Clone")), _batch_load_mem_tracker( - std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")), + std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")), _consistency_mem_tracker( - std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Consistency")), - _mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Self")), + std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Consistency")), + _mem_tracker(std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Self")), _stop_background_threads_latch(1), _tablet_manager(new TabletManager(config::tablet_map_shard_size)), _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)), @@ -168,7 +168,7 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) { std::vector<std::thread> threads; for (auto data_dir : data_dirs) { threads.emplace_back([this, data_dir] { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); auto res = data_dir->load(); if (!res.ok()) { LOG(WARNING) << "io error when init load tables. res=" << res @@ -220,7 +220,7 @@ Status StorageEngine::_init_store_map() { _tablet_manager.get(), _txn_manager.get()); tmp_stores.emplace_back(store); threads.emplace_back([this, store, &error_msg_lock, &error_msg]() { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); auto st = store->init(); if (!st.ok()) { { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index d508a617c5..a46218e442 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -179,12 +179,16 @@ public: Status get_compaction_status_json(std::string* result); - MemTrackerLimiter* compaction_mem_tracker() { return _compaction_mem_tracker.get(); } + std::shared_ptr<MemTrackerLimiter> compaction_mem_tracker() { return _compaction_mem_tracker; } MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); } - MemTrackerLimiter* schema_change_mem_tracker() { return _schema_change_mem_tracker.get(); } - MemTrackerLimiter* clone_mem_tracker() { return _clone_mem_tracker.get(); } - MemTrackerLimiter* batch_load_mem_tracker() { return _batch_load_mem_tracker.get(); } - MemTrackerLimiter* consistency_mem_tracker() { return _consistency_mem_tracker.get(); } + std::shared_ptr<MemTrackerLimiter> schema_change_mem_tracker() { + return _schema_change_mem_tracker; + } + std::shared_ptr<MemTrackerLimiter> clone_mem_tracker() { return _clone_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> batch_load_mem_tracker() { return _batch_load_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> consistency_mem_tracker() { + return _consistency_mem_tracker; + } // check cumulative compaction config void check_cumulative_compaction_config(); @@ -333,21 +337,21 @@ private: std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets; // Count the memory consumption of all Base and Cumulative tasks. - std::unique_ptr<MemTrackerLimiter> _compaction_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _compaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. // The memory consumed by querying is tracked in segment iterator. std::unique_ptr<MemTracker> _segment_meta_mem_tracker; // Count the memory consumption of all SchemaChange tasks. - std::unique_ptr<MemTrackerLimiter> _schema_change_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _schema_change_mem_tracker; // Count the memory consumption of all EngineCloneTask. // Note: Memory that does not contain make/release snapshots. - std::unique_ptr<MemTrackerLimiter> _clone_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _clone_mem_tracker; // Count the memory consumption of all EngineBatchLoadTask. - std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _batch_load_mem_tracker; // Count the memory consumption of all EngineChecksumTask. - std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _consistency_mem_tracker; // StorageEngine oneself - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; CountDownLatch _stop_background_threads_latch; scoped_refptr<Thread> _unused_rowset_monitor_thread; diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index ce34cac4d3..7689df96b3 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -25,7 +25,7 @@ namespace doris { EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( config::memory_limitation_per_thread_for_schema_change_bytes, fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", std::to_string(_alter_tablet_req.base_tablet_id), @@ -34,7 +34,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) } Status EngineAlterTabletTask::execute() { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); DorisMetrics::instance()->create_rollup_requests_total->increment(1); Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req); diff --git a/be/src/olap/task/engine_alter_tablet_task.h b/be/src/olap/task/engine_alter_tablet_task.h index b6a736b357..1054c55eec 100644 --- a/be/src/olap/task/engine_alter_tablet_task.h +++ b/be/src/olap/task/engine_alter_tablet_task.h @@ -36,7 +36,7 @@ public: private: const TAlterTabletReqV2& _alter_tablet_req; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; // EngineTask } // namespace doris diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index e98ad02471..c478adcd8c 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -53,7 +53,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet _signature(signature), _res_status(res_status) { _download_status = Status::OK(); - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( -1, fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type, std::to_string(_push_req.tablet_id)), @@ -63,7 +63,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet EngineBatchLoadTask::~EngineBatchLoadTask() {} Status EngineBatchLoadTask::execute() { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); Status status = Status::OK(); if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_V2) { status = _init(); diff --git a/be/src/olap/task/engine_batch_load_task.h b/be/src/olap/task/engine_batch_load_task.h index f0bf0dba0d..e2aba71f00 100644 --- a/be/src/olap/task/engine_batch_load_task.h +++ b/be/src/olap/task/engine_batch_load_task.h @@ -76,7 +76,7 @@ private: Status* _res_status; std::string _remote_file_path; std::string _local_file_path; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; // class EngineBatchLoadTask } // namespace doris #endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index f5d5f317f1..fc17c2fad3 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -26,13 +26,13 @@ namespace doris { EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash, TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( -1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id), StorageEngine::instance()->consistency_mem_tracker()); } Status EngineChecksumTask::execute() { - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); return _compute_checksum(); } // execute diff --git a/be/src/olap/task/engine_checksum_task.h b/be/src/olap/task/engine_checksum_task.h index 233c6e84f2..04afa1a5cd 100644 --- a/be/src/olap/task/engine_checksum_task.h +++ b/be/src/olap/task/engine_checksum_task.h @@ -44,7 +44,7 @@ private: TSchemaHash _schema_hash; TVersion _version; uint32_t* _checksum; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; // EngineTask } // namespace doris diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 73466cb72e..429a3d34be 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -57,14 +57,14 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& _res_status(res_status), _signature(signature), _master_info(master_info) { - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( -1, "EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id), StorageEngine::instance()->clone_mem_tracker()); } Status EngineCloneTask::execute() { // register the tablet to avoid it is deleted by gc thread during clone process - SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE); StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id); Status st = _do_clone(); StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id); diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index ccb8a19581..90c4b43596 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -79,7 +79,7 @@ private: const TMasterInfo& _master_info; int64_t _copy_size; int64_t _copy_time_ms; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; // EngineTask } // namespace doris diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index 3e519f8987..b0d1dbd8f2 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr( DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id << ", node=" << dest_node_id; - shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr( - this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id, - num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr)); + shared_ptr<DataStreamRecvr> recvr( + new DataStreamRecvr(this, row_desc, fragment_instance_id, dest_node_id, num_senders, + is_merging, buffer_size, profile, sub_plan_query_statistics_recvr)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); lock_guard<mutex> l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 0d456ca74a..b7988178fb 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -447,9 +447,8 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) { DataStreamRecvr::DataStreamRecvr( DataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, - RuntimeProfile* profile, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + bool is_merging, int total_buffer_limit, RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) : _mgr(stream_mgr), _fragment_instance_id(fragment_instance_id), diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 31af9f2e37..efb036b5dd 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -116,9 +116,8 @@ private: class SenderQueue; DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, bool is_merging, - int total_buffer_limit, RuntimeProfile* profile, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + bool is_merging, int total_buffer_limit, RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr); // If receive queue is full, done is enqueue pending, and return with *done is nullptr diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index d879b417e0..f708f4b410 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -113,10 +113,12 @@ public: return nullptr; } - MemTrackerLimiter* process_mem_tracker() { return _process_mem_tracker; } - void set_process_mem_tracker(MemTrackerLimiter* tracker) { _process_mem_tracker = tracker; } - MemTrackerLimiter* query_pool_mem_tracker() { return _query_pool_mem_tracker; } - MemTrackerLimiter* load_pool_mem_tracker() { return _load_pool_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return _process_mem_tracker; } + void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) { + _process_mem_tracker = tracker; + } + std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } ThreadResourceMgr* thread_mgr() { return _thread_mgr; } PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; } @@ -184,11 +186,11 @@ private: // The ancestor for all trackers. Every tracker is visible from the process down. // Not limit total memory by process tracker, and it's just used to track virtual memory of process. - MemTrackerLimiter* _process_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _process_mem_tracker; // The ancestor for all querys tracker. - MemTrackerLimiter* _query_pool_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker; // The ancestor for all load tracker. - MemTrackerLimiter* _load_pool_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _load_pool_mem_tracker; MemTrackerTaskPool* _task_pool_mem_tracker_registry; // The following two thread pools are used in different scenarios. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index afecca4159..e9074af172 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -193,7 +193,8 @@ Status ExecEnv::_init_mem_tracker() { << ". Using physical memory instead"; global_memory_limit_bytes = MemInfo::physical_mem(); } - _process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes, "Process"); + _process_mem_tracker = + std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process"); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ @@ -203,10 +204,12 @@ Status ExecEnv::_init_mem_tracker() { } #endif - _query_pool_mem_tracker = new MemTrackerLimiter(-1, "QueryPool", _process_mem_tracker); + _query_pool_mem_tracker = + std::make_shared<MemTrackerLimiter>(-1, "QueryPool", _process_mem_tracker); REGISTER_HOOK_METRIC(query_mem_consumption, [this]() { return _query_pool_mem_tracker->consumption(); }); - _load_pool_mem_tracker = new MemTrackerLimiter(-1, "LoadPool", _process_mem_tracker); + _load_pool_mem_tracker = + std::make_shared<MemTrackerLimiter>(-1, "LoadPool", _process_mem_tracker); REGISTER_HOOK_METRIC(load_mem_consumption, [this]() { return _load_pool_mem_tracker->consumption(); }); LOG(INFO) << "Using global memory limit: " @@ -363,9 +366,6 @@ void ExecEnv::_destroy() { SAFE_DELETE(_routine_load_task_executor); SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_heartbeat_flags); - SAFE_DELETE(_process_mem_tracker); - SAFE_DELETE(_query_pool_mem_tracker); - SAFE_DELETE(_load_pool_mem_tracker); SAFE_DELETE(_task_pool_mem_tracker_registry); SAFE_DELETE(_buffer_reservation); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index e1fc0a3463..56b21b7cef 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -25,11 +25,11 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTrackerLimiter> mem_tracker, +LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker, int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_vec) : _load_id(load_id), - _mem_tracker(std::move(mem_tracker)), + _mem_tracker(mem_tracker), _timeout_s(timeout_s), _is_high_priority(is_high_priority), _sender_ip(sender_ip), @@ -60,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _mem_tracker.get(), _is_high_priority, _is_vec)); + channel.reset(new TabletsChannel(key, _mem_tracker, _is_high_priority, _is_vec)); _tablets_channels.insert({index_id, channel}); } } diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index ad8a476fcf..9647528304 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -39,7 +39,7 @@ class Cache; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTrackerLimiter> mem_tracker, + LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker, int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_vec); ~LoadChannel(); @@ -99,7 +99,7 @@ private: UniqueId _load_id; // Tracks the total memory consumed by current load job on this BE - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; // lock protect the tablets channel map std::mutex _lock; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 7429c09c6e..c252fe2aff 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -84,7 +84,7 @@ LoadChannelMgr::~LoadChannelMgr() { Status LoadChannelMgr::init(int64_t process_mem_limit) { int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit); - _mem_tracker = std::make_unique<MemTrackerLimiter>(load_mgr_mem_limit, "LoadChannelMgr"); + _mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit, "LoadChannelMgr"); REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024); @@ -110,13 +110,13 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1; int64_t channel_mem_limit = calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit()); - auto channel_mem_tracker = std::make_unique<MemTrackerLimiter>( + auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>( channel_mem_limit, fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string()), - _mem_tracker.get()); - channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker), - channel_timeout_s, is_high_priority, params.sender_ip(), + _mem_tracker); + channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s, + is_high_priority, params.sender_ip(), params.is_vectorized())); _load_channels.insert({load_id, channel}); } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 0e46d8bf52..af9d3d6240 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -78,7 +78,7 @@ protected: Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; CountDownLatch _stop_background_threads_latch; // thread to clean timeout load channels diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 2223fce5a5..6a37b1df86 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -19,6 +19,8 @@ #include <fmt/format.h> +#include <boost/stacktrace.hpp> + #include "gutil/once.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" @@ -29,7 +31,8 @@ namespace doris { MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label, - MemTrackerLimiter* parent, RuntimeProfile* profile) + const std::shared_ptr<MemTrackerLimiter>& parent, + RuntimeProfile* profile) : MemTracker(label, profile, true) { DCHECK_GE(byte_limit, -1); _limit = byte_limit; @@ -42,32 +45,28 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe while (tracker != nullptr) { _all_ancestors.push_back(tracker); if (tracker->has_limit()) _limited_ancestors.push_back(tracker); - tracker = tracker->_parent; + tracker = tracker->_parent.get(); } DCHECK_GT(_all_ancestors.size(), 0); DCHECK_EQ(_all_ancestors[0], this); - if (_parent) _parent->add_child(this); + if (_parent) { + std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock); + _child_tracker_it = _parent->_child_tracker_limiters.insert( + _parent->_child_tracker_limiters.end(), this); + _had_child_count++; + } } MemTrackerLimiter::~MemTrackerLimiter() { // TCMalloc hook will be triggered during destructor memtracker, may cause crash. if (_label == "Process") doris::thread_context_ptr._init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); - if (_parent) _parent->remove_child(this); -} - -void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) { - std::lock_guard<std::mutex> l(_child_tracker_limiter_lock); - tracker->_child_tracker_it = - _child_tracker_limiters.insert(_child_tracker_limiters.end(), tracker); - _had_child_count++; -} - -void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) { - std::lock_guard<std::mutex> l(_child_tracker_limiter_lock); - if (tracker->_child_tracker_it != _child_tracker_limiters.end()) { - _child_tracker_limiters.erase(tracker->_child_tracker_it); - tracker->_child_tracker_it = _child_tracker_limiters.end(); + if (_parent) { + std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock); + if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { + _parent->_child_tracker_limiters.erase(_child_tracker_it); + _child_tracker_it = _parent->_child_tracker_limiters.end(); + } } } @@ -221,13 +220,14 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details, int64_t failed_allocation_size, Status failed_alloc) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); - MemTrackerLimiter* process_tracker = ExecEnv::GetInstance()->process_mem_tracker(); std::string detail = "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process " "limit={}."; - detail = fmt::format(detail, state != nullptr ? print_id(state->fragment_instance_id()) : "", - details, BackendOptions::get_localhost(), - PrettyPrinter::print(process_tracker->spare_capacity(), TUnit::BYTES)); + detail = fmt::format( + detail, state != nullptr ? print_id(state->fragment_instance_id()) : "", details, + BackendOptions::get_localhost(), + PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(), + TUnit::BYTES)); if (!failed_alloc) { detail += " failed alloc=<{}>. current tracker={}."; detail = fmt::format(detail, failed_alloc.to_string(), _label); @@ -240,13 +240,15 @@ Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::str Status status = Status::MemoryLimitExceeded(detail); if (state != nullptr) state->log_error(detail); + detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace()); // only print the tracker log_usage in be log. - if (process_tracker->spare_capacity() < failed_allocation_size) { + if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() < failed_allocation_size) { // Dumping the process MemTracker is expensive. Limiting the recursive depth to two // levels limits the level of detail to a one-line summary for each query MemTracker. - detail += "\n" + process_tracker->log_usage(2); + detail += "\n" + ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2); + } else { + detail += "\n" + log_usage(); } - detail += "\n" + log_usage(); LOG(WARNING) << detail; return status; diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 5c41ce7cda..c85205b2c0 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -42,8 +42,10 @@ class RuntimeState; class MemTrackerLimiter final : public MemTracker { public: // Creates and adds the tracker limiter to the tree - MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label = std::string(), - MemTrackerLimiter* parent = nullptr, RuntimeProfile* profile = nullptr); + MemTrackerLimiter( + int64_t byte_limit = -1, const std::string& label = std::string(), + const std::shared_ptr<MemTrackerLimiter>& parent = std::shared_ptr<MemTrackerLimiter>(), + RuntimeProfile* profile = nullptr); // If the final consumption is not as expected, this usually means that the same memory is calling // consume and release on different trackers. If the two trackers have a parent-child relationship, @@ -51,10 +53,7 @@ public: // no parent-child relationship, the two tracker consumptions are wrong. ~MemTrackerLimiter(); - MemTrackerLimiter* parent() const { return _parent; } - - void add_child(MemTrackerLimiter* tracker); - void remove_child(MemTrackerLimiter* tracker); + std::shared_ptr<MemTrackerLimiter> parent() const { return _parent; } size_t remain_child_count() const { return _child_tracker_limiters.size(); } size_t had_child_count() const { return _had_child_count; } @@ -187,7 +186,7 @@ private: // Group number in MemTracker::mem_tracker_pool, generated by the timestamp. int64_t _group_num; - MemTrackerLimiter* _parent; // The parent of this tracker. + std::shared_ptr<MemTrackerLimiter> _parent; // The parent of this tracker. // this tracker limiter plus all of its ancestors std::vector<MemTrackerLimiter*> _all_ancestors; @@ -199,13 +198,12 @@ private: // update that of its children). mutable std::mutex _child_tracker_limiter_lock; std::list<MemTrackerLimiter*> _child_tracker_limiters; + // Iterator into parent_->_child_tracker_limiters for this object. Stored to have O(1) remove. + std::list<MemTrackerLimiter*>::iterator _child_tracker_it; // The number of child trackers that have been added. std::atomic_size_t _had_child_count = 0; - // Iterator into parent_->_child_tracker_limiters for this object. Stored to have O(1) remove. - std::list<MemTrackerLimiter*>::iterator _child_tracker_it; - // Lock to protect gc_memory(). This prevents many GCs from occurring at once. std::mutex _gc_lock; // Functions to call after the limit is reached to free memory. diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 24a8c95180..3c775db5ec 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -23,47 +23,49 @@ namespace doris { -MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id, - int64_t mem_limit, - const std::string& label, - MemTrackerLimiter* parent) { +std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_task_mem_tracker_impl( + const std::string& task_id, int64_t mem_limit, const std::string& label, + const std::shared_ptr<MemTrackerLimiter>& parent) { DCHECK(!task_id.empty()); // First time this task_id registered, make a new object, otherwise do nothing. // Combine new tracker and emplace into one operation to avoid the use of locks // Name for task MemTrackers. '$0' is replaced with the task id. + std::shared_ptr<MemTrackerLimiter> tracker; bool new_emplace = _task_mem_trackers.lazy_emplace_l( - task_id, [&](std::shared_ptr<MemTrackerLimiter>) {}, + task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) { tracker = v; }, [&](const auto& ctor) { - ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, label, parent)); + tracker = std::make_shared<MemTrackerLimiter>(mem_limit, label, parent); + ctor(task_id, tracker); }); if (new_emplace) { LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); } - return _task_mem_trackers[task_id].get(); + return tracker; } -MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id, - int64_t mem_limit) { +std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_mem_tracker( + const std::string& query_id, int64_t mem_limit) { return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("Query#queryId={}", query_id), ExecEnv::GetInstance()->query_pool_mem_tracker()); } -MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const std::string& load_id, - int64_t mem_limit) { +std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker( + const std::string& load_id, int64_t mem_limit) { // In load, the query id of the fragment is executed, which is the same as the load id of the load channel. return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("Load#queryId={}", load_id), ExecEnv::GetInstance()->load_pool_mem_tracker()); } -MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) { +std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker( + const std::string& task_id) { DCHECK(!task_id.empty()); - MemTrackerLimiter* tracker = nullptr; + std::shared_ptr<MemTrackerLimiter> tracker = nullptr; // Avoid using locks to resolve erase conflicts _task_mem_trackers.if_contains( - task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) { tracker = v.get(); }); + task_id, [&tracker](const std::shared_ptr<MemTrackerLimiter>& v) { tracker = v; }); return tracker; } @@ -74,8 +76,8 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { // Unknown exception case with high concurrency, after _task_mem_trackers.erase, // the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006 expired_task_ids.emplace_back(it->first); - } else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) { - // No RuntimeState uses this task MemTracker, it is only referenced by this map, + } else if (it->second.use_count() == 1 && it->second->had_child_count() != 0) { + // No RuntimeState uses this task MemTrackerLimiter, it is only referenced by this map, // and tracker was not created soon, delete it. // // If consumption is not equal to 0 before query mem tracker is destructed, @@ -92,20 +94,12 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { it->second->parent()->consumption_revise(-it->second->consumption()); LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first; expired_task_ids.emplace_back(it->first); - } else { - // Log limit exceeded query tracker. - if (it->second->limit_exceeded()) { - it->second->mem_limit_exceeded( - nullptr, - fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first), - 0, Status::OK()); - } } } for (auto tid : expired_task_ids) { // Verify the condition again to make sure the tracker is not being used again. - _task_mem_trackers.erase_if(tid, [&](std::shared_ptr<MemTrackerLimiter> v) { - return !v || v->remain_child_count() == 0; + _task_mem_trackers.erase_if(tid, [&](const std::shared_ptr<MemTrackerLimiter>& v) { + return !v || v.use_count() == 1; }); } } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h index 4890d72713..1958542fe9 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.h +++ b/be/src/runtime/memory/mem_tracker_task_pool.h @@ -32,26 +32,28 @@ using TaskTrackersMap = phmap::parallel_flat_hash_map< // Global task pool for query MemTrackers. Owned by ExecEnv. class MemTrackerTaskPool { public: - // Construct a MemTracker object for 'task_id' with 'mem_limit' as the memory limit. - // The MemTracker is a child of the pool MemTracker, Calling this with the same - // 'task_id' will return the same MemTracker object. This is used to track the local + // Construct a MemTrackerLimiter object for 'task_id' with 'mem_limit' as the memory limit. + // The MemTrackerLimiter is a child of the pool MemTrackerLimiter, Calling this with the same + // 'task_id' will return the same MemTrackerLimiter object. This is used to track the local // memory usage of all tasks executing. The first time this is called for a task, - // a new MemTracker object is created with the pool tracker as its parent. + // a new MemTrackerLimiter object is created with the pool tracker as its parent. // Newly created trackers will always have a limit of -1. - MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit, - const std::string& label, - MemTrackerLimiter* parent); - MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, int64_t mem_limit); - MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, int64_t mem_limit); + std::shared_ptr<MemTrackerLimiter> register_task_mem_tracker_impl( + const std::string& task_id, int64_t mem_limit, const std::string& label, + const std::shared_ptr<MemTrackerLimiter>& parent); + std::shared_ptr<MemTrackerLimiter> register_query_mem_tracker(const std::string& query_id, + int64_t mem_limit); + std::shared_ptr<MemTrackerLimiter> register_load_mem_tracker(const std::string& load_id, + int64_t mem_limit); - MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id); + std::shared_ptr<MemTrackerLimiter> get_task_mem_tracker(const std::string& task_id); // Remove the mem tracker that has ended the query. void logout_task_mem_tracker(); private: - // All per-task MemTracker objects. - // The life cycle of task memtracker in the process is the same as task runtime state, + // All per-task MemTrackerLimiter objects. + // The life cycle of task MemTrackerLimiter in the process is the same as task runtime state, // MemTrackers will be removed from this map after query finish or cancel. TaskTrackersMap _task_mem_trackers; }; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 7841a7cb6a..b6f1ebde33 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -24,10 +24,10 @@ namespace doris { -void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg, - const std::string& task_id, - const TUniqueId& fragment_instance_id, - MemTrackerLimiter* mem_tracker) { +void ThreadMemTrackerMgr::attach_limiter_tracker( + const std::string& cancel_msg, const std::string& task_id, + const TUniqueId& fragment_instance_id, + const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem<false>(); _task_id = task_id; @@ -37,8 +37,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg, } void ThreadMemTrackerMgr::detach_limiter_tracker() { - // Do not flush untracked mem, instance executor thread may exit after instance fragment executor thread, - // `instance_mem_tracker` will be null pointer, which is not a graceful exit. + flush_untracked_mem<false>(); _task_id = ""; _fragment_instance_id = TUniqueId(); _exceed_cb.cancel_msg = ""; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 2d23388a81..d3940d1e50 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -77,7 +77,7 @@ public: // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::string& cancel_msg, const std::string& task_id, const TUniqueId& fragment_instance_id, - MemTrackerLimiter* mem_tracker); + const std::shared_ptr<MemTrackerLimiter>& mem_tracker); void detach_limiter_tracker(); @@ -116,7 +116,7 @@ public: bool is_attach_task() { return _task_id != ""; } - MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; } + std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } void set_check_attach(bool check_attach) { _check_attach = check_attach; } @@ -145,7 +145,7 @@ private: // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance. int64_t _untracked_mem = 0; - MemTrackerLimiter* _limiter_tracker; + std::shared_ptr<MemTrackerLimiter> _limiter_tracker; std::vector<MemTracker*> _consumer_tracker_stack; // If true, call memtracker try_consume, otherwise call consume. diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 4c3d114447..2d128c13e4 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -241,9 +241,10 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { print_id(query_id), bytes_limit); } else { DCHECK(false); + _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker(); } - _instance_mem_tracker = std::make_unique<MemTrackerLimiter>( + _instance_mem_tracker = std::make_shared<MemTrackerLimiter>( bytes_limit, "RuntimeState:instance:" + print_id(_fragment_instance_id), _query_mem_tracker, &_profile); @@ -263,7 +264,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { Status RuntimeState::init_instance_mem_tracker() { _query_mem_tracker = nullptr; - _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, "RuntimeState:instance"); + _instance_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "RuntimeState:instance"); return Status::OK(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f561adecf8..9ab470bf64 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -127,8 +127,8 @@ public: const TUniqueId& query_id() const { return _query_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } ExecEnv* exec_env() { return _exec_env; } - MemTrackerLimiter* query_mem_tracker() { return _query_mem_tracker; } - MemTrackerLimiter* instance_mem_tracker() { return _instance_mem_tracker.get(); } + std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> instance_mem_tracker() { return _instance_mem_tracker; } ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; } void set_fragment_root_id(PlanNodeId id) { @@ -390,10 +390,10 @@ private: // MemTracker that is shared by all fragment instances running on this host. // The query mem tracker must be released after the _instance_mem_tracker. - MemTrackerLimiter* _query_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _query_mem_tracker; // Memory usage of this fragment instance - std::unique_ptr<MemTrackerLimiter> _instance_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _instance_mem_tracker; // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc index de44701293..28d347462f 100644 --- a/be/src/runtime/sorted_run_merger.cc +++ b/be/src/runtime/sorted_run_merger.cc @@ -182,7 +182,7 @@ private: // signal of new batch or the eos/cancelled condition std::condition_variable _batch_prepared_cv; - void process_sorted_run_task(MemTrackerLimiter* mem_tracker) { + void process_sorted_run_task(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { SCOPED_ATTACH_TASK(mem_tracker, ThreadContext::TaskType::QUERY); std::unique_lock<std::mutex> lock(_mutex); while (true) { diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 043f4c7eab..0934214f89 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -30,14 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count; -TabletsChannel::TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker, +TabletsChannel::TabletsChannel(const TabletsChannelKey& key, + const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_high_priority, bool is_vec) : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority), _is_vec(is_vec) { - _mem_tracker = std::make_unique<MemTrackerLimiter>( + _mem_tracker = std::make_shared<MemTrackerLimiter>( -1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker); static std::once_flag once_flag; std::call_once(once_flag, [] { @@ -240,7 +241,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request wrequest.ptable_schema_param = request.schema(); DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(), _is_vec); + auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec); if (!st.ok()) { std::stringstream ss; ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 318dd879a4..78ca9ae076 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -60,8 +60,9 @@ class OlapTableSchemaParam; // Write channel for a particular (load, index). class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker, - bool is_high_priority, bool is_vec); + TabletsChannel(const TabletsChannelKey& key, + const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_high_priority, + bool is_vec); ~TabletsChannel(); @@ -144,7 +145,7 @@ private: static std::atomic<uint64_t> _s_tablet_writer_count; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; bool _is_high_priority = false; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 6071defe64..7a6968b30f 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -29,8 +29,9 @@ ThreadContextPtr::ThreadContextPtr() { _init = true; } -AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const ThreadContext::TaskType& type, - const std::string& task_id, const TUniqueId& fragment_instance_id) { +AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, + const ThreadContext::TaskType& type, const std::string& task_id, + const TUniqueId& fragment_instance_id) { DCHECK(mem_tracker); #ifdef USE_MEM_TRACKER thread_context()->attach_task(type, task_id, fragment_instance_id, mem_tracker); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 2c14929432..0acc7c6556 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -130,7 +130,8 @@ public: } void attach_task(const TaskType& type, const std::string& task_id, - const TUniqueId& fragment_instance_id, MemTrackerLimiter* mem_tracker) { + const TUniqueId& fragment_instance_id, + const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "") << ",new tracker label: " << mem_tracker->label() << ",old tracker label: " << _thread_mem_tracker_mgr->limiter_mem_tracker()->label(); @@ -195,7 +196,7 @@ static ThreadContext* thread_context() { class AttachTask { public: - explicit AttachTask(MemTrackerLimiter* mem_tracker, + explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const ThreadContext::TaskType& type = ThreadContext::TaskType::UNKNOWN, const std::string& task_id = "", const TUniqueId& fragment_instance_id = TUniqueId()); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7603beacff..15eb95b4b3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -118,20 +118,20 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_ const Status& extract_st) { std::string query_id; TUniqueId finst_id; - std::unique_ptr<MemTrackerLimiter> transmit_tracker; + std::shared_ptr<MemTrackerLimiter> transmit_tracker; if (request->has_query_id()) { query_id = print_id(request->query_id()); finst_id.__set_hi(request->finst_id().hi()); finst_id.__set_lo(request->finst_id().lo()); // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer. - transmit_tracker = std::make_unique<MemTrackerLimiter>( + transmit_tracker = std::make_shared<MemTrackerLimiter>( -1, fmt::format("QueryTransmit#queryId={}", query_id), _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); } else { query_id = "unkown_transmit_data"; - transmit_tracker = std::make_unique<MemTrackerLimiter>(-1, "unkown_transmit_data"); + transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data"); } - SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY, query_id, finst_id); + SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_data(), @@ -649,20 +649,20 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl const Status& extract_st) { std::string query_id; TUniqueId finst_id; - std::unique_ptr<MemTrackerLimiter> transmit_tracker; + std::shared_ptr<MemTrackerLimiter> transmit_tracker; if (request->has_query_id()) { query_id = print_id(request->query_id()); finst_id.__set_hi(request->finst_id().hi()); finst_id.__set_lo(request->finst_id().lo()); // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer. - transmit_tracker = std::make_unique<MemTrackerLimiter>( + transmit_tracker = std::make_shared<MemTrackerLimiter>( -1, fmt::format("QueryTransmit#queryId={}", query_id), _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); } else { query_id = "unkown_transmit_block"; - transmit_tracker = std::make_unique<MemTrackerLimiter>(-1, "unkown_transmit_block"); + transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block"); } - SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY, query_id, finst_id); + SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_block(), diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index bee1fcf10c..511fbbe19d 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id << ", node=" << dest_node_id; std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr( - this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id, - num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr)); + this, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, + buffer_size, profile, sub_plan_query_statistics_recvr)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); std::lock_guard<std::mutex> l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 020f188cac..2fb7be3223 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -249,9 +249,8 @@ void VDataStreamRecvr::SenderQueue::close() { VDataStreamRecvr::VDataStreamRecvr( VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, - RuntimeProfile* profile, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + bool is_merging, int total_buffer_limit, RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) : _mgr(stream_mgr), _fragment_instance_id(fragment_instance_id), diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 87024a917a..bedd18bbce 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -51,9 +51,9 @@ class VExprContext; class VDataStreamRecvr { public: VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, bool is_merging, - int total_buffer_limit, RuntimeProfile* profile, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, + int num_senders, bool is_merging, int total_buffer_limit, + RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr); ~VDataStreamRecvr(); diff --git a/be/test/runtime/mem_limit_test.cpp b/be/test/runtime/mem_limit_test.cpp index b0a298ff8a..eeadb41b28 100644 --- a/be/test/runtime/mem_limit_test.cpp +++ b/be/test/runtime/mem_limit_test.cpp @@ -53,9 +53,9 @@ TEST(MemTestTest, SingleTrackerWithLimit) { } TEST(MemTestTest, TrackerHierarchy) { - auto p = std::make_unique<MemTrackerLimiter>(100); - auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get()); - auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get()); + auto p = std::make_shared<MemTrackerLimiter>(100); + auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p); + auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p); // everything below limits c1->consume(60); @@ -96,9 +96,9 @@ TEST(MemTestTest, TrackerHierarchy) { } TEST(MemTestTest, TrackerHierarchyTryConsume) { - auto p = std::make_unique<MemTrackerLimiter>(100); - auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get()); - auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get()); + auto p = std::make_shared<MemTrackerLimiter>(100); + auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p); + auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p); // everything below limits bool consumption = c1->try_consume(60).ok(); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index b84cc6c375..0f6579ec9e 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -29,7 +29,8 @@ #include "util/mem_info.h" int main(int argc, char** argv) { - doris::MemTrackerLimiter* process_mem_tracker = new doris::MemTrackerLimiter(-1, "Process"); + std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker = + std::make_shared<doris::MemTrackerLimiter>(-1, "Process"); doris::ExecEnv::GetInstance()->set_process_mem_tracker(process_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::StoragePageCache::create_global_cache(1 << 30, 10); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org