This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 32a029d9dc [enhancement](memtracker) Refactor load channel + memtable mem tracker (#13795)
32a029d9dc is described below
commit 32a029d9dcd18828ecb9f7da261abe8de10a7bae
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Nov 3 09:47:12 2022 +0800
[enhancement](memtracker) Refactor load channel + memtable mem tracker (#13795)
---
be/src/common/config.h | 12 +-----
be/src/exec/blocking_join_node.cpp | 2 +-
be/src/exec/broker_scan_node.cpp | 2 +-
be/src/exec/es_http_scan_node.cpp | 2 +-
be/src/exec/except_node.cpp | 2 +-
be/src/exec/exec_node.cpp | 2 +-
be/src/exec/exec_node.h | 3 +-
be/src/exec/hash_join_node.cpp | 2 +-
be/src/exec/intersect_node.cpp | 2 +-
be/src/exec/olap_scan_node.cpp | 9 ++---
be/src/exec/olap_scan_node.h | 2 +-
be/src/exec/olap_scanner.cpp | 8 ++--
be/src/exec/olap_scanner.h | 5 ++-
be/src/olap/delta_writer.cpp | 53 +++++++++++++++----------
be/src/olap/delta_writer.h | 24 +++++------
be/src/olap/memtable.cpp | 38 +++++++++---------
be/src/olap/memtable.h | 25 +++++++-----
be/src/olap/tablet_manager.cpp | 2 -
be/src/runtime/load_channel.cpp | 12 +++---
be/src/runtime/load_channel.h | 29 ++++++++++----
be/src/runtime/load_channel_mgr.cpp | 21 ++++------
be/src/runtime/load_channel_mgr.h | 32 ++++++++++-----
be/src/runtime/memory/mem_tracker.h | 1 +
be/src/runtime/memory/mem_tracker_limiter.h | 1 +
be/src/runtime/memory/mem_tracker_task_pool.cpp | 2 +-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 1 -
be/src/runtime/runtime_filter_mgr.cpp | 4 +-
be/src/runtime/runtime_filter_mgr.h | 2 +-
be/src/runtime/tablets_channel.cpp | 26 ++++++++----
be/src/runtime/tablets_channel.h | 11 ++---
be/src/runtime/thread_context.cpp | 16 ++++----
be/src/runtime/thread_context.h | 8 ++++
be/src/service/doris_main.cpp | 4 +-
be/src/vec/exec/join/vhash_join_node.cpp | 2 +-
be/src/vec/exec/vblocking_join_node.cpp | 2 +-
be/src/vec/exec/vbroker_scan_node.cpp | 2 +-
be/src/vec/exec/ves_http_scan_node.cpp | 2 +-
be/src/vec/exec/volap_scan_node.cpp | 2 +-
be/src/vec/exec/vschema_scan_node.cpp | 2 +-
be/test/olap/delta_writer_test.cpp | 6 +--
40 files changed, 215 insertions(+), 168 deletions(-)
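For orientation before the diff: the refactor replaces the MemTrackerLimiter parent/child chain on the load path with plain MemTrackers. Each MemTable now owns an insert tracker (consumed manually on the write path) and a flush tracker (consumed by the mem hook during flush); DeltaWriter, TabletsChannel and LoadChannel sum their children on demand, and the channel/manager-level trackers are refreshed with set_consumption(). A minimal self-contained sketch of that bottom-up aggregation, using stand-in Tracker/FakeMemTable types rather than the Doris classes:

    // Stand-in types for illustration only; `Tracker` and `FakeMemTable` are not
    // the Doris MemTracker/MemTable classes.
    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct Tracker {
        int64_t consumption = 0;
        void consume(int64_t bytes) { consumption += bytes; }
        void set_consumption(int64_t bytes) { consumption = bytes; }
    };

    // One insert tracker + one flush tracker per memtable, as in the patch.
    struct FakeMemTable {
        std::shared_ptr<Tracker> insert_tracker = std::make_shared<Tracker>();
        std::shared_ptr<Tracker> flush_tracker = std::make_shared<Tracker>();
        int64_t memory_usage() const {
            return insert_tracker->consumption + flush_tracker->consumption;
        }
    };

    int main() {
        std::vector<FakeMemTable> memtables(2);
        memtables[0].insert_tracker->consume(4096); // write path, tracked manually
        memtables[1].flush_tracker->consume(1024);  // flush path, tracked via hook

        // The manager-level tracker is statistics-only: it is refreshed from the
        // sum of its children rather than consumed on the allocation path.
        Tracker load_channel_mgr_tracker;
        int64_t total = 0;
        for (const auto& mt : memtables) total += mt.memory_usage();
        load_channel_mgr_tracker.set_consumption(total);

        std::cout << load_channel_mgr_tracker.consumption << "\n"; // prints 5120
        return 0;
    }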
diff --git a/be/src/common/config.h b/be/src/common/config.h
index cf04e8d3d3..2d627682bb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -631,16 +631,8 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
// Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker in Hook.
CONF_Bool(enable_tcmalloc_hook, "true");
-// If true, switch TLS MemTracker to count more detailed memory,
-// including caches such as ExecNode operators and TabletManager.
-//
-// At present, there is a performance problem in the frequent switch thread mem tracker.
-// This is because the mem tracker exists as a shared_ptr in the thread local. Each time it is switched,
-// the atomic variable use_count in the shared_ptr of the current tracker will be -1, and the tracker to be
-// replaced use_count +1, multi-threading Frequent changes to the same tracker shared_ptr are slow.
-// TODO: 1. Reduce unnecessary thread mem tracker switches,
-// 2. Consider using raw pointers for mem tracker in thread local
-CONF_Bool(memory_verbose_track, "false");
+// Print more detailed logs, more detailed records, etc.
+CONF_Bool(memory_debug, "false");
// The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size
// smaller than this value will continue to accumulate. specified as number of bytes.
diff --git a/be/src/exec/blocking_join_node.cpp
b/be/src/exec/blocking_join_node.cpp
index 5e5aee3714..0d366ab1d4 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -93,7 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) {
void BlockingJoinNode::build_side_thread(RuntimeState* state,
std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(construct_build_side(state));
}
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 9cff2da4ff..a281d29c6e 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -376,7 +376,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange&
scan_range,
void BrokerScanNode::scanner_worker(int start_idx, int length) {
SCOPED_ATTACH_TASK(_runtime_state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state,
&scanner_expr_ctxs);
diff --git a/be/src/exec/es_http_scan_node.cpp
b/be/src/exec/es_http_scan_node.cpp
index 28205290cc..38de757f87 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -424,7 +424,7 @@ static std::string get_host_port(const
std::vector<TNetworkAddress>& es_hosts) {
void EsHttpScanNode::scanner_worker(int start_idx, int length,
std::promise<Status>& p_status) {
SCOPED_ATTACH_TASK(_runtime_state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
DCHECK(start_idx < length);
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8ad7ce9044..f12f6ca18a 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -39,8 +39,8 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState*
state) {
}
Status ExceptNode::open(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(SetOperationNode::open(state));
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index be94dcbb28..fe0d6d1fe1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second,
_rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
- _mem_tracker = std::make_unique<MemTracker>("ExecNode:" +
_runtime_profile->name(),
+ _mem_tracker = std::make_shared<MemTracker>("ExecNode:" +
_runtime_profile->name(),
_runtime_profile.get());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 20e0785d94..296726b924 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -194,6 +194,7 @@ public:
RuntimeProfile::Counter* memory_used_counter() const { return
_memory_used_counter; }
MemTracker* mem_tracker() const { return _mem_tracker.get(); }
+ std::shared_ptr<MemTracker> mem_tracker_shared() const { return
_mem_tracker; }
OpentelemetrySpan get_next_span() { return _get_next_span; }
@@ -298,7 +299,7 @@ protected:
std::unique_ptr<RuntimeProfile> _runtime_profile;
/// Account for peak memory used by this node
- std::unique_ptr<MemTracker> _mem_tracker;
+ std::shared_ptr<MemTracker> _mem_tracker;
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _rows_returned_rate;
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 7e9a2bf989..592c93ee7e 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -184,7 +184,7 @@ Status HashJoinNode::close(RuntimeState* state) {
void HashJoinNode::probe_side_open_thread(RuntimeState* state,
std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(child(0)->open(state));
}
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 9bf02cedd4..d04ddc5061 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -43,8 +43,8 @@ Status IntersectNode::init(const TPlanNode& tnode,
RuntimeState* state) {
// 2 probe with child(1), then filter the hash table and find the matched
item, use them to rebuild a hash table
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(SetOperationNode::open(state));
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index ef4f2236c5..1dcd5c3754 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -191,7 +191,7 @@ Status OlapScanNode::prepare(RuntimeState* state) {
_init_counter(state);
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
- _scanner_mem_tracker = std::make_unique<MemTracker>("Scanners");
+ _scanner_mem_tracker = std::make_shared<MemTracker>("Scanners");
if (_tuple_desc == nullptr) {
// TODO: make sure we print all available diagnostic output to our
error log
@@ -941,7 +941,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state)
{
}
OlapScanner* scanner =
new OlapScanner(state, this,
_olap_scan_node.is_preaggregation,
- _need_agg_finalize, *scan_range,
_scanner_mem_tracker.get());
+ _need_agg_finalize, *scan_range,
_scanner_mem_tracker);
scanner->set_batch_size(_batch_size);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare
failed.
@@ -1483,7 +1483,7 @@ Status
OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
void OlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
Status status = Status::OK();
for (auto scanner : _olap_scanners) {
status = Expr::clone_if_not_exists(_conjunct_ctxs, state,
scanner->conjunct_ctxs());
@@ -1660,8 +1660,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}
void OlapScanNode::scanner_thread(OlapScanner* scanner) {
- // SCOPED_ATTACH_TASK(_runtime_state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
Thread::set_self_name("olap_scanner");
if (UNLIKELY(_transfer_done)) {
_scanner_done = true;
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index f5d6e38cfd..47cf6cd01c 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -264,7 +264,7 @@ protected:
int64_t _buffered_bytes;
// Count the memory consumption of Rowset Reader and Tablet Reader in
OlapScanner.
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
+ std::shared_ptr<MemTracker> _scanner_mem_tracker;
EvalConjunctsFn _eval_conjuncts_fn;
// the max num of scan keys of this scan request.
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 6f348d02a1..e90bc2124c 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -43,7 +43,7 @@ namespace doris {
OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent,
bool aggregation,
bool need_agg_finalize, const TPaloScanRange&
scan_range,
- MemTracker* tracker)
+ const std::shared_ptr<MemTracker>& tracker)
: _runtime_state(runtime_state),
_parent(parent),
_tuple_desc(parent->_tuple_desc),
@@ -51,8 +51,8 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state,
OlapScanNode* parent, bool
_is_open(false),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
- _version(-1) {
- _mem_tracker = tracker;
+ _version(-1),
+ _mem_tracker(tracker) {
_tablet_schema = std::make_shared<TabletSchema>();
}
@@ -326,7 +326,7 @@ Status OlapScanner::get_batch(RuntimeState* state,
RowBatch* batch, bool* eof) {
bzero(tuple_buf, _batch_size * _tuple_desc->byte_size());
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);
- std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker));
+ std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
int64_t raw_rows_threshold = raw_rows_read() +
config::doris_scanner_row_num;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
{
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 134a515219..961f80c9d0 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -42,7 +42,8 @@ class OlapScanNode;
class OlapScanner {
public:
OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
aggregation,
- bool need_agg_finalize, const TPaloScanRange& scan_range,
MemTracker* tracker);
+ bool need_agg_finalize, const TPaloScanRange& scan_range,
+ const std::shared_ptr<MemTracker>& tracker);
virtual ~OlapScanner() = default;
@@ -151,7 +152,7 @@ protected:
MonotonicStopWatch _watcher;
- MemTracker* _mem_tracker;
+ std::shared_ptr<MemTracker> _mem_tracker;
TabletSchemaSPtr _tablet_schema;
};
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c8a36fbec7..39185cdbac 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -33,14 +33,14 @@
namespace doris {
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
- const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) {
- *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec);
+Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, const UniqueId& load_id,
+ bool is_vec) {
+ *writer = new DeltaWriter(req, StorageEngine::instance(), load_id, is_vec);
return Status::OK();
}
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
- const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec)
+DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id,
+ bool is_vec)
: _req(*req),
_tablet(nullptr),
_cur_rowset(nullptr),
@@ -48,7 +48,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
_tablet_schema(new TabletSchema),
_delta_written_success(false),
_storage_engine(storage_engine),
- _parent_tracker(parent_tracker),
+ _load_id(load_id),
_is_vec(is_vec) {}
DeltaWriter::~DeltaWriter() {
@@ -109,8 +109,6 @@ Status DeltaWriter::init() {
_rowset_ids = _tablet->all_rs_id(_cur_max_version);
}
- _mem_tracker = std::make_shared<MemTrackerLimiter>(
- -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()),
_parent_tracker);
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
//trigger quick compaction
@@ -164,8 +162,6 @@ Status DeltaWriter::write(Tuple* tuple) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
- SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
-
_mem_table->insert(tuple);
// if memtable is full, push it to the flush executor,
@@ -281,9 +277,21 @@ void DeltaWriter::_reset_mem_table() {
if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) {
_delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id()));
}
+ auto mem_table_insert_tracker = std::make_shared<MemTracker>(
+ fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
+ std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()));
+ auto mem_table_flush_tracker = std::make_shared<MemTracker>(
+ fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
+ std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()));
+ {
+ std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+ _mem_table_tracker.push_back(mem_table_insert_tracker);
+ _mem_table_tracker.push_back(mem_table_flush_tracker);
+ }
_mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
_req.tuple_desc, _rowset_writer.get(), _delete_bitmap,
- _rowset_ids, _cur_max_version, _mem_tracker, _is_vec));
+ _rowset_ids, _cur_max_version, mem_table_insert_tracker,
+ mem_table_flush_tracker, _is_vec));
}
Status DeltaWriter::close() {
@@ -384,8 +392,9 @@ Status DeltaWriter::cancel() {
}
void DeltaWriter::save_mem_consumption_snapshot() {
+ std::lock_guard<std::mutex> l(_lock);
_mem_consumption_snapshot = mem_consumption();
- _memtable_consumption_snapshot = memtable_consumption();
+ _memtable_consumption_snapshot = _mem_table->memory_usage();
}
int64_t DeltaWriter::get_memtable_consumption_inflush() const {
@@ -397,20 +406,20 @@ int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
return _memtable_consumption_snapshot;
}
-int64_t DeltaWriter::mem_consumption() const {
- if (_mem_tracker == nullptr) {
+int64_t DeltaWriter::mem_consumption() {
+ if (_flush_token == nullptr) {
// This method may be called before this writer is initialized.
- // So _mem_tracker may be null.
+ // So _flush_token may be null.
return 0;
}
- return _mem_tracker->consumption();
-}
-
-int64_t DeltaWriter::memtable_consumption() const {
- if (_mem_table == nullptr) {
- return 0;
+ int64_t mem_usage = 0;
+ {
+ std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+ for (auto mem_table_tracker : _mem_table_tracker) {
+ mem_usage += mem_table_tracker->consumption();
+ }
}
- return _mem_table->mem_tracker_hook()->consumption();
+ return mem_usage;
}
int64_t DeltaWriter::partition_id() const {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index a7770326fd..85133e456c 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -20,6 +20,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/tablet.h"
+#include "util/spinlock.h"
namespace doris {
@@ -56,9 +57,7 @@ struct WriteRequest {
class DeltaWriter {
public:
static Status open(WriteRequest* req, DeltaWriter** writer,
- const std::shared_ptr<MemTrackerLimiter>& parent_tracker =
- std::shared_ptr<MemTrackerLimiter>(),
- bool is_vec = false);
+ const UniqueId& load_id = TUniqueId(), bool is_vec = false);
~DeltaWriter();
@@ -93,7 +92,7 @@ public:
int64_t partition_id() const;
- int64_t mem_consumption() const;
+ int64_t mem_consumption();
// Wait all memtable in flush queue to be flushed
Status wait_flush();
@@ -102,8 +101,6 @@ public:
int32_t schema_hash() { return _tablet->schema_hash(); }
- int64_t memtable_consumption() const;
-
void save_mem_consumption_snapshot();
int64_t get_memtable_consumption_inflush() const;
@@ -113,8 +110,8 @@ public:
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
private:
- DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
- const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool
is_vec);
+ DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const
UniqueId& load_id,
+ bool is_vec);
// push a full memtable to flush executor
Status _flush_memtable_async();
@@ -146,14 +143,11 @@ private:
bool _delta_written_success;
StorageEngine* _storage_engine;
+ UniqueId _load_id;
std::unique_ptr<FlushToken> _flush_token;
- // The memory value automatically tracked by the Tcmalloc hook is 20% less than the manually recorded
- // value in the memtable, because some freed memory is not allocated in the DeltaWriter.
- // The memory value automatically tracked by the Tcmalloc hook, used for load channel mgr to trigger
- // flush memtable when the sum of all channel memory exceeds the limit.
- // The manually recorded value of memtable is used to flush when it is larger than write_buffer_size.
- std::shared_ptr<MemTrackerLimiter> _mem_tracker;
- std::shared_ptr<MemTrackerLimiter> _parent_tracker;
+ std::vector<std::shared_ptr<MemTracker>> _mem_table_tracker;
+ SpinLock _mem_table_tracker_lock;
+ std::atomic<uint32_t> _mem_table_num = 1;
// The counter of number of segment flushed already.
int64_t _segment_counter = 0;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5eed903d29..f61e945bda 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -35,14 +35,14 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version,
- const std::shared_ptr<MemTrackerLimiter>& tracker, bool support_vec)
+ const std::shared_ptr<MemTracker>& insert_mem_tracker,
+ const std::shared_ptr<MemTracker>& flush_mem_tracker, bool support_vec)
: _tablet(std::move(tablet)),
_schema(schema),
_tablet_schema(tablet_schema),
_slot_descs(slot_descs),
- _mem_tracker_hook(std::make_shared<MemTrackerLimiter>(
- -1, fmt::format("MemTableHook:tabletId={}", std::to_string(tablet_id())),
- tracker)),
+ _insert_mem_tracker(insert_mem_tracker),
+ _flush_mem_tracker(flush_mem_tracker),
_schema_size(_schema->schema_size()),
_rowset_writer(rowset_writer),
_is_first_insertion(true),
@@ -53,12 +53,10 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
_delete_bitmap(delete_bitmap),
_rowset_ids(rowset_ids),
_cur_max_version(cur_max_version) {
- _mem_tracker_hook->enable_reset_zero();
- SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
- _mem_tracker_manual = std::make_unique<MemTracker>(
- fmt::format("MemTableManual:tabletId={}", std::to_string(tablet_id())));
- _buffer_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get());
- _table_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get());
+ _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
+ fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())));
+ _buffer_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
+ _table_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
if (support_vec) {
_skip_list = nullptr;
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
@@ -153,12 +151,14 @@ MemTable::~MemTable() {
}
}
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
std::default_delete<RowInBlock>());
- _mem_tracker_manual->release(_mem_usage);
+ _insert_mem_tracker->release(_mem_usage);
_buffer_mem_pool->free_all();
_table_mem_pool->free_all();
- DCHECK_EQ(_mem_tracker_manual->consumption(), 0)
+ _flush_mem_tracker->set_consumption(0);
+ DCHECK_EQ(_insert_mem_tracker->consumption(), 0)
<< std::endl
- << MemTracker::log_usage(_mem_tracker_manual->make_snapshot(0));
+ << MemTracker::log_usage(_insert_mem_tracker->make_snapshot(0));
+ DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
}
MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) :
_schema(schema) {}
@@ -176,7 +176,7 @@ int MemTable::RowInBlockComparator::operator()(const
RowInBlock* left,
}
void MemTable::insert(const vectorized::Block* input_block, const
std::vector<int>& row_idxs) {
- SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+ SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
auto target_block = input_block->copy_block(_column_offset);
if (_is_first_insertion) {
_is_first_insertion = false;
@@ -193,8 +193,7 @@ void MemTable::insert(const vectorized::Block* input_block,
const std::vector<in
_input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows);
size_t input_size = target_block.allocated_bytes() * num_rows /
target_block.rows();
_mem_usage += input_size;
- _mem_tracker_manual->consume(input_size);
-
+ _insert_mem_tracker->consume(input_size);
for (int i = 0; i < num_rows; i++) {
_row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock +
i});
_insert_one_row_from_block(_row_in_blocks.back());
@@ -374,7 +373,8 @@ void MemTable::_collect_vskiplist_results() {
if constexpr (!is_final) {
// if is not final, we collect the agg results to input_block and
then continue to insert
size_t shrunked_after_agg =
_output_mutable_block.allocated_bytes();
- _mem_tracker_manual->consume(shrunked_after_agg - _mem_usage);
+ // flush will not run here, so will not duplicate
`_flush_mem_tracker`
+ _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage);
_mem_usage = shrunked_after_agg;
_input_mutable_block.swap(_output_mutable_block);
//TODO(weixang):opt here.
@@ -392,7 +392,7 @@ void MemTable::_collect_vskiplist_results() {
}
void MemTable::shrink_memtable_by_agg() {
- SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+ SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
if (keys_type() == KeysType::DUP_KEYS) {
return;
}
@@ -434,7 +434,7 @@ Status MemTable::_generate_delete_bitmap() {
}
Status MemTable::flush() {
- SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+ SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker.get());
VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
<< ", memsize: " << memory_usage() << ", rows: " << _rows;
int64_t duration_ns = 0;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9d837b5560..52589f35cc 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -45,18 +45,17 @@ public:
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version,
- const std::shared_ptr<MemTrackerLimiter>& tracker, bool support_vec = false);
+ const std::shared_ptr<MemTracker>& insert_mem_tracker,
+ const std::shared_ptr<MemTracker>& flush_mem_tracker, bool support_vec = false);
~MemTable();
int64_t tablet_id() const { return _tablet->tablet_id(); }
KeysType keys_type() const { return _tablet->keys_type(); }
- std::shared_ptr<MemTrackerLimiter> mem_tracker_hook() const { return _mem_tracker_hook; }
- size_t memory_usage() const { return _mem_tracker_manual->consumption(); }
-
- inline void insert(const Tuple* tuple) {
- SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
- (this->*_insert_fn)(tuple);
+ size_t memory_usage() const {
+ return _insert_mem_tracker->consumption() + _flush_mem_tracker->consumption();
}
+
+ inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
// insert tuple from (row_pos) to (row_pos+num_rows)
void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
@@ -161,8 +160,16 @@ private:
std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
- std::unique_ptr<MemTracker> _mem_tracker_manual;
- std::shared_ptr<MemTrackerLimiter> _mem_tracker_hook;
+ // `_insert_manual_mem_tracker` manually records the memory value of memtable insert()
+ // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook.
+ // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable
+ // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit.
+ std::shared_ptr<MemTracker> _insert_mem_tracker;
+ std::shared_ptr<MemTracker> _flush_mem_tracker;
+ // It is only used for verification when the value of `_insert_manual_mem_tracker` is suspected to be wrong.
+ // The memory value automatically tracked by the mem hook is 20% less than the manually recorded
+ // value in the memtable, because some freed memory is not allocated in the DeltaWriter.
+ std::unique_ptr<MemTracker> _insert_mem_tracker_use_hook;
// This is a buffer, to hold the memory referenced by the rows that have not
// been inserted into the SkipList
std::unique_ptr<MemPool> _buffer_mem_pool;
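The comment block above splits responsibilities between the two per-memtable trackers: the manually consumed insert tracker drives the per-memtable flush once it exceeds write_buffer_size, while the sum of both feeds the load-level limit checks. A compact, self-contained illustration of that split, with stand-in fields and an assumed kWriteBufferSize constant (not the real MemTable/DeltaWriter code):

    #include <cstdint>
    #include <iostream>

    constexpr int64_t kWriteBufferSize = 200 << 20; // assumed flush threshold, illustrative only

    struct TwoTrackerMemTable {
        int64_t insert_tracked = 0; // consumed manually on insert()
        int64_t flush_tracked = 0;  // consumed by the mem hook during flush()

        // Reported upward (DeltaWriter/LoadChannel): both parts count.
        int64_t memory_usage() const { return insert_tracked + flush_tracked; }

        // The per-memtable flush decision looks only at the manually tracked bytes.
        bool need_flush() const { return insert_tracked >= kWriteBufferSize; }
    };

    int main() {
        TwoTrackerMemTable mt;
        mt.insert_tracked = 210LL << 20;
        mt.flush_tracked = 8LL << 20;
        std::cout << mt.memory_usage() << " " << std::boolalpha << mt.need_flush() << "\n";
        return 0;
    }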
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 7cc81b782e..7580b1aed3 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -722,7 +722,6 @@ Status TabletManager::load_tablet_from_meta(DataDir*
data_dir, TTabletId tablet_
TSchemaHash schema_hash, const
string& meta_binary,
bool update_meta, bool force, bool
restore,
bool check_path) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
TabletMetaSharedPtr tablet_meta(new TabletMeta());
Status status = tablet_meta->deserialize(meta_binary);
if (!status.ok()) {
@@ -805,7 +804,6 @@ Status TabletManager::load_tablet_from_meta(DataDir*
data_dir, TTabletId tablet_
Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
SchemaHash schema_hash, const
string& schema_hash_path,
bool force, bool restore) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
LOG(INFO) << "begin to load tablet from dir. "
<< " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
<< " path = " << schema_hash_path << " force = " << force << "
restore = " << restore;
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index c989f11336..ba15c6c7b4 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,11 +25,11 @@
namespace doris {
-LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker,
+LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker,
int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
bool is_vec)
: _load_id(load_id),
- _mem_tracker(mem_tracker),
+ _mem_tracker(std::move(mem_tracker)),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
@@ -38,7 +38,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimi
// _load_channels in load_channel_mgr, or it may be erased
// immediately by gc thread.
_last_updated_time.store(time(nullptr));
- _mem_tracker->enable_reset_zero();
}
LoadChannel::~LoadChannel() {
@@ -59,8 +58,11 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
} else {
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
- channel.reset(new TabletsChannel(key, _mem_tracker, _is_high_priority, _is_vec));
- _tablets_channels.insert({index_id, channel});
+ channel.reset(new TabletsChannel(key, _load_id, _is_high_priority, _is_vec));
+ {
+ std::lock_guard<SpinLock> l(_tablets_channels_lock);
+ _tablets_channels.insert({index_id, channel});
+ }
}
}
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index a75639e433..5af420a970 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,9 +39,8 @@ class Cache;
// corresponding to a certain load job
class LoadChannel {
public:
- LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker,
- int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
- bool is_vec);
+ LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, int64_t timeout_s,
+ bool is_high_priority, const std::string& sender_ip, bool is_vec);
~LoadChannel();
// open a new load channel if not exist
@@ -67,7 +66,17 @@ public:
template <typename TabletWriterAddResult>
Status handle_mem_exceed_limit(TabletWriterAddResult* response);
- int64_t mem_consumption() const { return _mem_tracker->consumption(); }
+ int64_t mem_consumption() {
+ int64_t mem_usage = 0;
+ {
+ std::lock_guard<SpinLock> l(_tablets_channels_lock);
+ for (auto& it : _tablets_channels) {
+ mem_usage += it.second->mem_consumption();
+ }
+ }
+ _mem_tracker->set_consumption(mem_usage);
+ return mem_usage;
+ }
int64_t timeout() const { return _timeout_s; }
@@ -89,7 +98,10 @@ protected:
request.write_single_replica()));
if (finished) {
std::lock_guard<std::mutex> l(_lock);
- _tablets_channels.erase(index_id);
+ {
+ std::lock_guard<SpinLock> l(_tablets_channels_lock);
+ _tablets_channels.erase(index_id);
+ }
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
@@ -102,12 +114,13 @@ private:
UniqueId _load_id;
// Tracks the total memory consumed by current load job on this BE
- std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+ std::unique_ptr<MemTracker> _mem_tracker;
// lock protect the tablets channel map
std::mutex _lock;
// index id -> tablets channel
std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>>
_tablets_channels;
+ SpinLock _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
std::unordered_set<int64_t> _finished_channel_ids;
// set to true if at least one tablets channel has been opened
@@ -163,7 +176,7 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest&
request,
return st;
}
-inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) {
+inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption()
<< ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time())
<< ", is high priority: " << load_channel.is_high_priority() << ")";
@@ -176,7 +189,7 @@ Status
LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
std::shared_ptr<TabletsChannel> channel;
{
// lock so that only one thread can check mem limit
- std::lock_guard<std::mutex> l(_lock);
+ std::lock_guard<SpinLock> l(_tablets_channels_lock);
found = _find_largest_consumption_channel(&channel);
}
// Release lock so that other threads can still call add_batch
concurrently.
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index e6f908f69c..8292b25656 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -68,11 +68,9 @@ LoadChannelMgr::~LoadChannelMgr() {
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
- int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit);
- _load_soft_mem_limit = load_mgr_mem_limit * config::load_process_soft_mem_limit_percent / 100;
- _process_soft_mem_limit =
- ExecEnv::GetInstance()->process_mem_tracker()->limit() * config::soft_mem_limit_frac;
- _mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit, "LoadChannelMgr");
+ _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
+ _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
+ _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
REGISTER_HOOK_METRIC(load_channel_mem_consumption,
[this]() { return _mem_tracker->consumption(); });
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -96,13 +94,10 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority());
// Use the same mem limit as LoadChannelMgr for a single load channel
- auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>(
- _mem_tracker->limit(),
- fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
- load_id.to_string()),
- _mem_tracker);
- channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s,
- is_high_priority, params.sender_ip(),
+ auto channel_mem_tracker = std::make_unique<MemTracker>(fmt::format(
+ "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string()));
+ channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker),
+ channel_timeout_s, is_high_priority, params.sender_ip(),
params.is_vectorized()));
_load_channels.insert({load_id, channel});
}
@@ -200,7 +195,7 @@ Status LoadChannelMgr::_start_load_channels_clean() {
// this log print every 1 min, so that we could observe the mem
consumption of load process
// on this Backend
- LOG(INFO) << "load mem consumption(bytes). limit: " <<
_mem_tracker->limit()
+ LOG(INFO) << "load mem consumption(bytes). limit: " << _load_hard_mem_limit
<< ", current: " << _mem_tracker->consumption()
<< ", peak: " << _mem_tracker->peak_consumption();
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 686322b076..4fd113cc77 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -28,6 +28,7 @@
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/ref_counted.h"
+#include "gutil/walltime.h"
#include "olap/lru_cache.h"
#include "runtime/load_channel.h"
#include "runtime/tablets_channel.h"
@@ -58,6 +59,14 @@ public:
// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);
+ void refresh_mem_tracker() {
+ int64_t mem_usage = 0;
+ for (auto& kv : _load_channels) {
+ mem_usage += kv.second->mem_consumption();
+ }
+ _mem_tracker->set_consumption(mem_usage);
+ }
+
private:
template <typename Request>
Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool&
is_eof,
@@ -80,9 +89,9 @@ protected:
Cache* _last_success_channel = nullptr;
// check the total load channel mem consumption of this Backend
- std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+ std::unique_ptr<MemTracker> _mem_tracker;
+ int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
- int64_t _process_soft_mem_limit = -1;
// If hard limit reached, one thread will trigger load channel flush,
// other threads should wait on the condition variable.
@@ -153,9 +162,9 @@ template <typename TabletWriterAddResult>
Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult*
response) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
- DCHECK(_process_soft_mem_limit > 0);
+ int64_t process_mem_limit = MemInfo::mem_limit() *
config::soft_mem_limit_frac;
if (_mem_tracker->consumption() < _load_soft_mem_limit &&
- MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) {
+ MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return Status::OK();
}
// Pick load channel to reduce memory.
@@ -163,14 +172,15 @@ Status
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
- LOG(INFO) << "Reached the load hard limit " <<
_mem_tracker->limit()
+ LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
<< ", waiting for flush";
_wait_flush_cond.wait(l);
}
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
- if (_reduce_memory_channel != nullptr &&
!_mem_tracker->limit_exceeded() &&
- MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) {
+ if (_reduce_memory_channel != nullptr &&
+ _mem_tracker->consumption() < _load_hard_mem_limit &&
+ MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return Status::OK();
}
@@ -199,13 +209,13 @@ Status
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
_reduce_memory_channel = channel;
std::ostringstream oss;
- if (MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) {
+ if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
oss << "reducing memory of " << *channel << " because total load
mem consumption "
<< PrettyPrinter::print(_mem_tracker->consumption(),
TUnit::BYTES)
<< " has exceeded";
- if (_mem_tracker->limit_exceeded()) {
+ if (_mem_tracker->consumption() > _load_hard_mem_limit) {
_should_wait_flush = true;
- oss << " hard limit: " <<
PrettyPrinter::print(_mem_tracker->limit(), TUnit::BYTES);
+ oss << " hard limit: " <<
PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
} else {
oss << " soft limit: " <<
PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
}
@@ -213,7 +223,7 @@ Status
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
_should_wait_flush = true;
oss << "reducing memory of " << *channel << " because process
memory used "
<< PerfCounters::get_vm_rss_str() << " has exceeded limit "
- << PrettyPrinter::print(_process_soft_mem_limit, TUnit::BYTES)
+ << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
<< " , tc/jemalloc allocator cache " <<
MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index c939d9a62d..01a0d58cdb 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -72,6 +72,7 @@ public:
void release(int64_t bytes) { consume(-bytes); }
// Transfer 'bytes' of consumption from this tracker to 'dst'.
void transfer_to(MemTracker* dst, int64_t bytes);
+ void set_consumption(int64_t bytes) { _consumption->set(bytes); }
public:
bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); }
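set_consumption() is what lets the now statistics-only trackers (LoadChannelMgr, LoadChannel) be overwritten by a periodic refresh such as LoadChannelMgr::refresh_mem_tracker(), instead of being consumed on the allocation path. A small stand-in example of the two update styles, not the Doris MemTracker class itself:

    #include <cstdint>
    #include <iostream>

    // consume()/release() accumulate deltas on the hot path;
    // set_consumption() overwrites with a value computed elsewhere.
    struct Counter {
        int64_t value = 0;
        void consume(int64_t bytes) { value += bytes; }
        void release(int64_t bytes) { consume(-bytes); }
        void set_consumption(int64_t bytes) { value = bytes; }
    };

    int main() {
        Counter per_memtable;          // updated incrementally on the write path
        per_memtable.consume(1 << 20);
        per_memtable.release(1 << 18);

        Counter load_channel_mgr;      // statistics-only, refreshed periodically
        load_channel_mgr.set_consumption(per_memtable.value);
        std::cout << load_channel_mgr.value << "\n"; // prints 786432
        return 0;
    }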
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index f6440c0ace..6bc9449c20 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -82,6 +82,7 @@ public:
return false;
}
+ void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; }
int64_t group_num() const { return _group_num; }
bool has_limit() const { return _limit >= 0; }
int64_t limit() const { return _limit; }
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 58d98278c6..28539703b4 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -108,7 +108,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
MemTracker::print_bytes(it->second->consumption()),
MemTracker::print_bytes(it->second->peak_consumption()));
expired_task_ids.emplace_back(it->first);
- } else if (config::memory_verbose_track) {
+ } else if (config::memory_debug) {
it->second->print_log_usage("query routine");
it->second->enable_print_log_usage();
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 5289db99e3..4661fbea5d 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -170,7 +170,6 @@ private:
inline void ThreadMemTrackerMgr::init() {
DCHECK(_limiter_tracker_stack.size() == 0);
DCHECK(_limiter_tracker_raw == nullptr);
- DCHECK(_consumer_tracker_stack.empty());
init_impl();
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 9bc95e2de1..8087d6efc5 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -160,7 +160,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId
query_id, UniqueId frag
const TQueryOptions&
query_options) {
_query_id = query_id;
_fragment_instance_id = fragment_instance_id;
- _mem_tracker =
std::make_unique<MemTracker>("RuntimeFilterMergeControllerEntity");
+ _mem_tracker =
std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity");
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter)
{
int filter_id = filterid_to_desc.first;
@@ -181,7 +181,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId
query_id, UniqueId frag
// merge data
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest*
request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
- // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
{
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index e090be6f4f..63960f0182 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -147,7 +147,7 @@ private:
UniqueId _fragment_instance_id;
// protect _filter_map
std::mutex _filter_map_mutex;
- std::unique_ptr<MemTracker> _mem_tracker;
+ std::shared_ptr<MemTracker> _mem_tracker;
// TODO: convert filter id to i32
// filter-id -> val
std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 9023c100f1..584e101199 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -34,16 +34,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
- const std::shared_ptr<MemTrackerLimiter>& parent_tracker,
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
bool is_high_priority, bool is_vec)
: _key(key),
_state(kInitialized),
+ _load_id(load_id),
_closed_senders(64),
_is_high_priority(is_high_priority),
_is_vec(is_vec) {
- _mem_tracker = std::make_shared<MemTrackerLimiter>(
- -1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker);
static std::once_flag once_flag;
std::call_once(once_flag, [] {
REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
@@ -194,6 +192,17 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
}
}
+int64_t TabletsChannel::mem_consumption() {
+ int64_t mem_usage = 0;
+ {
+ std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ for (auto& it : _tablet_writers) {
+ mem_usage += it.second->mem_consumption();
+ }
+ }
+ return mem_usage;
+}
+
template <typename TabletWriterAddResult>
Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
if (_try_to_wait_flushing()) {
@@ -243,7 +252,7 @@ Status
TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
// So here we only flush part of the tablet, and the next time the
reduce memory operation is triggered,
// the tablet that has not been flushed before will accumulate more
data, thereby reducing the number of flushes.
- int64_t mem_to_flushed = _mem_tracker->consumption() / 3;
+ int64_t mem_to_flushed = mem_consumption() / 3;
if (total_memtable_consumption_in_flush < mem_to_flushed) {
mem_to_flushed -= total_memtable_consumption_in_flush;
int counter = 0;
@@ -349,7 +358,7 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& request
wrequest.ptable_schema_param = request.schema();
DeltaWriter* writer = nullptr;
- auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec);
+ auto st = DeltaWriter::open(&wrequest, &writer, _load_id, _is_vec);
if (!st.ok()) {
std::stringstream ss;
ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
@@ -358,7 +367,10 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& request
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
- _tablet_writers.emplace(tablet.tablet_id(), writer);
+ {
+ std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ _tablet_writers.emplace(tablet.tablet_id(), writer);
+ }
}
_s_tablet_writer_count += _tablet_writers.size();
DCHECK_EQ(_tablet_writers.size(), request.tablets_size());
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index cf7e067b32..bfdbf0e8f7 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -61,8 +61,7 @@ class LoadChannel;
// Write channel for a particular (load, index).
class TabletsChannel {
public:
- TabletsChannel(const TabletsChannelKey& key,
- const std::shared_ptr<MemTrackerLimiter>& parent_tracker,
bool is_high_priority,
+ TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool
is_high_priority,
bool is_vec);
~TabletsChannel();
@@ -96,7 +95,7 @@ public:
template <typename TabletWriterAddResult>
Status reduce_mem_usage(TabletWriterAddResult* response);
- int64_t mem_consumption() const { return _mem_tracker->consumption(); }
+ int64_t mem_consumption();
private:
template <typename Request>
@@ -119,6 +118,8 @@ private:
// make execute sequence
std::mutex _lock;
+ SpinLock _tablet_writers_lock;
+
enum State {
kInitialized,
kOpened,
@@ -126,6 +127,8 @@ private:
};
State _state;
+ UniqueId _load_id;
+
// initialized in open function
int64_t _txn_id = -1;
int64_t _index_id = -1;
@@ -160,8 +163,6 @@ private:
static std::atomic<uint64_t> _s_tablet_writer_count;
- std::shared_ptr<MemTrackerLimiter> _mem_tracker;
-
bool _is_high_priority = false;
bool _is_vec = false;
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 1e6be4a0d9..babee85c3e 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -76,18 +76,20 @@ SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
}
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
- if (config::memory_verbose_track) {
- thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
- }
+ thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+}
+
+AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
+ const std::shared_ptr<MemTracker>& mem_tracker)
+ : _mem_tracker(mem_tracker) {
+ thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
}
AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
- if (config::memory_verbose_track) {
#ifndef NDEBUG
- DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
+ DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
#endif // NDEBUG
- thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker();
- }
+ thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker();
}
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 861b3e0567..7d213bfa4d 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -39,6 +39,7 @@
// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe.
// Usage example: std::unique_ptr<MemTracker> tracker = std::make_unique<MemTracker>("first_tracker");
// { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); xxx; xxx; }
+// Usually used to record query more detailed memory, including ExecNode operators.
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker)
#else
@@ -300,9 +301,16 @@ public:
class AddThreadMemTrackerConsumer {
public:
+ // The owner and user of MemTracker are in the same thread, and the raw pointer is faster.
explicit AddThreadMemTrackerConsumer(MemTracker* mem_tracker);
+ // The owner and user of MemTracker are in different threads.
+ explicit AddThreadMemTrackerConsumer(const std::shared_ptr<MemTracker>& mem_tracker);
+
~AddThreadMemTrackerConsumer();
+
+private:
+ std::shared_ptr<MemTracker> _mem_tracker = nullptr; // Avoid mem_tracker being released midway.
};
class StopCheckThreadMemTrackerLimit {
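The two AddThreadMemTrackerConsumer constructors above differ only in ownership: a raw pointer when the tracker's owner and consumer share a thread, a shared_ptr when they do not, so the tracker cannot be destroyed while it still sits on the thread-local consumer stack. A self-contained sketch of that RAII pattern with a stand-in thread-local stack (not the Doris ThreadMemTrackerMgr):

    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct Tracker {
        int64_t consumption = 0;
        void consume(int64_t bytes) { consumption += bytes; }
    };

    thread_local std::vector<std::shared_ptr<Tracker>> tls_consumers;

    class ScopedConsumer {
    public:
        // Holding a shared_ptr keeps the tracker alive even if the owning object
        // (e.g. a memtable handed to a flush thread) is destroyed first.
        explicit ScopedConsumer(std::shared_ptr<Tracker> t) {
            tls_consumers.push_back(std::move(t));
        }
        ~ScopedConsumer() { tls_consumers.pop_back(); }
    };

    void record_allocation(int64_t bytes) {
        if (!tls_consumers.empty()) tls_consumers.back()->consume(bytes);
    }

    int main() {
        auto flush_tracker = std::make_shared<Tracker>();
        {
            ScopedConsumer guard(flush_tracker); // role of SCOPED_CONSUME_MEM_TRACKER
            record_allocation(4096);             // stand-in for the mem hook
        }
        std::cout << flush_tracker->consumption << "\n"; // prints 4096
        return 0;
    }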
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a48d3adb3e..b6a7e3a118 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -52,6 +52,7 @@
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"
+#include "runtime/load_channel_mgr.h"
#include "runtime/memory/mem_tracker_task_pool.h"
#include "service/backend_options.h"
#include "service/backend_service.h"
@@ -510,6 +511,7 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consume(
allocator_cache_mem_diff);
CONSUME_THREAD_MEM_TRACKER(allocator_cache_mem_diff);
+
doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker();
// 1s clear the expired task mem tracker, a query mem tracker is about
57 bytes.
// this will cause coredump for ASAN build when running regression
test,
@@ -518,7 +520,7 @@ int main(int argc, char** argv) {
// The process tracker print log usage interval is 1s to avoid a large
number of tasks being
// canceled when the process exceeds the mem limit, resulting in too
many duplicate logs.
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
- if (doris::config::memory_verbose_track) {
+ if (doris::config::memory_debug) {
doris::ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage("main
routine");
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 91b5784da3..eb419eb52f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1268,7 +1268,7 @@ Status HashJoinNode::open(RuntimeState* state) {
void HashJoinNode::_probe_side_open_thread(RuntimeState* state,
std::promise<Status>* status) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"HashJoinNode::_hash_table_build_thread");
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(child(0)->open(state));
}
diff --git a/be/src/vec/exec/vblocking_join_node.cpp
b/be/src/vec/exec/vblocking_join_node.cpp
index 809da414f4..f4b9258c92 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -75,7 +75,7 @@ Status VBlockingJoinNode::close(RuntimeState* state) {
void VBlockingJoinNode::build_side_thread(RuntimeState* state,
std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(construct_build_side(state));
// Release the thread token as soon as possible (before the main thread
joins
// on it). This way, if we had a chain of 10 joins using 1 additional
thread,
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp
b/be/src/vec/exec/vbroker_scan_node.cpp
index 4e1177f4bd..f164b5a93d 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -273,7 +273,7 @@ Status VBrokerScanNode::scanner_scan(const
TBrokerScanRange& scan_range, Scanner
void VBrokerScanNode::scanner_worker(int start_idx, int length) {
START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span,
"VBrokerScanNode::scanner_worker");
SCOPED_ATTACH_TASK(_runtime_state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
Thread::set_self_name("vbroker_scanner");
Status status = Status::OK();
ScannerCounter counter;
diff --git a/be/src/vec/exec/ves_http_scan_node.cpp
b/be/src/vec/exec/ves_http_scan_node.cpp
index 9f308acc6b..d0704a2477 100644
--- a/be/src/vec/exec/ves_http_scan_node.cpp
+++ b/be/src/vec/exec/ves_http_scan_node.cpp
@@ -385,7 +385,7 @@ void VEsHttpScanNode::debug_string(int ident_level,
std::stringstream* out) cons
void VEsHttpScanNode::scanner_worker(int start_idx, int length,
std::promise<Status>& p_status) {
START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span,
"VEsHttpScanNode::scanner_worker");
SCOPED_ATTACH_TASK(_runtime_state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
DCHECK(start_idx < length);
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 2fbd9bb99d..19ab63ae37 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -279,7 +279,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VOlapScanNode::transfer_thread");
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
Status status = Status::OK();
if (_vconjunct_ctx_ptr) {
diff --git a/be/src/vec/exec/vschema_scan_node.cpp
b/be/src/vec/exec/vschema_scan_node.cpp
index cdc258fe99..dfb6811cbf 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -108,9 +108,9 @@ Status VSchemaScanNode::open(RuntimeState* state) {
}
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
if (_scanner_param.user) {
TSetSessionParams param;
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index 3c1e7f9924..ea2799fdea 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -398,7 +398,7 @@ TEST_F(TestDeltaWriter, open) {
SAFE_DELETE(delta_writer);
// test vec delta writer
- DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
+ DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true);
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
@@ -551,7 +551,7 @@ TEST_F(TestDeltaWriter, vec_write) {
WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
30002, load_id, tuple_desc,
&(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
- DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
+ DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true);
ASSERT_NE(delta_writer, nullptr);
MemPool pool;
@@ -764,7 +764,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
30003, load_id, tuple_desc,
&(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
- DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
+ DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true);
ASSERT_NE(delta_writer, nullptr);
MemPool pool;