This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 32a029d9dc [enhancement](memtracker) Refactor load channel + memtable mem tracker (#13795) 32a029d9dc is described below commit 32a029d9dcd18828ecb9f7da261abe8de10a7bae Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Nov 3 09:47:12 2022 +0800 [enhancement](memtracker) Refactor load channel + memtable mem tracker (#13795) --- be/src/common/config.h | 12 +----- be/src/exec/blocking_join_node.cpp | 2 +- be/src/exec/broker_scan_node.cpp | 2 +- be/src/exec/es_http_scan_node.cpp | 2 +- be/src/exec/except_node.cpp | 2 +- be/src/exec/exec_node.cpp | 2 +- be/src/exec/exec_node.h | 3 +- be/src/exec/hash_join_node.cpp | 2 +- be/src/exec/intersect_node.cpp | 2 +- be/src/exec/olap_scan_node.cpp | 9 ++--- be/src/exec/olap_scan_node.h | 2 +- be/src/exec/olap_scanner.cpp | 8 ++-- be/src/exec/olap_scanner.h | 5 ++- be/src/olap/delta_writer.cpp | 53 +++++++++++++++---------- be/src/olap/delta_writer.h | 24 +++++------ be/src/olap/memtable.cpp | 38 +++++++++--------- be/src/olap/memtable.h | 25 +++++++----- be/src/olap/tablet_manager.cpp | 2 - be/src/runtime/load_channel.cpp | 12 +++--- be/src/runtime/load_channel.h | 29 ++++++++++---- be/src/runtime/load_channel_mgr.cpp | 21 ++++------ be/src/runtime/load_channel_mgr.h | 32 ++++++++++----- be/src/runtime/memory/mem_tracker.h | 1 + be/src/runtime/memory/mem_tracker_limiter.h | 1 + be/src/runtime/memory/mem_tracker_task_pool.cpp | 2 +- be/src/runtime/memory/thread_mem_tracker_mgr.h | 1 - be/src/runtime/runtime_filter_mgr.cpp | 4 +- be/src/runtime/runtime_filter_mgr.h | 2 +- be/src/runtime/tablets_channel.cpp | 26 ++++++++---- be/src/runtime/tablets_channel.h | 11 ++--- be/src/runtime/thread_context.cpp | 16 ++++---- be/src/runtime/thread_context.h | 8 ++++ be/src/service/doris_main.cpp | 4 +- be/src/vec/exec/join/vhash_join_node.cpp | 2 +- be/src/vec/exec/vblocking_join_node.cpp | 2 +- be/src/vec/exec/vbroker_scan_node.cpp | 2 +- be/src/vec/exec/ves_http_scan_node.cpp | 2 +- be/src/vec/exec/volap_scan_node.cpp | 2 +- be/src/vec/exec/vschema_scan_node.cpp | 2 +- be/test/olap/delta_writer_test.cpp | 6 +-- 40 files changed, 215 insertions(+), 168 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index cf04e8d3d3..2d627682bb 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -631,16 +631,8 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16"); // Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker in Hook. CONF_Bool(enable_tcmalloc_hook, "true"); -// If true, switch TLS MemTracker to count more detailed memory, -// including caches such as ExecNode operators and TabletManager. -// -// At present, there is a performance problem in the frequent switch thread mem tracker. -// This is because the mem tracker exists as a shared_ptr in the thread local. Each time it is switched, -// the atomic variable use_count in the shared_ptr of the current tracker will be -1, and the tracker to be -// replaced use_count +1, multi-threading Frequent changes to the same tracker shared_ptr are slow. -// TODO: 1. Reduce unnecessary thread mem tracker switches, -// 2. Consider using raw pointers for mem tracker in thread local -CONF_Bool(memory_verbose_track, "false"); +// Print more detailed logs, more detailed records, etc. +CONF_Bool(memory_debug, "false"); // The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size // smaller than this value will continue to accumulate. specified as number of bytes. diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index 5e5aee3714..0d366ab1d4 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -93,7 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) { void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); status->set_value(construct_build_side(state)); } diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 9cff2da4ff..a281d29c6e 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -376,7 +376,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, void BrokerScanNode::scanner_worker(int start_idx, int length) { SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index 28205290cc..38de757f87 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -424,7 +424,7 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) { void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) { SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; DCHECK(start_idx < length); diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 8ad7ce9044..f12f6ca18a 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -39,8 +39,8 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { } Status ExceptNode::open(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(SetOperationNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index be94dcbb28..fe0d6d1fe1 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) { std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), + _mem_tracker = std::make_shared<MemTracker>("ExecNode:" + _runtime_profile->name(), _runtime_profile.get()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 20e0785d94..296726b924 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -194,6 +194,7 @@ public: RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } + std::shared_ptr<MemTracker> mem_tracker_shared() const { return _mem_tracker; } OpentelemetrySpan get_next_span() { return _get_next_span; } @@ -298,7 +299,7 @@ protected: std::unique_ptr<RuntimeProfile> _runtime_profile; /// Account for peak memory used by this node - std::unique_ptr<MemTracker> _mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _rows_returned_rate; diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 7e9a2bf989..592c93ee7e 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -184,7 +184,7 @@ Status HashJoinNode::close(RuntimeState* state) { void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); status->set_value(child(0)->open(state)); } diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index 9bf02cedd4..d04ddc5061 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -43,8 +43,8 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { // 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table // repeat [2] this for all the rest child Status IntersectNode::open(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(SetOperationNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index ef4f2236c5..1dcd5c3754 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -191,7 +191,7 @@ Status OlapScanNode::prepare(RuntimeState* state) { _init_counter(state); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); - _scanner_mem_tracker = std::make_unique<MemTracker>("Scanners"); + _scanner_mem_tracker = std::make_shared<MemTracker>("Scanners"); if (_tuple_desc == nullptr) { // TODO: make sure we print all available diagnostic output to our error log @@ -941,7 +941,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { } OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation, - _need_agg_finalize, *scan_range, _scanner_mem_tracker.get()); + _need_agg_finalize, *scan_range, _scanner_mem_tracker); scanner->set_batch_size(_batch_size); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. @@ -1483,7 +1483,7 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { void OlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Status status = Status::OK(); for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); @@ -1660,8 +1660,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } void OlapScanNode::scanner_thread(OlapScanner* scanner) { - // SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Thread::set_self_name("olap_scanner"); if (UNLIKELY(_transfer_done)) { _scanner_done = true; diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index f5d6e38cfd..47cf6cd01c 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -264,7 +264,7 @@ protected: int64_t _buffered_bytes; // Count the memory consumption of Rowset Reader and Tablet Reader in OlapScanner. - std::unique_ptr<MemTracker> _scanner_mem_tracker; + std::shared_ptr<MemTracker> _scanner_mem_tracker; EvalConjunctsFn _eval_conjuncts_fn; // the max num of scan keys of this scan request. diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 6f348d02a1..e90bc2124c 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -43,7 +43,7 @@ namespace doris { OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool aggregation, bool need_agg_finalize, const TPaloScanRange& scan_range, - MemTracker* tracker) + const std::shared_ptr<MemTracker>& tracker) : _runtime_state(runtime_state), _parent(parent), _tuple_desc(parent->_tuple_desc), @@ -51,8 +51,8 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool _is_open(false), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1) { - _mem_tracker = tracker; + _version(-1), + _mem_tracker(tracker) { _tablet_schema = std::make_shared<TabletSchema>(); } @@ -326,7 +326,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { bzero(tuple_buf, _batch_size * _tuple_desc->byte_size()); Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf); - std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker)); + std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get())); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index 134a515219..961f80c9d0 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -42,7 +42,8 @@ class OlapScanNode; class OlapScanner { public: OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool aggregation, - bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker); + bool need_agg_finalize, const TPaloScanRange& scan_range, + const std::shared_ptr<MemTracker>& tracker); virtual ~OlapScanner() = default; @@ -151,7 +152,7 @@ protected: MonotonicStopWatch _watcher; - MemTracker* _mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; TabletSchemaSPtr _tablet_schema; }; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c8a36fbec7..39185cdbac 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -33,14 +33,14 @@ namespace doris { -Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, - const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) { - *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec); +Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, const UniqueId& load_id, + bool is_vec) { + *writer = new DeltaWriter(req, StorageEngine::instance(), load_id, is_vec); return Status::OK(); } -DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, - const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) +DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id, + bool is_vec) : _req(*req), _tablet(nullptr), _cur_rowset(nullptr), @@ -48,7 +48,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, _tablet_schema(new TabletSchema), _delta_written_success(false), _storage_engine(storage_engine), - _parent_tracker(parent_tracker), + _load_id(load_id), _is_vec(is_vec) {} DeltaWriter::~DeltaWriter() { @@ -109,8 +109,6 @@ Status DeltaWriter::init() { _rowset_ids = _tablet->all_rs_id(_cur_max_version); } - _mem_tracker = std::make_shared<MemTrackerLimiter>( - -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { //trigger quick compaction @@ -164,8 +162,6 @@ Status DeltaWriter::write(Tuple* tuple) { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD); - _mem_table->insert(tuple); // if memtable is full, push it to the flush executor, @@ -281,9 +277,21 @@ void DeltaWriter::_reset_mem_table() { if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) { _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id())); } + auto mem_table_insert_tracker = std::make_shared<MemTracker>( + fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", + std::to_string(tablet_id()), _mem_table_num, _load_id.to_string())); + auto mem_table_flush_tracker = std::make_shared<MemTracker>( + fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", + std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string())); + { + std::lock_guard<SpinLock> l(_mem_table_tracker_lock); + _mem_table_tracker.push_back(mem_table_insert_tracker); + _mem_table_tracker.push_back(mem_table_flush_tracker); + } _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, _req.tuple_desc, _rowset_writer.get(), _delete_bitmap, - _rowset_ids, _cur_max_version, _mem_tracker, _is_vec)); + _rowset_ids, _cur_max_version, mem_table_insert_tracker, + mem_table_flush_tracker, _is_vec)); } Status DeltaWriter::close() { @@ -384,8 +392,9 @@ Status DeltaWriter::cancel() { } void DeltaWriter::save_mem_consumption_snapshot() { + std::lock_guard<std::mutex> l(_lock); _mem_consumption_snapshot = mem_consumption(); - _memtable_consumption_snapshot = memtable_consumption(); + _memtable_consumption_snapshot = _mem_table->memory_usage(); } int64_t DeltaWriter::get_memtable_consumption_inflush() const { @@ -397,20 +406,20 @@ int64_t DeltaWriter::get_memtable_consumption_snapshot() const { return _memtable_consumption_snapshot; } -int64_t DeltaWriter::mem_consumption() const { - if (_mem_tracker == nullptr) { +int64_t DeltaWriter::mem_consumption() { + if (_flush_token == nullptr) { // This method may be called before this writer is initialized. - // So _mem_tracker may be null. + // So _flush_token may be null. return 0; } - return _mem_tracker->consumption(); -} - -int64_t DeltaWriter::memtable_consumption() const { - if (_mem_table == nullptr) { - return 0; + int64_t mem_usage = 0; + { + std::lock_guard<SpinLock> l(_mem_table_tracker_lock); + for (auto mem_table_tracker : _mem_table_tracker) { + mem_usage += mem_table_tracker->consumption(); + } } - return _mem_table->mem_tracker_hook()->consumption(); + return mem_usage; } int64_t DeltaWriter::partition_id() const { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index a7770326fd..85133e456c 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -20,6 +20,7 @@ #include "gen_cpp/internal_service.pb.h" #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" +#include "util/spinlock.h" namespace doris { @@ -56,9 +57,7 @@ struct WriteRequest { class DeltaWriter { public: static Status open(WriteRequest* req, DeltaWriter** writer, - const std::shared_ptr<MemTrackerLimiter>& parent_tracker = - std::shared_ptr<MemTrackerLimiter>(), - bool is_vec = false); + const UniqueId& load_id = TUniqueId(), bool is_vec = false); ~DeltaWriter(); @@ -93,7 +92,7 @@ public: int64_t partition_id() const; - int64_t mem_consumption() const; + int64_t mem_consumption(); // Wait all memtable in flush queue to be flushed Status wait_flush(); @@ -102,8 +101,6 @@ public: int32_t schema_hash() { return _tablet->schema_hash(); } - int64_t memtable_consumption() const; - void save_mem_consumption_snapshot(); int64_t get_memtable_consumption_inflush() const; @@ -113,8 +110,8 @@ public: void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); private: - DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, - const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec); + DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id, + bool is_vec); // push a full memtable to flush executor Status _flush_memtable_async(); @@ -146,14 +143,11 @@ private: bool _delta_written_success; StorageEngine* _storage_engine; + UniqueId _load_id; std::unique_ptr<FlushToken> _flush_token; - // The memory value automatically tracked by the Tcmalloc hook is 20% less than the manually recorded - // value in the memtable, because some freed memory is not allocated in the DeltaWriter. - // The memory value automatically tracked by the Tcmalloc hook, used for load channel mgr to trigger - // flush memtable when the sum of all channel memory exceeds the limit. - // The manually recorded value of memtable is used to flush when it is larger than write_buffer_size. - std::shared_ptr<MemTrackerLimiter> _mem_tracker; - std::shared_ptr<MemTrackerLimiter> _parent_tracker; + std::vector<std::shared_ptr<MemTracker>> _mem_table_tracker; + SpinLock _mem_table_tracker_lock; + std::atomic<uint32_t> _mem_table_num = 1; // The counter of number of segment flushed already. int64_t _segment_counter = 0; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 5eed903d29..f61e945bda 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -35,14 +35,14 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version, - const std::shared_ptr<MemTrackerLimiter>& tracker, bool support_vec) + const std::shared_ptr<MemTracker>& insert_mem_tracker, + const std::shared_ptr<MemTracker>& flush_mem_tracker, bool support_vec) : _tablet(std::move(tablet)), _schema(schema), _tablet_schema(tablet_schema), _slot_descs(slot_descs), - _mem_tracker_hook(std::make_shared<MemTrackerLimiter>( - -1, fmt::format("MemTableHook:tabletId={}", std::to_string(tablet_id())), - tracker)), + _insert_mem_tracker(insert_mem_tracker), + _flush_mem_tracker(flush_mem_tracker), _schema_size(_schema->schema_size()), _rowset_writer(rowset_writer), _is_first_insertion(true), @@ -53,12 +53,10 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t _delete_bitmap(delete_bitmap), _rowset_ids(rowset_ids), _cur_max_version(cur_max_version) { - _mem_tracker_hook->enable_reset_zero(); - SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD); - _mem_tracker_manual = std::make_unique<MemTracker>( - fmt::format("MemTableManual:tabletId={}", std::to_string(tablet_id()))); - _buffer_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get()); - _table_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get()); + _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( + fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id()))); + _buffer_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get()); + _table_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get()); if (support_vec) { _skip_list = nullptr; _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema); @@ -153,12 +151,14 @@ MemTable::~MemTable() { } } std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>()); - _mem_tracker_manual->release(_mem_usage); + _insert_mem_tracker->release(_mem_usage); _buffer_mem_pool->free_all(); _table_mem_pool->free_all(); - DCHECK_EQ(_mem_tracker_manual->consumption(), 0) + _flush_mem_tracker->set_consumption(0); + DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl - << MemTracker::log_usage(_mem_tracker_manual->make_snapshot(0)); + << MemTracker::log_usage(_insert_mem_tracker->make_snapshot(0)); + DCHECK_EQ(_flush_mem_tracker->consumption(), 0); } MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {} @@ -176,7 +176,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, } void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) { - SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD); + SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); auto target_block = input_block->copy_block(_column_offset); if (_is_first_insertion) { _is_first_insertion = false; @@ -193,8 +193,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows); size_t input_size = target_block.allocated_bytes() * num_rows / target_block.rows(); _mem_usage += input_size; - _mem_tracker_manual->consume(input_size); - + _insert_mem_tracker->consume(input_size); for (int i = 0; i < num_rows; i++) { _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); _insert_one_row_from_block(_row_in_blocks.back()); @@ -374,7 +373,8 @@ void MemTable::_collect_vskiplist_results() { if constexpr (!is_final) { // if is not final, we collect the agg results to input_block and then continue to insert size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); - _mem_tracker_manual->consume(shrunked_after_agg - _mem_usage); + // flush will not run here, so will not duplicate `_flush_mem_tracker` + _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); _mem_usage = shrunked_after_agg; _input_mutable_block.swap(_output_mutable_block); //TODO(weixang):opt here. @@ -392,7 +392,7 @@ void MemTable::_collect_vskiplist_results() { } void MemTable::shrink_memtable_by_agg() { - SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD); + SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); if (keys_type() == KeysType::DUP_KEYS) { return; } @@ -434,7 +434,7 @@ Status MemTable::_generate_delete_bitmap() { } Status MemTable::flush() { - SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD); + SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker.get()); VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id() << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 9d837b5560..52589f35cc 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -45,18 +45,17 @@ public: const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version, - const std::shared_ptr<MemTrackerLimiter>& tracker, bool support_vec = false); + const std::shared_ptr<MemTracker>& insert_mem_tracker, + const std::shared_ptr<MemTracker>& flush_mem_tracker, bool support_vec = false); ~MemTable(); int64_t tablet_id() const { return _tablet->tablet_id(); } KeysType keys_type() const { return _tablet->keys_type(); } - std::shared_ptr<MemTrackerLimiter> mem_tracker_hook() const { return _mem_tracker_hook; } - size_t memory_usage() const { return _mem_tracker_manual->consumption(); } - - inline void insert(const Tuple* tuple) { - SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD); - (this->*_insert_fn)(tuple); + size_t memory_usage() const { + return _insert_mem_tracker->consumption() + _flush_mem_tracker->consumption(); } + + inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); } // insert tuple from (row_pos) to (row_pos+num_rows) void insert(const vectorized::Block* block, const std::vector<int>& row_idxs); @@ -161,8 +160,16 @@ private: std::shared_ptr<RowInBlockComparator> _vec_row_comparator; - std::unique_ptr<MemTracker> _mem_tracker_manual; - std::shared_ptr<MemTrackerLimiter> _mem_tracker_hook; + // `_insert_manual_mem_tracker` manually records the memory value of memtable insert() + // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook. + // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable + // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit. + std::shared_ptr<MemTracker> _insert_mem_tracker; + std::shared_ptr<MemTracker> _flush_mem_tracker; + // It is only used for verification when the value of `_insert_manual_mem_tracker` is suspected to be wrong. + // The memory value automatically tracked by the mem hook is 20% less than the manually recorded + // value in the memtable, because some freed memory is not allocated in the DeltaWriter. + std::unique_ptr<MemTracker> _insert_mem_tracker_use_hook; // This is a buffer, to hold the memory referenced by the rows that have not // been inserted into the SkipList std::unique_ptr<MemPool> _buffer_mem_pool; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 7cc81b782e..7580b1aed3 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -722,7 +722,6 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_ TSchemaHash schema_hash, const string& meta_binary, bool update_meta, bool force, bool restore, bool check_path) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); TabletMetaSharedPtr tablet_meta(new TabletMeta()); Status status = tablet_meta->deserialize(meta_binary); if (!status.ok()) { @@ -805,7 +804,6 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path, bool force, bool restore) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); LOG(INFO) << "begin to load tablet from dir. " << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index c989f11336..ba15c6c7b4 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -25,11 +25,11 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker, +LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_vec) : _load_id(load_id), - _mem_tracker(mem_tracker), + _mem_tracker(std::move(mem_tracker)), _timeout_s(timeout_s), _is_high_priority(is_high_priority), _sender_ip(sender_ip), @@ -38,7 +38,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimi // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. _last_updated_time.store(time(nullptr)); - _mem_tracker->enable_reset_zero(); } LoadChannel::~LoadChannel() { @@ -59,8 +58,11 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _mem_tracker, _is_high_priority, _is_vec)); - _tablets_channels.insert({index_id, channel}); + channel.reset(new TabletsChannel(key, _load_id, _is_high_priority, _is_vec)); + { + std::lock_guard<SpinLock> l(_tablets_channels_lock); + _tablets_channels.insert({index_id, channel}); + } } } diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index a75639e433..5af420a970 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -39,9 +39,8 @@ class Cache; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker, - int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, - bool is_vec); + LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, int64_t timeout_s, + bool is_high_priority, const std::string& sender_ip, bool is_vec); ~LoadChannel(); // open a new load channel if not exist @@ -67,7 +66,17 @@ public: template <typename TabletWriterAddResult> Status handle_mem_exceed_limit(TabletWriterAddResult* response); - int64_t mem_consumption() const { return _mem_tracker->consumption(); } + int64_t mem_consumption() { + int64_t mem_usage = 0; + { + std::lock_guard<SpinLock> l(_tablets_channels_lock); + for (auto& it : _tablets_channels) { + mem_usage += it.second->mem_consumption(); + } + } + _mem_tracker->set_consumption(mem_usage); + return mem_usage; + } int64_t timeout() const { return _timeout_s; } @@ -89,7 +98,10 @@ protected: request.write_single_replica())); if (finished) { std::lock_guard<std::mutex> l(_lock); - _tablets_channels.erase(index_id); + { + std::lock_guard<SpinLock> l(_tablets_channels_lock); + _tablets_channels.erase(index_id); + } _finished_channel_ids.emplace(index_id); } return Status::OK(); @@ -102,12 +114,13 @@ private: UniqueId _load_id; // Tracks the total memory consumed by current load job on this BE - std::shared_ptr<MemTrackerLimiter> _mem_tracker; + std::unique_ptr<MemTracker> _mem_tracker; // lock protect the tablets channel map std::mutex _lock; // index id -> tablets channel std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> _tablets_channels; + SpinLock _tablets_channels_lock; // This is to save finished channels id, to handle the retry request. std::unordered_set<int64_t> _finished_channel_ids; // set to true if at least one tablets channel has been opened @@ -163,7 +176,7 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest& request, return st; } -inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) { +inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) { os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time()) << ", is high priority: " << load_channel.is_high_priority() << ")"; @@ -176,7 +189,7 @@ Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) { std::shared_ptr<TabletsChannel> channel; { // lock so that only one thread can check mem limit - std::lock_guard<std::mutex> l(_lock); + std::lock_guard<SpinLock> l(_tablets_channels_lock); found = _find_largest_consumption_channel(&channel); } // Release lock so that other threads can still call add_batch concurrently. diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index e6f908f69c..8292b25656 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -68,11 +68,9 @@ LoadChannelMgr::~LoadChannelMgr() { } Status LoadChannelMgr::init(int64_t process_mem_limit) { - int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit); - _load_soft_mem_limit = load_mgr_mem_limit * config::load_process_soft_mem_limit_percent / 100; - _process_soft_mem_limit = - ExecEnv::GetInstance()->process_mem_tracker()->limit() * config::soft_mem_limit_frac; - _mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit, "LoadChannelMgr"); + _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); + _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; + _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr"); REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024); @@ -96,13 +94,10 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); // Use the same mem limit as LoadChannelMgr for a single load channel - auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>( - _mem_tracker->limit(), - fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(), - load_id.to_string()), - _mem_tracker); - channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s, - is_high_priority, params.sender_ip(), + auto channel_mem_tracker = std::make_unique<MemTracker>(fmt::format( + "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string())); + channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker), + channel_timeout_s, is_high_priority, params.sender_ip(), params.is_vectorized())); _load_channels.insert({load_id, channel}); } @@ -200,7 +195,7 @@ Status LoadChannelMgr::_start_load_channels_clean() { // this log print every 1 min, so that we could observe the mem consumption of load process // on this Backend - LOG(INFO) << "load mem consumption(bytes). limit: " << _mem_tracker->limit() + LOG(INFO) << "load mem consumption(bytes). limit: " << _load_hard_mem_limit << ", current: " << _mem_tracker->consumption() << ", peak: " << _mem_tracker->peak_consumption(); diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 686322b076..4fd113cc77 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -28,6 +28,7 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "gutil/ref_counted.h" +#include "gutil/walltime.h" #include "olap/lru_cache.h" #include "runtime/load_channel.h" #include "runtime/tablets_channel.h" @@ -58,6 +59,14 @@ public: // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); + void refresh_mem_tracker() { + int64_t mem_usage = 0; + for (auto& kv : _load_channels) { + mem_usage += kv.second->mem_consumption(); + } + _mem_tracker->set_consumption(mem_usage); + } + private: template <typename Request> Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, @@ -80,9 +89,9 @@ protected: Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend - std::shared_ptr<MemTrackerLimiter> _mem_tracker; + std::unique_ptr<MemTracker> _mem_tracker; + int64_t _load_hard_mem_limit = -1; int64_t _load_soft_mem_limit = -1; - int64_t _process_soft_mem_limit = -1; // If hard limit reached, one thread will trigger load channel flush, // other threads should wait on the condition variable. @@ -153,9 +162,9 @@ template <typename TabletWriterAddResult> Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - DCHECK(_process_soft_mem_limit > 0); + int64_t process_mem_limit = MemInfo::mem_limit() * config::soft_mem_limit_frac; if (_mem_tracker->consumption() < _load_soft_mem_limit && - MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) { + MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { return Status::OK(); } // Pick load channel to reduce memory. @@ -163,14 +172,15 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) { std::unique_lock<std::mutex> l(_lock); while (_should_wait_flush) { - LOG(INFO) << "Reached the load hard limit " << _mem_tracker->limit() + LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit << ", waiting for flush"; _wait_flush_cond.wait(l); } // Some other thread is flushing data, and not reached hard limit now, // we don't need to handle mem limit in current thread. - if (_reduce_memory_channel != nullptr && !_mem_tracker->limit_exceeded() && - MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) { + if (_reduce_memory_channel != nullptr && + _mem_tracker->consumption() < _load_hard_mem_limit && + MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { return Status::OK(); } @@ -199,13 +209,13 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) _reduce_memory_channel = channel; std::ostringstream oss; - if (MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) { + if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { oss << "reducing memory of " << *channel << " because total load mem consumption " << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES) << " has exceeded"; - if (_mem_tracker->limit_exceeded()) { + if (_mem_tracker->consumption() > _load_hard_mem_limit) { _should_wait_flush = true; - oss << " hard limit: " << PrettyPrinter::print(_mem_tracker->limit(), TUnit::BYTES); + oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES); } else { oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES); } @@ -213,7 +223,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) _should_wait_flush = true; oss << "reducing memory of " << *channel << " because process memory used " << PerfCounters::get_vm_rss_str() << " has exceeded limit " - << PrettyPrinter::print(_process_soft_mem_limit, TUnit::BYTES) + << PrettyPrinter::print(process_mem_limit, TUnit::BYTES) << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str(); } LOG(INFO) << oss.str(); diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index c939d9a62d..01a0d58cdb 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -72,6 +72,7 @@ public: void release(int64_t bytes) { consume(-bytes); } // Transfer 'bytes' of consumption from this tracker to 'dst'. void transfer_to(MemTracker* dst, int64_t bytes); + void set_consumption(int64_t bytes) { _consumption->set(bytes); } public: bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index f6440c0ace..6bc9449c20 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -82,6 +82,7 @@ public: return false; } + void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 58d98278c6..28539703b4 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -108,7 +108,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { MemTracker::print_bytes(it->second->consumption()), MemTracker::print_bytes(it->second->peak_consumption())); expired_task_ids.emplace_back(it->first); - } else if (config::memory_verbose_track) { + } else if (config::memory_debug) { it->second->print_log_usage("query routine"); it->second->enable_print_log_usage(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 5289db99e3..4661fbea5d 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -170,7 +170,6 @@ private: inline void ThreadMemTrackerMgr::init() { DCHECK(_limiter_tracker_stack.size() == 0); DCHECK(_limiter_tracker_raw == nullptr); - DCHECK(_consumer_tracker_stack.empty()); init_impl(); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 9bc95e2de1..8087d6efc5 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -160,7 +160,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag const TQueryOptions& query_options) { _query_id = query_id; _fragment_instance_id = fragment_instance_id; - _mem_tracker = std::make_unique<MemTracker>("RuntimeFilterMergeControllerEntity"); + _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { int filter_id = filterid_to_desc.first; @@ -181,7 +181,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag // merge data Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data) { - // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); std::shared_ptr<RuntimeFilterCntlVal> cntVal; int merged_size = 0; { diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index e090be6f4f..63960f0182 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -147,7 +147,7 @@ private: UniqueId _fragment_instance_id; // protect _filter_map std::mutex _filter_map_mutex; - std::unique_ptr<MemTracker> _mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; // TODO: convert filter id to i32 // filter-id -> val std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 9023c100f1..584e101199 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -34,16 +34,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count; -TabletsChannel::TabletsChannel(const TabletsChannelKey& key, - const std::shared_ptr<MemTrackerLimiter>& parent_tracker, +TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, bool is_vec) : _key(key), _state(kInitialized), + _load_id(load_id), _closed_senders(64), _is_high_priority(is_high_priority), _is_vec(is_vec) { - _mem_tracker = std::make_shared<MemTrackerLimiter>( - -1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker); static std::once_flag once_flag; std::call_once(once_flag, [] { REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); }); @@ -194,6 +192,17 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, } } +int64_t TabletsChannel::mem_consumption() { + int64_t mem_usage = 0; + { + std::lock_guard<SpinLock> l(_tablet_writers_lock); + for (auto& it : _tablet_writers) { + mem_usage += it.second->mem_consumption(); + } + } + return mem_usage; +} + template <typename TabletWriterAddResult> Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) { if (_try_to_wait_flushing()) { @@ -243,7 +252,7 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) { // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered, // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes. - int64_t mem_to_flushed = _mem_tracker->consumption() / 3; + int64_t mem_to_flushed = mem_consumption() / 3; if (total_memtable_consumption_in_flush < mem_to_flushed) { mem_to_flushed -= total_memtable_consumption_in_flush; int counter = 0; @@ -349,7 +358,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request wrequest.ptable_schema_param = request.schema(); DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec); + auto st = DeltaWriter::open(&wrequest, &writer, _load_id, _is_vec); if (!st.ok()) { std::stringstream ss; ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() @@ -358,7 +367,10 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } - _tablet_writers.emplace(tablet.tablet_id(), writer); + { + std::lock_guard<SpinLock> l(_tablet_writers_lock); + _tablet_writers.emplace(tablet.tablet_id(), writer); + } } _s_tablet_writer_count += _tablet_writers.size(); DCHECK_EQ(_tablet_writers.size(), request.tablets_size()); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index cf7e067b32..bfdbf0e8f7 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -61,8 +61,7 @@ class LoadChannel; // Write channel for a particular (load, index). class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, - const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_high_priority, + TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, bool is_vec); ~TabletsChannel(); @@ -96,7 +95,7 @@ public: template <typename TabletWriterAddResult> Status reduce_mem_usage(TabletWriterAddResult* response); - int64_t mem_consumption() const { return _mem_tracker->consumption(); } + int64_t mem_consumption(); private: template <typename Request> @@ -119,6 +118,8 @@ private: // make execute sequence std::mutex _lock; + SpinLock _tablet_writers_lock; + enum State { kInitialized, kOpened, @@ -126,6 +127,8 @@ private: }; State _state; + UniqueId _load_id; + // initialized in open function int64_t _txn_id = -1; int64_t _index_id = -1; @@ -160,8 +163,6 @@ private: static std::atomic<uint64_t> _s_tablet_writer_count; - std::shared_ptr<MemTrackerLimiter> _mem_tracker; - bool _is_high_priority = false; bool _is_vec = false; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 1e6be4a0d9..babee85c3e 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -76,18 +76,20 @@ SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { - if (config::memory_verbose_track) { - thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); - } + thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); +} + +AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer( + const std::shared_ptr<MemTracker>& mem_tracker) + : _mem_tracker(mem_tracker) { + thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); } AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { - if (config::memory_verbose_track) { #ifndef NDEBUG - DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1); + DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1); #endif // NDEBUG - thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker(); - } + thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker(); } } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 861b3e0567..7d213bfa4d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -39,6 +39,7 @@ // Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. // Usage example: std::unique_ptr<MemTracker> tracker = std::make_unique<MemTracker>("first_tracker"); // { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); xxx; xxx; } +// Usually used to record query more detailed memory, including ExecNode operators. #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) #else @@ -300,9 +301,16 @@ public: class AddThreadMemTrackerConsumer { public: + // The owner and user of MemTracker are in the same thread, and the raw pointer is faster. explicit AddThreadMemTrackerConsumer(MemTracker* mem_tracker); + // The owner and user of MemTracker are in different threads. + explicit AddThreadMemTrackerConsumer(const std::shared_ptr<MemTracker>& mem_tracker); + ~AddThreadMemTrackerConsumer(); + +private: + std::shared_ptr<MemTracker> _mem_tracker = nullptr; // Avoid mem_tracker being released midway. }; class StopCheckThreadMemTrackerLimit { diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index a48d3adb3e..b6a7e3a118 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -52,6 +52,7 @@ #include "olap/storage_engine.h" #include "runtime/exec_env.h" #include "runtime/heartbeat_flags.h" +#include "runtime/load_channel_mgr.h" #include "runtime/memory/mem_tracker_task_pool.h" #include "service/backend_options.h" #include "service/backend_service.h" @@ -510,6 +511,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consume( allocator_cache_mem_diff); CONSUME_THREAD_MEM_TRACKER(allocator_cache_mem_diff); + doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker(); // 1s clear the expired task mem tracker, a query mem tracker is about 57 bytes. // this will cause coredump for ASAN build when running regression test, @@ -518,7 +520,7 @@ int main(int argc, char** argv) { // The process tracker print log usage interval is 1s to avoid a large number of tasks being // canceled when the process exceeds the mem limit, resulting in too many duplicate logs. doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage(); - if (doris::config::memory_verbose_track) { + if (doris::config::memory_debug) { doris::ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage("main routine"); doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage(); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 91b5784da3..eb419eb52f 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1268,7 +1268,7 @@ Status HashJoinNode::open(RuntimeState* state) { void HashJoinNode::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); status->set_value(child(0)->open(state)); } diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp index 809da414f4..f4b9258c92 100644 --- a/be/src/vec/exec/vblocking_join_node.cpp +++ b/be/src/vec/exec/vblocking_join_node.cpp @@ -75,7 +75,7 @@ Status VBlockingJoinNode::close(RuntimeState* state) { void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); status->set_value(construct_build_side(state)); // Release the thread token as soon as possible (before the main thread joins // on it). This way, if we had a chain of 10 joins using 1 additional thread, diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index 4e1177f4bd..f164b5a93d 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -273,7 +273,7 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner void VBrokerScanNode::scanner_worker(int start_idx, int length) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker"); SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Thread::set_self_name("vbroker_scanner"); Status status = Status::OK(); ScannerCounter counter; diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp index 9f308acc6b..d0704a2477 100644 --- a/be/src/vec/exec/ves_http_scan_node.cpp +++ b/be/src/vec/exec/ves_http_scan_node.cpp @@ -385,7 +385,7 @@ void VEsHttpScanNode::debug_string(int ident_level, std::stringstream* out) cons void VEsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VEsHttpScanNode::scanner_worker"); SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; DCHECK(start_idx < length); diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 2fbd9bb99d..19ab63ae37 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -279,7 +279,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread"); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); Status status = Status::OK(); if (_vconjunct_ctx_ptr) { diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index cdc258fe99..dfb6811cbf 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -108,9 +108,9 @@ Status VSchemaScanNode::open(RuntimeState* state) { } SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (_scanner_param.user) { TSetSessionParams param; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 3c1e7f9924..ea2799fdea 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -398,7 +398,7 @@ TEST_F(TestDeltaWriter, open) { SAFE_DELETE(delta_writer); // test vec delta writer - DeltaWriter::open(&write_req, &delta_writer, nullptr, true); + DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true); EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); @@ -551,7 +551,7 @@ TEST_F(TestDeltaWriter, vec_write) { WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002, 30002, load_id, tuple_desc, &(tuple_desc->slots())}; DeltaWriter* delta_writer = nullptr; - DeltaWriter::open(&write_req, &delta_writer, nullptr, true); + DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true); ASSERT_NE(delta_writer, nullptr); MemPool pool; @@ -764,7 +764,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots())}; DeltaWriter* delta_writer = nullptr; - DeltaWriter::open(&write_req, &delta_writer, nullptr, true); + DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true); ASSERT_NE(delta_writer, nullptr); MemPool pool; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org