This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new cf05284da6 [cherrypick](branch1.2) Pick memory manager related fixes (#16184) cf05284da6 is described below commit cf05284da64d5c8a05f6b491e00cedb4cd99091d Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Jan 30 14:47:14 2023 +0800 [cherrypick](branch1.2) Pick memory manager related fixes (#16184) cherry pick #15716 #16083 #16084 --- be/src/common/config.h | 6 +- be/src/common/daemon.cpp | 42 +++++++--- be/src/exec/analytic_eval_node.cpp | 3 - be/src/exec/blocking_join_node.cpp | 1 - be/src/exec/broker_scan_node.cpp | 1 - be/src/exec/es_http_scan_node.cpp | 1 - be/src/exec/exec_node.cpp | 6 +- be/src/exec/exec_node.h | 6 +- be/src/exec/hash_join_node.cpp | 1 - be/src/exec/olap_scan_node.cpp | 2 - be/src/exec/set_operation_node.cpp | 2 - be/src/exec/spill_sort_node.cc | 3 - be/src/exec/table_function_node.cpp | 3 - be/src/exec/topn_node.cpp | 3 - be/src/exec/union_node.cpp | 3 - be/src/http/default_path_handlers.cpp | 6 +- be/src/olap/delta_writer.cpp | 4 +- be/src/olap/memtable.cpp | 4 +- be/src/olap/olap_server.cpp | 10 +-- be/src/olap/push_handler.cpp | 4 +- be/src/olap/storage_engine.cpp | 7 +- be/src/olap/storage_engine.h | 2 - be/src/olap/tablet_manager.cpp | 11 ++- be/src/olap/tablet_manager.h | 1 + be/src/olap/task/engine_batch_load_task.cpp | 2 +- be/src/olap/task/engine_checksum_task.cpp | 2 +- be/src/runtime/data_stream_recvr.cc | 5 +- be/src/runtime/data_stream_sender.cpp | 3 +- be/src/runtime/exec_env.h | 7 +- be/src/runtime/exec_env_init.cpp | 12 ++- be/src/runtime/load_channel_mgr.cpp | 9 +- be/src/runtime/load_channel_mgr.h | 27 +++--- be/src/runtime/memory/mem_tracker.cpp | 13 ++- be/src/runtime/memory/mem_tracker.h | 9 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 104 +++++++++++++++--------- be/src/runtime/memory/mem_tracker_limiter.h | 42 ++++++---- be/src/runtime/plan_fragment_executor.cpp | 1 - be/src/runtime/runtime_filter_mgr.cpp | 6 +- 
be/src/runtime/runtime_state.cpp | 5 +- be/src/runtime/runtime_state.h | 8 -- be/src/util/mem_info.cpp | 59 ++++++++++---- be/src/util/mem_info.h | 10 ++- be/src/vec/exec/join/vhash_join_node.cpp | 2 - be/src/vec/exec/join/vjoin_node_base.cpp | 2 - be/src/vec/exec/join/vnested_loop_join_node.cpp | 3 - be/src/vec/exec/scan/new_es_scan_node.cpp | 1 - be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 1 - be/src/vec/exec/scan/new_odbc_scan_node.cpp | 1 - be/src/vec/exec/scan/new_olap_scan_node.cpp | 1 - be/src/vec/exec/scan/scanner_scheduler.cpp | 1 - be/src/vec/exec/scan/vscan_node.cpp | 5 -- be/src/vec/exec/vaggregation_node.cpp | 3 - be/src/vec/exec/vanalytic_eval_node.cpp | 3 - be/src/vec/exec/vbroker_scan_node.cpp | 4 - be/src/vec/exec/vexchange_node.cpp | 3 - be/src/vec/exec/vmysql_scan_node.cpp | 2 - be/src/vec/exec/vschema_scan_node.cpp | 2 - be/src/vec/exec/vset_operation_node.cpp | 2 - be/src/vec/exec/vsort_node.cpp | 3 - be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +- be/src/vec/sink/vdata_stream_sender.cpp | 3 +- be/test/testutil/run_all_tests.cpp | 5 +- 62 files changed, 275 insertions(+), 233 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 425005e021..8eb20699da 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -74,6 +74,10 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918"); // The size of the memory that gc wants to release each time, as a percentage of the mem limit. CONF_mString(process_minor_gc_size, "10%"); CONF_mString(process_full_gc_size, "20%"); +// Some caches have their own gc threads, such as segment cache. +// For caches that do not have a separate gc thread, perform regular gc in the memory maintenance thread. +// Currently only storage page cache, chunk allocator, more in the future. 
+CONF_mInt32(cache_gc_interval_s, "60"); // If true, when the process does not exceed the soft mem limit, the query memory will not be limited; // when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently @@ -505,7 +509,7 @@ CONF_String(buffer_pool_limit, "20%"); CONF_String(buffer_pool_clean_pages_limit, "50%"); // Sleep time in milliseconds between memory maintenance iterations -CONF_mInt64(memory_maintenance_sleep_time_ms, "500"); +CONF_mInt32(memory_maintenance_sleep_time_ms, "500"); // Sleep time in milliseconds between load channel memory refresh iterations CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 992b6528b3..149b2f1ce4 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -202,7 +202,9 @@ void Daemon::buffer_pool_gc_thread() { } void Daemon::memory_maintenance_thread() { - int64_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000; + int64_t cache_gc_freed_mem = 0; while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(interval_milliseconds))) { if (!MemInfo::initialized()) { @@ -217,29 +219,49 @@ void Daemon::memory_maintenance_thread() { doris::MemInfo::refresh_allocator_mem(); #endif doris::MemInfo::refresh_proc_mem_no_allocator_cache(); - LOG_EVERY_N(INFO, 10) << MemTrackerLimiter::process_mem_log_str(); // Refresh mem tracker each type metrics. doris::MemTrackerLimiter::refresh_global_counter(); - if (doris::config::memory_debug) { - doris::MemTrackerLimiter::print_log_process_usage("memory_debug", false); - } - doris::MemTrackerLimiter::enable_print_log_process_usage(); // If system available memory is not enough, or the process memory exceeds the limit, reduce refresh interval. 
if (doris::MemInfo::sys_mem_available() < doris::MemInfo::sys_mem_available_low_water_mark() || doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) { - interval_milliseconds = 100; - doris::MemInfo::process_full_gc(); + doris::MemTrackerLimiter::print_log_process_usage("process full gc", false); + interval_milliseconds = std::min(100, config::memory_maintenance_sleep_time_ms); + if (doris::MemInfo::process_full_gc()) { + // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc. + doris::MemTrackerLimiter::enable_print_log_process_usage(); + } + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; } else if (doris::MemInfo::sys_mem_available() < doris::MemInfo::sys_mem_available_warning_water_mark() || doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::soft_mem_limit()) { - interval_milliseconds = 200; - doris::MemInfo::process_minor_gc(); + doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false); + interval_milliseconds = std::min(200, config::memory_maintenance_sleep_time_ms); + if (doris::MemInfo::process_minor_gc()) { + doris::MemTrackerLimiter::enable_print_log_process_usage(); + } + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; } else { + doris::MemTrackerLimiter::enable_print_log_process_usage(); interval_milliseconds = config::memory_maintenance_sleep_time_ms; + if (doris::config::memory_debug) { + LOG_EVERY_N(WARNING, 20) << doris::MemTrackerLimiter::log_process_usage_str( + "memory debug", false); // default 10s print once + } else { + LOG_EVERY_N(INFO, 10) + << MemTrackerLimiter::process_mem_log_str(); // default 5s print once + } + cache_gc_interval_ms -= interval_milliseconds; + if (cache_gc_interval_ms < 0) { + cache_gc_freed_mem = 0; + doris::MemInfo::process_cache_gc(cache_gc_freed_mem); + LOG(INFO) << fmt::format("Process regular GC Cache, Free Memory {} Bytes", + cache_gc_freed_mem); // default 6s print once + 
cache_gc_interval_ms = config::cache_gc_interval_s * 1000; + } } } } diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 19e6fe87c4..9818ddb433 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -142,7 +142,6 @@ Status AnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) { Status AnalyticEvalNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); _child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0]; _curr_tuple_pool.reset(new MemPool(mem_tracker())); @@ -186,7 +185,6 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) { Status AnalyticEvalNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); //RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->open(state)); @@ -814,7 +812,6 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const { Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); //RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(state->check_query_state("Analytic eval, while get_next.")); diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index 0d366ab1d4..10419a3499 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -93,7 +93,6 @@ Status BlockingJoinNode::close(RuntimeState* state) { void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); 
status->set_value(construct_build_side(state)); } diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index a281d29c6e..f822c35bbb 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -376,7 +376,6 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, void BrokerScanNode::scanner_worker(int start_idx, int length) { SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index cea7068631..14d2911a0b 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -424,7 +424,6 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) { void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) { SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; DCHECK(start_idx < length); diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index d804533e88..d73888e0b8 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -220,9 +220,8 @@ Status ExecNode::prepare(RuntimeState* state) { std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker = std::make_shared<MemTracker>("ExecNode:" + _runtime_profile->name(), - _runtime_profile.get()); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), + _runtime_profile.get(), nullptr, "PeakMemoryUsage"); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, 
intermediate_row_desc())); @@ -244,7 +243,6 @@ } Status ExecNode::open(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state)); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 1de8f921a8..5ea19d8d81 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -195,7 +195,6 @@ public: RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } - std::shared_ptr<MemTracker> mem_tracker_shared() const { return _mem_tracker; } OpentelemetrySpan get_next_span() { return _get_next_span; } @@ -299,8 +298,9 @@ protected: std::unique_ptr<RuntimeProfile> _runtime_profile; - /// Account for peak memory used by this node - std::shared_ptr<MemTracker> _mem_tracker; + // Record this node memory size. it is expected that artificial guarantees are accurate, + // which will provide a reference for operator memory. 
+ std::unique_ptr<MemTracker> _mem_tracker; RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _rows_returned_rate; diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 471801abca..73b3720c0b 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -184,7 +184,6 @@ Status HashJoinNode::close(RuntimeState* state) { void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); status->set_value(child(0)->open(state)); } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 35f3ef5023..3a88d7bd99 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1487,7 +1487,6 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { void OlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Status status = Status::OK(); for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); @@ -1666,7 +1665,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } void OlapScanNode::scanner_thread(OlapScanner* scanner) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Thread::set_self_name("olap_scanner"); if (UNLIKELY(_transfer_done)) { _scanner_done = true; diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 3458c57293..5ce2c947e1 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -42,7 +42,6 @@ Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { Status SetOperationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - 
SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); _build_pool.reset(new MemPool(mem_tracker())); @@ -130,7 +129,6 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) { Status SetOperationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); // open result expr lists. for (const std::vector<ExprContext*>& exprs : _child_expr_lists) { diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc index a40d889b0e..b74c9ba467 100644 --- a/be/src/exec/spill_sort_node.cc +++ b/be/src/exec/spill_sort_node.cc @@ -42,7 +42,6 @@ Status SpillSortNode::init(const TPlanNode& tnode, RuntimeState* state) { Status SpillSortNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); // AddExprCtxsToFree(_sort_exec_exprs); return Status::OK(); @@ -51,7 +50,6 @@ Status SpillSortNode::prepare(RuntimeState* state) { Status SpillSortNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(_sort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Spill sort, while open.")); @@ -81,7 +79,6 @@ Status SpillSortNode::open(RuntimeState* state) { Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Spill sort, while getting next.")); diff --git 
a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp index 1eb94aac0e..249d465b51 100644 --- a/be/src/exec/table_function_node.cpp +++ b/be/src/exec/table_function_node.cpp @@ -90,7 +90,6 @@ bool TableFunctionNode::_is_inner_and_empty() { Status TableFunctionNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT); @@ -106,7 +105,6 @@ Status TableFunctionNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(Expr::open(_fn_ctxs, state)); RETURN_IF_ERROR(vectorized::VExpr::open(_vfn_ctxs, state)); @@ -198,7 +196,6 @@ bool TableFunctionNode::_roll_table_functions(int last_eos_idx) { // And the inner loop is to expand the row by table functions, and output row by row. 
Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); const RowDescriptor& parent_rowdesc = row_batch->row_desc(); const RowDescriptor& child_rowdesc = _children[0]->row_desc(); diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index f5215b10d5..0a2ab6eceb 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -60,7 +60,6 @@ Status TopNNode::init(const TPlanNode& tnode, RuntimeState* state) { Status TopNNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _tuple_pool.reset(new MemPool(mem_tracker())); RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); // AddExprCtxsToFree(_sort_exec_exprs); @@ -77,7 +76,6 @@ Status TopNNode::prepare(RuntimeState* state) { Status TopNNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Top n, before open.")); RETURN_IF_ERROR(_sort_exec_exprs.open(state)); @@ -129,7 +127,6 @@ Status TopNNode::open(RuntimeState* state) { Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to row_batch.")); diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp index de5e7d7aec..a006fd53e7 100644 --- a/be/src/exec/union_node.cpp +++ b/be/src/exec/union_node.cpp @@ -69,7 +69,6 @@ Status UnionNode::init(const TPlanNode& tnode, RuntimeState* state) { Status UnionNode::prepare(RuntimeState* state) { 
SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); _materialize_exprs_evaluate_timer = @@ -96,7 +95,6 @@ Status UnionNode::prepare(RuntimeState* state) { Status UnionNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // open const expr lists. for (const std::vector<ExprContext*>& exprs : _const_expr_lists) { RETURN_IF_ERROR(Expr::open(exprs, state)); @@ -234,7 +232,6 @@ Status UnionNode::get_next_const(RuntimeState* state, RowBatch* row_batch) { Status UnionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); if (_to_close_child_idx != -1) { diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 42a29d21e9..e0ddfbbb37 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -147,11 +147,9 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr MemTrackerLimiter::Type::SCHEMA_CHANGE); } else if (iter->second == "clone") { MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::CLONE); - } else if (iter->second == "batch_load") { - MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::BATCHLOAD); - } else if (iter->second == "consistency") { + } else if (iter->second == "experimental") { MemTrackerLimiter::make_type_snapshots(&snapshots, - MemTrackerLimiter::Type::CONSISTENCY); + MemTrackerLimiter::Type::EXPERIMENTAL); } } else { (*output) << "<h4>*Note: (see documentation for details)</h4>\n"; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 
0a05a03006..ab5d955089 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -292,11 +292,11 @@ void DeltaWriter::_reset_mem_table() { auto mem_table_insert_tracker = std::make_shared<MemTracker>( fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()), - nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); auto mem_table_flush_tracker = std::make_shared<MemTracker>( fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()), - nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); #else auto mem_table_insert_tracker = std::make_shared<MemTracker>( fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 1f8465c0f3..adc57dfee7 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -56,8 +56,8 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t _cur_max_version(cur_max_version) { #ifndef BE_TEST _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( - fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())), nullptr, - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())), + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); #else _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id()))); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 6e39b0f093..db1fac59bb 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -124,20 +124,14 @@ Status 
StorageEngine::start_bg_threads() { scoped_refptr<Thread> path_scan_thread; RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_scan_thread", - [this, data_dir]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - this->_path_scan_thread_callback(data_dir); - }, + [this, data_dir]() { this->_path_scan_thread_callback(data_dir); }, &path_scan_thread)); _path_scan_threads.emplace_back(path_scan_thread); scoped_refptr<Thread> path_gc_thread; RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_gc_thread", - [this, data_dir]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - this->_path_gc_thread_callback(data_dir); - }, + [this, data_dir]() { this->_path_gc_thread_callback(data_dir); }, &path_gc_thread)); _path_gc_threads.emplace_back(path_gc_thread); } diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 6a33fbb548..b174bbb6ea 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -819,8 +819,8 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc } _runtime_profile = _runtime_state->runtime_profile(); _runtime_profile->set_name("PushBrokerReader"); - _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get())); - _tuple_buffer_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get())); + _mem_pool.reset(new MemPool()); + _tuple_buffer_pool.reset(new MemPool()); _counter.reset(new ScannerCounter()); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index e73a5ba8c1..2ea8e20b7d 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -107,7 +107,6 @@ StorageEngine::StorageEngine(const EngineOptions& options) _effective_cluster_id(-1), _is_all_cluster_id_exist(true), _stopped(false), - _mem_tracker(std::make_shared<MemTracker>("StorageEngine")), _segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")), _segment_meta_mem_tracker(std::make_shared<MemTracker>("SegmentMeta")), 
_stop_background_threads_latch(1), @@ -153,8 +152,7 @@ StorageEngine::~StorageEngine() { void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) { std::vector<std::thread> threads; for (auto data_dir : data_dirs) { - threads.emplace_back([this, data_dir] { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + threads.emplace_back([data_dir] { auto res = data_dir->load(); if (!res.ok()) { LOG(WARNING) << "io error when init load tables. res=" << res @@ -199,8 +197,7 @@ Status StorageEngine::_init_store_map() { DataDir* store = new DataDir(path.path, path.capacity_bytes, path.storage_medium, _tablet_manager.get(), _txn_manager.get()); tmp_stores.emplace_back(store); - threads.emplace_back([this, store, &error_msg_lock, &error_msg]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + threads.emplace_back([store, &error_msg_lock, &error_msg]() { auto st = store->init(); if (!st.ok()) { { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 49ff072a59..62ff7817ee 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -326,8 +326,6 @@ private: // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets; - // StorageEngine oneself - std::shared_ptr<MemTracker> _mem_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr<MemTracker> _segcompaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index c877a8d0ec..1a5183b080 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -73,7 +73,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE mem_consumption, Labels({{"type", "tablet_meta"}})); TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) - : _mem_tracker(std::make_shared<MemTracker>("TabletManager")), + : _mem_tracker(std::make_shared<MemTracker>( + "TabletManager", ExecEnv::GetInstance()->experimental_mem_tracker())), + _tablet_meta_mem_tracker(std::make_shared<MemTracker>("TabletMeta")), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); @@ -206,6 +208,10 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map[tablet_id] = tablet; _add_tablet_to_partition(tablet); + // TODO: remove multiply 2 of tablet meta mem size + // Because table schema will copy in tablet, there will be double mem cost + // so here multiply 2 + _tablet_meta_mem_tracker->consume(tablet->tablet_meta()->mem_size() * 2); VLOG_NOTICE << "add tablet to map successfully." 
<< " tablet_id=" << tablet_id; @@ -488,6 +494,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl } to_drop_tablet->deregister_tablet_from_dir(); + _tablet_meta_mem_tracker->release(to_drop_tablet->tablet_meta()->mem_size() * 2); return Status::OK(); } @@ -718,6 +725,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_ TSchemaHash schema_hash, const string& meta_binary, bool update_meta, bool force, bool restore, bool check_path) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); TabletMetaSharedPtr tablet_meta(new TabletMeta()); Status status = tablet_meta->deserialize(meta_binary); if (!status.ok()) { @@ -800,6 +808,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path, bool force, bool restore) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); LOG(INFO) << "begin to load tablet from dir. 
" << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 1bd62381cf..75c29bdac7 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -202,6 +202,7 @@ private: // trace the memory use by meta of tablet std::shared_ptr<MemTracker> _mem_tracker; + std::shared_ptr<MemTracker> _tablet_meta_mem_tracker; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 59a9cf16bc..121ee0891c 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -49,7 +49,7 @@ namespace doris { EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTabletInfo>* tablet_infos) : _push_req(push_req), _tablet_infos(tablet_infos) { _mem_tracker = std::make_shared<MemTrackerLimiter>( - MemTrackerLimiter::Type::BATCHLOAD, + MemTrackerLimiter::Type::LOAD, fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type, std::to_string(_push_req.tablet_id))); } diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 8bb6eb1f9e..ac23770c49 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -27,7 +27,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { _mem_tracker = std::make_shared<MemTrackerLimiter>( - MemTrackerLimiter::Type::CONSISTENCY, + MemTrackerLimiter::Type::LOAD, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id)); } diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 
59d46102b3..5ab627f317 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -455,8 +455,9 @@ DataStreamRecvr::DataStreamRecvr( _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { - _mem_tracker = std::make_unique<MemTracker>( - "DataStreamRecvr:" + print_id(_fragment_instance_id), _profile); + _mem_tracker = + std::make_unique<MemTracker>("DataStreamRecvr:" + print_id(_fragment_instance_id), + _profile, nullptr, "PeakMemoryUsage"); // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index df1495b750..19d9a90ad4 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -411,7 +411,8 @@ Status DataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(title.str())); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker = std::make_unique<MemTracker>( - "DataStreamSender:" + print_id(state->fragment_instance_id()), _profile); + "DataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr, + "PeakMemoryUsage"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4bd9fd73af..a0a132cfc4 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -116,12 +116,10 @@ public: return nullptr; } - void set_orphan_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) { - _orphan_mem_tracker = orphan_tracker; - _orphan_mem_tracker_raw = orphan_tracker.get(); - } + void init_mem_tracker(); std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } + 
MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); } ThreadResourceMgr* thread_mgr() { return _thread_mgr; } PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; } PriorityThreadPool* remote_scan_thread_pool() { return _remote_scan_thread_pool; } @@ -208,6 +206,7 @@ private: // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker; MemTrackerLimiter* _orphan_mem_tracker_raw; + std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker; // The following two thread pools are used in different scenarios. // _scan_thread_pool is a priority thread pool. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 735d8091ed..a9a1eac159 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -176,9 +176,7 @@ Status ExecEnv::_init_mem_env() { bool is_percent = false; std::stringstream ss; // 1. init mem tracker - _orphan_mem_tracker = - std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); + init_mem_tracker(); thread_context()->thread_mem_tracker_mgr->init(); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) @@ -289,6 +287,14 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity, _buffer_pool = new BufferPool(min_page_size, capacity, clean_pages_limit); } +void ExecEnv::init_mem_tracker() { + _orphan_mem_tracker = + std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan"); + _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); + _experimental_mem_tracker = std::make_shared<MemTrackerLimiter>( + MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet"); +} + void ExecEnv::init_download_cache_buf() { std::unique_ptr<char[]> 
download_cache_buf(new char[config::download_cache_buffer_size]); memset(download_cache_buf.get(), 0, config::download_cache_buffer_size); diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index b44fe23335..6470f56f70 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -76,9 +76,8 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) { // it's not quite helpfull to reduce memory pressure. // In this case we need to pick multiple load channels to reduce memory more effectively. _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1; - _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr"); - _mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, - "LoadChannelMgrTrackerSet"); + _mem_tracker = + std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, "LoadChannelMgr"); REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024); @@ -106,7 +105,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { auto channel_mem_tracker = std::make_unique<MemTracker>( fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string()), - nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set()); + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); #else auto channel_mem_tracker = std::make_unique<MemTracker>(fmt::format( "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string())); @@ -345,6 +344,8 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { if (_soft_reduce_mem_in_progress) { _soft_reduce_mem_in_progress = false; } + // refresh mem tracker to avoid duplicate reduce + _refresh_mem_tracker_without_lock(); } return; } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index beaaf29a91..7dc0c779d4 100644 --- 
a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -60,16 +60,10 @@ public: Status cancel(const PTabletWriterCancelRequest& request); void refresh_mem_tracker() { - int64_t mem_usage = 0; - { - std::lock_guard<std::mutex> l(_lock); - for (auto& kv : _load_channels) { - mem_usage += kv.second->mem_consumption(); - } - } - _mem_tracker->set_consumption(mem_usage); + std::lock_guard<std::mutex> l(_lock); + _refresh_mem_tracker_without_lock(); } - MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); } + MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); } private: template <typename Request> @@ -83,6 +77,16 @@ private: Status _start_bg_worker(); + // lock should be held when calling this method + void _refresh_mem_tracker_without_lock() { + _mem_usage = 0; + for (auto& kv : _load_channels) { + _mem_usage += kv.second->mem_consumption(); + } + THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), + _mem_tracker.get()); + } + protected: // lock protect the load channel map std::mutex _lock; @@ -91,9 +95,8 @@ protected: Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend - std::unique_ptr<MemTracker> _mem_tracker; - // Associate load channel tracker and memtable tracker, avoid default association to Orphan tracker. - std::unique_ptr<MemTrackerLimiter> _mem_tracker_set; + int64_t _mem_usage = 0; + std::unique_ptr<MemTrackerLimiter> _mem_tracker; int64_t _load_hard_mem_limit = -1; int64_t _load_soft_mem_limit = -1; // By default, we try to reduce memory on the load channel with largest mem consumption, diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 34baeb9ff3..08582aafc6 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -41,7 +41,8 @@ struct TrackerGroup { // Multiple groups are used to reduce the impact of locks. 
static std::vector<TrackerGroup> mem_tracker_pool(1000); -MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent) +MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, + const std::string& profile_counter_name) : _label(label) { if (profile == nullptr) { _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES); @@ -57,7 +58,15 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTra // release(). _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } + bind_parent(parent); +} + +MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) { + _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES); + bind_parent(parent); +} +void MemTracker::bind_parent(MemTrackerLimiter* parent) { if (parent) { _parent_label = parent->label(); _parent_group_num = parent->group_num(); @@ -96,7 +105,7 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot int64_t group_num, std::string parent_label) { std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock); for (auto tracker : mem_tracker_pool[group_num].trackers) { - if (tracker->parent_label() == parent_label) { + if (tracker->parent_label() == parent_label && tracker->consumption() != 0) { snapshots->push_back(tracker->make_snapshot()); } } diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index bf468043e1..0b4e54bb10 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -44,8 +44,9 @@ public: }; // Creates and adds the tracker to the mem_tracker_pool. 
- MemTracker(const std::string& label, RuntimeProfile* profile = nullptr, - MemTrackerLimiter* parent = nullptr); + MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, + const std::string& profile_counter_name); + MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr); // For MemTrackerLimiter MemTracker() { _parent_group_num = -1; } @@ -59,6 +60,7 @@ public: public: const std::string& label() const { return _label; } const std::string& parent_label() const { return _parent_label; } + const std::string& set_parent_label() const { return _parent_label; } // Returns the memory consumed in bytes. int64_t consumption() const { return _consumption->current_value(); } int64_t peak_consumption() const { return _consumption->value(); } @@ -88,6 +90,8 @@ public: static const std::string COUNTER_NAME; protected: + void bind_parent(MemTrackerLimiter* parent); + // label used in the make snapshot, not guaranteed unique. std::string _label; @@ -95,6 +99,7 @@ protected: // Tracker is located in group num in mem_tracker_pool int64_t _parent_group_num = 0; + // Use _parent_label to correlate with parent limiter tracker. std::string _parent_label = "-"; // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index aea4b5f806..1478a5704f 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -24,6 +24,7 @@ #include "gutil/once.h" #include "runtime/fragment_mgr.h" +#include "runtime/load_channel_mgr.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/pretty_printer.h" @@ -96,9 +97,8 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { void MemTrackerLimiter::refresh_global_counter() { std::unordered_map<Type, int64_t> type_mem_sum = { - {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, - {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}, - {Type::BATCHLOAD, 0}, {Type::CONSISTENCY, 0}}; + {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, {Type::COMPACTION, 0}, + {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}}; // No need refresh Type::EXPERIMENTAL for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) { std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock); for (auto tracker : mem_tracker_limiter_pool[i].trackers) { @@ -195,26 +195,35 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { } } +std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, bool with_stacktrace) { + std::string detail = msg; + detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); + if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); + std::vector<MemTracker::Snapshot> snapshots; + MemTrackerLimiter::make_process_snapshots(&snapshots); + MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); + + // Add additional tracker printed when memory exceeds limit. 
+ snapshots.emplace_back( + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot()); + + detail += "\nMemory Tracker Summary:"; + for (const auto& snapshot : snapshots) { + if (snapshot.label == "" && snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); + } else if (snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); + } else { + detail += "\n " + MemTracker::log_usage(snapshot); + } + } + return detail; +} + void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool with_stacktrace) { if (MemTrackerLimiter::_enable_print_log_process_usage) { MemTrackerLimiter::_enable_print_log_process_usage = false; - std::string detail = msg; - detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); - if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); - std::vector<MemTracker::Snapshot> snapshots; - MemTrackerLimiter::make_process_snapshots(&snapshots); - MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); - detail += "\nMemory Tracker Summary:"; - for (const auto& snapshot : snapshots) { - if (snapshot.label == "" && snapshot.parent_label == "") { - detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); - } else if (snapshot.parent_label == "") { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); - } else { - detail += "\n " + MemTracker::log_usage(snapshot); - } - } - LOG(WARNING) << detail; + LOG(WARNING) << log_process_usage_str(msg, with_stacktrace); } } @@ -239,7 +248,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const return Status::MemoryLimitExceeded(failed_msg); } -int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) { +int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type type) { std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, 
std::string>>, std::greater<std::pair<int64_t, std::string>>> @@ -252,16 +261,21 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) { int64_t freed_mem = 0; while (!min_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + min_pq.pop(); + continue; + } ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - fmt::format("Process has no memory available, cancel top memory usage query: " - "query memory tracker <{}> consumption {}, backend {} " + fmt::format("Process has no memory available, cancel top memory usage {}: " + "{} memory tracker <{}> consumption {}, backend {} " "process memory used {} exceed limit {} or sys mem available {} " "less than low water mark {}. Execute again after enough memory, " "details see be.INFO.", - min_pq.top().second, print_bytes(min_pq.top().first), - BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), - MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(), + TypeString[type], TypeString[type], min_pq.top().second, + print_bytes(min_pq.top().first), BackendOptions::get_localhost(), + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), + MemInfo::sys_mem_available_str(), print_bytes(MemInfo::sys_mem_available_low_water_mark()))); freed_mem += min_pq.top().first; @@ -270,7 +284,8 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) { min_pq.pop(); } if (!usage_strings.empty()) { - LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ","); + LOG(INFO) << "Process GC Free Top Memory Usage " << TypeString[type] << ": " + << join(usage_strings, ","); } return freed_mem; }; @@ -278,7 +293,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) { for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) { std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock); for (auto 
tracker : mem_tracker_limiter_pool[i].trackers) { - if (tracker->type() == Type::QUERY) { + if (tracker->type() == type) { if (tracker->consumption() > min_free_mem) { std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, std::string>>, @@ -304,7 +319,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) { return cancel_top_query(min_pq); } -int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { +int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type type) { std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, std::string>>, std::greater<std::pair<int64_t, std::string>>> @@ -314,7 +329,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) { std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock); for (auto tracker : mem_tracker_limiter_pool[i].trackers) { - if (tracker->type() == Type::QUERY) { + if (tracker->type() == type) { int64_t overcommit_ratio = (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000; if (overcommit_ratio == 0) { // Small query does not cancel @@ -326,6 +341,11 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { } } + // Minor gc does not cancel when there is only one query. + if (query_consumption.size() <= 1) { + return 0; + } + std::priority_queue<std::pair<int64_t, std::string>> max_pq; // Min-heap to Max-heap. 
while (!min_pq.empty()) { @@ -337,18 +357,23 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { int64_t freed_mem = 0; while (!max_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + max_pq.pop(); + continue; + } int64_t query_mem = query_consumption[max_pq.top().second]; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - fmt::format("Process has no memory available, cancel top memory usage query: " - "query memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed limit {} or sys mem available {} " - "less than low water mark {}. Execute again after enough memory, " + fmt::format("Process has less memory, cancel top memory overcommit {}: " + "{} memory tracker <{}> consumption {}, backend {} " + "process memory used {} exceed soft limit {} or sys mem available {} " + "less than warning water mark {}. 
Execute again after enough memory, " "details see be.INFO.", - max_pq.top().second, print_bytes(query_mem), - BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), - MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(), - print_bytes(MemInfo::sys_mem_available_low_water_mark()))); + TypeString[type], TypeString[type], max_pq.top().second, + print_bytes(query_mem), BackendOptions::get_localhost(), + PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(), + MemInfo::sys_mem_available_str(), + print_bytes(MemInfo::sys_mem_available_warning_water_mark()))); usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", max_pq.top().second, query_mem, max_pq.top().first)); @@ -359,7 +384,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { max_pq.pop(); } if (!usage_strings.empty()) { - LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ","); + LOG(INFO) << "Process GC Free Top Memory Overcommit " << TypeString[type] << ": " + << join(usage_strings, ","); } return freed_mem; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 617a7ffdee..0b66a5c7fb 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -46,8 +46,8 @@ public: COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: Memory that does not contain make/release snapshots. - BATCHLOAD = 6, // Count the memory consumption of all EngineBatchLoadTask. - CONSISTENCY = 7 // Count the memory consumption of all EngineChecksumTask. + EXPERIMENTAL = + 6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future. 
}; inline static std::unordered_map<Type, std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>> @@ -63,14 +63,11 @@ public: std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}, {Type::CLONE, std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}, - {Type::BATCHLOAD, - std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}, - {Type::CONSISTENCY, + {Type::EXPERIMENTAL, std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}}; - inline static const std::string TypeString[] = {"global", "query", "load", - "compaction", "schema_change", "clone", - "batch_load", "consistency"}; + inline static const std::string TypeString[] = { + "global", "query", "load", "compaction", "schema_change", "clone", "experimental"}; public: // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. @@ -107,7 +104,8 @@ public: int64_t limit() const { return _limit; } bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); } - Status check_limit(int64_t bytes); + Status check_limit(int64_t bytes = 0); + bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } // Returns the maximum consumption that can be made without exceeding the limit on // this tracker limiter. @@ -137,6 +135,7 @@ public: void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; } + static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true); static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true); // Log the memory usage when memory limit is exceeded. @@ -146,12 +145,21 @@ public: int64_t failed_allocation_size = 0); // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. 
- static int64_t free_top_memory_query(int64_t min_free_mem); + static int64_t free_top_memory_query(int64_t min_free_mem, Type type = Type::QUERY); + static int64_t free_top_memory_load(int64_t min_free_mem) { + return free_top_memory_query(min_free_mem, Type::LOAD); + } // Start canceling from the query with the largest memory overcommit ratio until the memory // of min_free_mem size is freed. - static int64_t free_top_overcommit_query(int64_t min_free_mem); + static int64_t free_top_overcommit_query(int64_t min_free_mem, Type type = Type::QUERY); + static int64_t free_top_overcommit_load(int64_t min_free_mem) { + return free_top_overcommit_query(min_free_mem, Type::LOAD); + } // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { + if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) { + return TUniqueId(); + } auto queryid = split(label, "#Id=")[1]; TUniqueId querytid; parse_id(queryid, &querytid); @@ -160,12 +168,14 @@ public: static std::string process_mem_log_str() { return fmt::format( - "physical memory {}, process memory used {} limit {}, sys mem available {} low " - "water mark {}, refresh interval memory growth {} B", + "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys " + "available memory {}, low water mark {}, warning water mark {}. 
Refresh interval " + "memory growth {} B", PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), + MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES), MemInfo::refresh_interval_memory_growth); } @@ -265,7 +275,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms return false; } - if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) { + if (_limit < 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) { _consumption->add(bytes); // No limit at this tracker. } else { if (!_consumption->try_add(bytes, _limit)) { @@ -282,7 +292,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { if (sys_mem_exceed_limit_check(bytes)) { return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes)); } - if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) { + if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) { return Status::OK(); } if (_limit > 0 && _consumption->current_value() + bytes > _limit) { diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index b59cba4896..020b6ae9e4 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -102,7 +102,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _runtime_state->set_tracer(std::move(tracer)); SCOPED_ATTACH_TASK(_runtime_state.get()); - _runtime_state->init_scanner_mem_trackers(); _runtime_state->runtime_filter_mgr()->init(); _runtime_state->set_be_number(request.backend_num); if (request.__isset.backend_id) { diff --git 
a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index ce851c119a..a40a2ecc02 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -45,7 +45,8 @@ RuntimeFilterMgr::~RuntimeFilterMgr() {} Status RuntimeFilterMgr::init() { DCHECK(_state->query_mem_tracker() != nullptr); - _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr"); + _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr", + ExecEnv::GetInstance()->experimental_mem_tracker()); return Status::OK(); } @@ -161,7 +162,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag const TQueryOptions& query_options) { _query_id = query_id; _fragment_instance_id = fragment_instance_id; - _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity"); + _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity", + ExecEnv::GetInstance()->experimental_mem_tracker()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { int filter_id = filterid_to_desc.first; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index b18671a9b8..f61db045a8 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -216,8 +216,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { _query_mem_tracker = std::make_shared<MemTrackerLimiter>( MemTrackerLimiter::Type::QUERY, fmt::format("TestQuery#Id={}", print_id(query_id))); - _scanner_mem_tracker = - std::make_shared<MemTracker>(fmt::format("TestScanner#QueryId={}", print_id(query_id))); return Status::OK(); } @@ -281,7 +279,8 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) { Status RuntimeState::check_query_state(const std::string& msg) { // TODO: it would be nice if this also checked for 
cancellation, but doing so breaks // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached. - if (thread_context()->thread_mem_tracker()->limit_exceeded()) { + if (thread_context()->thread_mem_tracker()->limit_exceeded() && + !config::enable_query_memroy_overcommit) { RETURN_LIMIT_EXCEEDED(this, msg); } return query_status(); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f68de8d714..ca6dc42c4e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -76,11 +76,6 @@ public: Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); - // after SCOPED_ATTACH_TASK; - void init_scanner_mem_trackers() { - _scanner_mem_tracker = std::make_shared<MemTracker>( - fmt::format("Scanner#QueryId={}", print_id(_query_id))); - } // for ut and non-query. Status init_mem_trackers(const TUniqueId& query_id = TUniqueId()); @@ -115,7 +110,6 @@ public: const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } ExecEnv* exec_env() { return _exec_env; } std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; } - std::shared_ptr<MemTracker> scanner_mem_tracker() { return _scanner_mem_tracker; } ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; } void set_fragment_root_id(PlanNodeId id) { @@ -418,8 +412,6 @@ private: static const int DEFAULT_BATCH_SIZE = 2048; std::shared_ptr<MemTrackerLimiter> _query_mem_tracker; - // Count the memory consumption of Scanner - std::shared_ptr<MemTracker> _scanner_mem_tracker; // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. 
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index cccf77c060..b7e68674e8 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -47,6 +47,7 @@ int64_t MemInfo::_s_physical_mem = -1; int64_t MemInfo::_s_mem_limit = -1; std::string MemInfo::_s_mem_limit_str = ""; int64_t MemInfo::_s_soft_mem_limit = -1; +std::string MemInfo::_s_soft_mem_limit_str = ""; int64_t MemInfo::_s_allocator_cache_mem = 0; std::string MemInfo::_s_allocator_cache_mem_str = ""; @@ -89,44 +90,66 @@ void MemInfo::refresh_allocator_mem() { #endif } -void MemInfo::process_minor_gc() { +void MemInfo::process_cache_gc(int64_t& freed_mem) { // TODO, free more cache, and should free a certain percentage of capacity, not all. + freed_mem += ChunkAllocator::instance()->mem_consumption(); + ChunkAllocator::instance()->clear(); + freed_mem += + StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); + StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); +} + +// step1: free all cache +// step2: free top overcommit query, if enable query memroy overcommit +bool MemInfo::process_minor_gc() { int64_t freed_mem = 0; Defer defer {[&]() { LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem); }}; - freed_mem += ChunkAllocator::instance()->mem_consumption(); - ChunkAllocator::instance()->clear(); + MemInfo::process_cache_gc(freed_mem); if (freed_mem > _s_process_minor_gc_size) { - return; + return true; } - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); if (config::enable_query_memroy_overcommit) { freed_mem += MemTrackerLimiter::free_top_overcommit_query(_s_process_minor_gc_size - freed_mem); } + if (freed_mem > _s_process_minor_gc_size) { + return true; + } + return false; } -void MemInfo::process_full_gc() { +// step1: free all cache +// step2: free top memory query +// step3: free top overcommit load, 
load retries are more expensive, So cancel at the end. +// step4: free top memory load +bool MemInfo::process_full_gc() { int64_t freed_mem = 0; Defer defer { [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }}; - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); + MemInfo::process_cache_gc(freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; + return true; } - freed_mem += ChunkAllocator::instance()->mem_consumption(); - ChunkAllocator::instance()->clear(); + freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; + return true; } - freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); + if (config::enable_query_memroy_overcommit) { + freed_mem += + MemTrackerLimiter::free_top_overcommit_load(_s_process_full_gc_size - freed_mem); + if (freed_mem > _s_process_full_gc_size) { + return true; + } + } + freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem); + if (freed_mem > _s_process_full_gc_size) { + return true; + } + return false; } #ifndef __APPLE__ @@ -186,6 +209,7 @@ void MemInfo::init() { } _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; + _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES); _s_process_minor_gc_size = ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent); @@ -222,7 +246,7 @@ void MemInfo::init() { config::max_sys_mem_available_low_water_mark_bytes); int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0); - _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1 * 
2; + _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1; LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: " << _s_mem_limit_str @@ -247,6 +271,7 @@ void MemInfo::init() { _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; + _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES); LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); _s_initialized = true; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index bd76c6124c..2cb17eb6b8 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -111,11 +111,16 @@ public: DCHECK(_s_initialized); return _s_soft_mem_limit; } + static inline std::string soft_mem_limit_str() { + DCHECK(_s_initialized); + return _s_soft_mem_limit_str; + } static std::string debug_string(); - static void process_minor_gc(); - static void process_full_gc(); + static void process_cache_gc(int64_t& freed_mem); + static bool process_minor_gc(); + static bool process_full_gc(); // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, // avoid multiple threads starting at the same time and causing OOM. 
@@ -127,6 +132,7 @@ private: static int64_t _s_mem_limit; static std::string _s_mem_limit_str; static int64_t _s_soft_mem_limit; + static std::string _s_soft_mem_limit_str; static int64_t _s_allocator_cache_mem; static std::string _s_allocator_cache_mem_str; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 8236f61fb6..9f967a8190 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -348,7 +348,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // Build phase _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); runtime_profile()->add_child(_build_phase_profile, false, nullptr); @@ -635,7 +634,6 @@ Status HashJoinNode::open(RuntimeState* state) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state)); } RETURN_IF_ERROR(VJoinNodeBase::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); return Status::OK(); } diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 9befa4f4bb..528854537b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -171,7 +171,6 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) { Status VJoinNodeBase::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::open"); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); std::promise<Status> thread_status; @@ -209,7 +208,6 @@ void VJoinNodeBase::_reset_tuple_is_null_column() { void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VJoinNodeBase::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); status->set_value(child(0)->open(state)); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 7c3f29c854..ce6ff568d2 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -104,7 +104,6 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VNestedLoopJoinNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); @@ -206,7 +205,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_probe_timer); RETURN_IF_CANCELLED(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (_is_output_left_side_only) { RETURN_IF_ERROR(get_left_side(state, &_left_block)); @@ -708,7 +706,6 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) { RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->open(state)); } RETURN_IF_ERROR(VJoinNodeBase::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); // We can close the right child to release its resources because its input has been // fully consumed. 
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index bd6f676fe5..0ca61e0652 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -76,7 +76,6 @@ Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status NewEsScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index eaa511f40d..19fd93b9d3 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -37,7 +37,6 @@ std::string NewJdbcScanNode::get_name() { Status NewJdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << "VNewJdbcScanNode::Prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index 1a7296a866..dbbf57a120 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -38,7 +38,6 @@ std::string NewOdbcScanNode::get_name() { Status NewOdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index add047fc6a..c113a5e6f2 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -45,7 +45,6 @@ Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) { Status 
NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 63c8fa1355..c645cbc62d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -191,7 +191,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScanner* scanner) { SCOPED_ATTACH_TASK(scanner->runtime_state()); - SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker()); Thread::set_self_name("_scanner_scan"); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index e0ba630f67..57178d4d92 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -71,8 +71,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - RETURN_IF_ERROR(_init_profile()); // init profile for runtime filter @@ -89,9 +87,7 @@ Status VScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - RETURN_IF_ERROR(_acquire_runtime_filter()); RETURN_IF_ERROR(_process_conjuncts()); if (_eos) { return Status::OK(); @@ -112,7 +108,6 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next"); SCOPED_TIMER(_get_next_timer); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if 
(state->is_cancelled()) { _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); return _scanner_ctx->status(); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index e93fc89591..277f1f6691 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -292,7 +292,6 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs) Status AggregationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime"); _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime"); @@ -449,7 +448,6 @@ Status AggregationNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); @@ -493,7 +491,6 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (_is_streaming_preagg) { RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 55d98d47ab..64b6a2e7cb 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -149,7 +149,6 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VAnalyticEvalNode::prepare(RuntimeState* state) { 
SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor)); _mem_pool.reset(new MemPool(mem_tracker())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); @@ -217,7 +216,6 @@ Status VAnalyticEvalNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(child(0)->open(state)); RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state)); @@ -255,7 +253,6 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VAnalyticEvalNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); if (_input_eos && _output_block_index == _input_blocks.size()) { diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index f164b5a93d..dfa82d893c 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -59,7 +59,6 @@ Status VBrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VBrokerScanNode::prepare(RuntimeState* state) { VLOG_QUERY << "VBrokerScanNode prepare"; RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // get tuple desc _runtime_state = state; _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -85,7 +84,6 @@ Status VBrokerScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - 
SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(start_scanners()); @@ -109,7 +107,6 @@ Status VBrokerScanNode::start_scanners() { Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // check if CANCELLED. if (state->is_cancelled()) { std::unique_lock<std::mutex> l(_batch_queue_lock); @@ -273,7 +270,6 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner void VBrokerScanNode::scanner_worker(int start_idx, int length) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker"); SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Thread::set_self_name("vbroker_scanner"); Status status = Status::OK(); ScannerCounter counter; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index c28ca147f8..f595c6fea3 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -50,7 +50,6 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); DCHECK_GT(_num_senders, 0); _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( @@ -67,7 +66,6 @@ Status VExchangeNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (_is_merging) { RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); @@ -85,7 +83,6 @@ Status 
VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next"); SCOPED_TIMER(runtime_profile()->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (!_is_merging) { diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp index d8148d4b9d..fb350fb477 100644 --- a/be/src/vec/exec/vmysql_scan_node.cpp +++ b/be/src/vec/exec/vmysql_scan_node.cpp @@ -50,7 +50,6 @@ Status VMysqlScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -104,7 +103,6 @@ Status VMysqlScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); VLOG_CRITICAL << "MysqlScanNode::Open"; if (!_is_init) { diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index c0b1491c50..31d3fc60ef 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -123,7 +123,6 @@ Status VSchemaScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (_scanner_param.user) { TSetSessionParams param; @@ -146,7 +145,6 @@ Status VSchemaScanNode::prepare(RuntimeState* state) { } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSchemaScanNode::prepare"); RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // new one 
mem pool _tuple_pool.reset(new (std::nothrow) MemPool()); diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 1324c5f7d5..8ce115813c 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -123,7 +123,6 @@ Status VSetOperationNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // open result expr lists. for (const std::vector<VExprContext*>& exprs : _child_expr_lists) { RETURN_IF_ERROR(VExpr::open(exprs, state)); @@ -135,7 +134,6 @@ Status VSetOperationNode::open(RuntimeState* state) { Status VSetOperationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 37e120b015..f2462efae4 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -68,7 +68,6 @@ Status VSortNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); _runtime_profile->add_info_string("TOP-N", _limit == -1 ? 
"false" : "true"); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); return Status::OK(); } @@ -77,7 +76,6 @@ Status VSortNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("vsort, while open.")); @@ -113,7 +111,6 @@ Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_sorter->get_next(state, block, eos)); reached_limit(block, eos); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 4c6c3a57bc..c8c2f5ecd4 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -270,8 +270,9 @@ VDataStreamRecvr::VDataStreamRecvr( _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { // DataStreamRecvr may be destructed after the instance execution thread ends. - _mem_tracker = std::make_unique<MemTracker>( - "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile); + _mem_tracker = + std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id), + _profile, nullptr, "PeakMemoryUsage"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Create one queue per sender if is_merging is true. 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 9ab836252d..0bf10c4fd2 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -425,7 +425,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(title)); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker = std::make_unique<MemTracker>( - "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile); + "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr, + "PeakMemoryUsage"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 99a338a5aa..cd6499831f 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -28,10 +28,7 @@ #include "util/mem_info.h" int main(int argc, char** argv) { - std::shared_ptr<doris::MemTrackerLimiter> orphan_mem_tracker = - std::make_shared<doris::MemTrackerLimiter>(doris::MemTrackerLimiter::Type::GLOBAL, - "Orphan"); - doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker); + doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org