This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new aaaaae5 [feature] (memory) Switch TLS mem tracker to separate more
detailed memory usage (#8605)
aaaaae5 is described below
commit aaaaae53b53ce6060188dfd2c19bc1784031b90c
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Mar 24 14:29:34 2022 +0800
[feature] (memory) Switch TLS mem tracker to separate more detailed memory
usage (#8605)
In PR #8476, all memory used by a process is recorded in the process mem
tracker and all memory used by a query is recorded in the query mem tracker,
but it is still necessary to call `transfer_to` manually to track the size of
cached memory.
We want to break memory usage down in more detail, based on the TCMalloc
new/delete hook plus the TLS mem tracker.
In this PR, the more detailed mem trackers are switched into TLS, so that
finer-grained memory usage is counted automatically and more accurately than
before.
---
be/src/exec/cross_join_node.cpp | 5 +-
be/src/exec/except_node.cpp | 3 +-
be/src/exec/exec_node.cpp | 3 +
be/src/exec/hash_join_node.cpp | 5 +-
be/src/exec/intersect_node.cpp | 3 +-
be/src/exec/set_operation_node.cpp | 3 +-
be/src/olap/lru_cache.cpp | 16 ++++--
be/src/olap/tablet_manager.cpp | 15 ++---
be/src/runtime/bufferpool/buffer_allocator.cc | 17 ++++--
be/src/runtime/bufferpool/buffer_allocator.h | 3 +
be/src/runtime/memory/chunk_allocator.cpp | 5 ++
be/src/runtime/tcmalloc_hook.h | 2 +
be/src/runtime/thread_context.h | 81 +++++++++++++++++++++++----
be/src/runtime/thread_mem_tracker_mgr.cpp | 20 ++++---
be/src/runtime/thread_mem_tracker_mgr.h | 56 +++++++++++-------
be/src/service/doris_main.cpp | 16 ++++--
be/src/util/doris_metrics.cpp | 8 +++
be/src/util/doris_metrics.h | 4 ++
be/src/vec/exec/join/vhash_join_node.cpp | 5 +-
be/src/vec/exec/vaggregation_node.cpp | 8 ++-
be/src/vec/exec/vcross_join_node.cpp | 4 +-
be/src/vec/exec/vset_operation_node.cpp | 6 +-
22 files changed, 202 insertions(+), 86 deletions(-)
diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index 5dcdc10..5def58a 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -23,6 +23,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
@@ -52,6 +53,7 @@ Status CrossJoinNode::close(RuntimeState* state) {
Status CrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Cross join, while getting
next from child 1");
while (true) {
RowBatch* batch =
@@ -63,9 +65,6 @@ Status CrossJoinNode::construct_build_side(RuntimeState*
state) {
bool eos = false;
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));
- // to prevent use too many memory
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting
next from the child 1.");
-
SCOPED_TIMER(_build_timer);
_build_batches.add_row_batch(batch);
VLOG_ROW << build_list_debug_string();
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8ae1701..ec3e451 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -21,6 +21,7 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
namespace doris {
ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
@@ -40,6 +41,7 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState*
state) {
Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing
the hash table.");
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
@@ -62,7 +64,6 @@ Status ExceptNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(),
&eos));
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Except , while probing
the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4d8dd2a..e3891b6 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -57,6 +57,7 @@
#include "runtime/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@@ -208,6 +209,7 @@ Status ExecNode::prepare(RuntimeState* state) {
_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:" +
_runtime_profile->name(),
state->instance_mem_tracker(),
MemTrackerLevel::VERBOSE,
_runtime_profile.get());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_expr_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:Exprs:" +
_runtime_profile->name(),
_mem_tracker);
@@ -226,6 +228,7 @@ Status ExecNode::prepare(RuntimeState* state) {
}
Status ExecNode::open(RuntimeState* state) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index c92bb5a..30cd844 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -186,6 +186,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState*
state) {
// The hash join node needs to keep in memory all build tuples, including
the tuple
// row ptrs. The row ptrs are copied into the hash table's internal
structure so they
// don't need to be stored in the _build_pool.
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while
constructing the hash table.");
RowBatch build_batch(child(1)->row_desc(), state->batch_size());
RETURN_IF_ERROR(child(1)->open(state));
@@ -303,7 +304,7 @@ Status HashJoinNode::get_next(RuntimeState* state,
RowBatch* out_batch, bool* eo
// In most cases, no additional memory overhead will be applied for at
this stage,
// but if the expression calculation in this node needs to apply for
additional memory,
// it may cause the memory to exceed the limit.
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while execute
get_next.");
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute
get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (reached_limit()) {
@@ -771,11 +772,9 @@ Status HashJoinNode::process_build_batch(RuntimeState*
state, RowBatch* build_ba
_build_pool.get(), false);
}
}
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while
constructing the hash table.");
} else {
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch->tuple_data_pool(), false);
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while
constructing the hash table.");
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
for (int i = 0; i < build_batch->num_rows(); ++i) {
_hash_tbl->insert_without_check(build_batch->get_row(i));
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 2d8d2ee..df0e30d 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -21,6 +21,7 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
namespace doris {
IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
@@ -44,6 +45,7 @@ Status IntersectNode::init(const TPlanNode& tnode,
RuntimeState* state) {
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while
probing the hash table.");
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
@@ -66,7 +68,6 @@ Status IntersectNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(),
&eos));
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Intersect , while
probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
diff --git a/be/src/exec/set_operation_node.cpp
b/be/src/exec/set_operation_node.cpp
index 7faa561..5574d3b 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -23,6 +23,7 @@
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
namespace doris {
SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -137,6 +138,7 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow*
other) {
Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while
constructing the hash table.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
// open result expr lists.
@@ -156,7 +158,6 @@ Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " SetOperation, while
constructing the hash table.");
// build hash table and remove duplicate items
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows()));
for (int i = 0; i < build_batch.num_rows(); ++i) {
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 825f029..e494c54 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -364,13 +364,7 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash,
MemTracker* tracker) {
}
// free handle out of mutex, when last_ref is true, e must not be nullptr
if (last_ref) {
- size_t charge = e->charge;
e->free();
- // The parameter tracker is ShardedLRUCache::_mem_tracker,
- // because the memory released by LRUHandle is recorded in the tls mem
tracker,
- // so this part of the memory is subsidized from
ShardedLRUCache::_mem_tracker to the tls mem tracker
-
tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
- charge);
}
}
@@ -449,11 +443,15 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name,
size_t total_capacity,
: _name(name),
_last_id(1),
_mem_tracker(MemTracker::create_tracker(-1, name, nullptr,
MemTrackerLevel::OVERVIEW)) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s] = new LRUCache(type);
_shards[s]->set_capacity(per_shard);
}
+ // After the lru cache is created in the main thread, the main thread will
not switch to the
+ // lru cache mem tracker again, so manually clear the untracked mem in tls.
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
_entity = DorisMetrics::instance()->metric_registry()->register_entity(
std::string("lru_cache:") + name, {{"name", name}});
@@ -467,6 +465,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name,
size_t total_capacity,
}
ShardedLRUCache::~ShardedLRUCache() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
for (int s = 0; s < kNumShards; s++) {
delete _shards[s];
}
@@ -481,6 +480,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key,
void* value, size_t
// transfer the memory ownership of the value to
ShardedLRUCache::_mem_tracker.
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
charge);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter,
priority);
}
@@ -491,11 +491,13 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey&
key) {
}
void ShardedLRUCache::release(Handle* handle) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
_shards[_shard(h->hash)]->release(handle);
}
void ShardedLRUCache::erase(const CacheKey& key) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
_shards[_shard(hash)]->erase(key, hash, _mem_tracker.get());
}
@@ -514,6 +516,7 @@ uint64_t ShardedLRUCache::new_id() {
}
int64_t ShardedLRUCache::prune() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune();
@@ -522,6 +525,7 @@ int64_t ShardedLRUCache::prune() {
}
int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune_if(pred);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index dd51b87..ed1a711 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -192,11 +192,6 @@ OLAPStatus
TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map[tablet_id] = tablet;
_add_tablet_to_partition(tablet);
- // TODO: remove multiply 2 of tablet meta mem size
- // Because table schema will copy in tablet, there will be double mem cost
- // so here multiply 2
-
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(
- _mem_tracker.get(), tablet->tablet_meta()->mem_size() * 2);
VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" <<
tablet_id ;
@@ -215,6 +210,7 @@ bool
TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) {
OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
std::vector<DataDir*> stores) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
DorisMetrics::instance()->create_tablet_requests_total->increment(1);
int64_t tablet_id = request.tablet_id;
@@ -432,6 +428,7 @@ TabletSharedPtr
TabletManager::_create_tablet_meta_and_dir_unlocked(
OLAPStatus TabletManager::drop_tablet(TTabletId tablet_id, SchemaHash
schema_hash,
bool keep_files) {
WriteLock wrlock(_get_tablets_shard_lock(tablet_id));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
return _drop_tablet_unlocked(tablet_id, keep_files);
}
@@ -460,6 +457,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(TTabletId
tablet_id, bool keep_f
OLAPStatus TabletManager::drop_tablets_on_error_root_path(
const std::vector<TabletInfo>& tablet_info_vec) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
OLAPStatus res = OLAP_SUCCESS;
if (tablet_info_vec.empty()) { // This is a high probability event
return res;
@@ -670,6 +668,7 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir*
data_dir, TTabletId tab
TSchemaHash schema_hash, const
string& meta_binary,
bool update_meta, bool force,
bool restore,
bool check_path) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
TabletMetaSharedPtr tablet_meta(new TabletMeta());
OLAPStatus status = tablet_meta->deserialize(meta_binary);
if (status != OLAP_SUCCESS) {
@@ -752,6 +751,7 @@ OLAPStatus TabletManager::load_tablet_from_dir(DataDir*
store, TTabletId tablet_
SchemaHash schema_hash,
const string& schema_hash_path,
bool force,
bool restore) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
LOG(INFO) << "begin to load tablet from dir. "
<< " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
<< " path = " << schema_hash_path << " force = " << force << "
restore = " << restore;
@@ -1219,11 +1219,6 @@ OLAPStatus
TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bo
}
dropped_tablet->deregister_tablet_from_dir();
- // The dropped tablet meta is expected to be released in the TabletManager
mem tracker,
- // but is actually released in the tls mem tracker.
- // So from TabletManager mem tracker compensate memory to tls tracker.
-
_mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
- dropped_tablet->tablet_meta()->mem_size() * 2);
return OLAP_SUCCESS;
}
diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc
b/be/src/runtime/bufferpool/buffer_allocator.cc
index a3bbe4c..82742a6 100644
--- a/be/src/runtime/bufferpool/buffer_allocator.cc
+++ b/be/src/runtime/bufferpool/buffer_allocator.cc
@@ -26,6 +26,7 @@
#include "util/cpu_info.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
+#include "runtime/thread_context.h"
//DECLARE_bool(disable_mem_pools);
@@ -48,7 +49,7 @@ public:
/// Add a free buffer to the free lists. May free buffers to the system
allocator
/// if the list becomes full. Caller should not hold 'lock_'
- void AddFreeBuffer(BufferHandle&& buffer);
+ bool AddFreeBuffer(BufferHandle&& buffer);
/// Try to get a free buffer of 'buffer_len' bytes from this arena.
Returns true and
/// sets 'buffer' if found or false if not found. Caller should not hold
'lock_'.
@@ -193,7 +194,8 @@ BufferPool::BufferAllocator::BufferAllocator(BufferPool*
pool, int64_t min_buffe
clean_page_bytes_limit_(clean_page_bytes_limit),
clean_page_bytes_remaining_(clean_page_bytes_limit),
per_core_arenas_(CpuInfo::get_max_num_cores()),
- max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
+ max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS),
+ _mem_tracker(MemTracker::create_virtual_tracker(-1,
"BufferAllocator", nullptr, MemTrackerLevel::OVERVIEW)) {
DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
DCHECK(BitUtil::IsPowerOf2(max_buffer_len_)) << max_buffer_len_;
DCHECK_LE(0, min_buffer_len_);
@@ -303,6 +305,7 @@ Status
BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
system_bytes_remaining_.add(len);
return status;
}
+ _mem_tracker->consume_cache(len);
return Status::OK();
}
@@ -375,7 +378,9 @@ void BufferPool::BufferAllocator::Free(BufferHandle&&
handle) {
handle.client_ = nullptr; // Buffer is no longer associated with a client.
FreeBufferArena* arena = per_core_arenas_[handle.home_core_].get();
handle.Poison();
- arena->AddFreeBuffer(std::move(handle));
+ if (!arena->AddFreeBuffer(std::move(handle))) {
+ _mem_tracker->release_cache(handle.len());
+ }
}
void BufferPool::BufferAllocator::AddCleanPage(const
std::unique_lock<std::mutex>& client_lock,
@@ -426,6 +431,7 @@ int64_t
BufferPool::BufferAllocator::FreeToSystem(std::vector<BufferHandle>&& bu
buffer.Unpoison();
system_allocator_->Free(std::move(buffer));
}
+ _mem_tracker->release_cache(bytes_freed);
return bytes_freed;
}
@@ -485,16 +491,17 @@ BufferPool::FreeBufferArena::~FreeBufferArena() {
}
}
-void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
+bool BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
std::lock_guard<SpinLock> al(lock_);
if (config::disable_mem_pools) {
int64_t len = buffer.len();
parent_->system_allocator_->Free(std::move(buffer));
parent_->system_bytes_remaining_.add(len);
- return;
+ return false;
}
PerSizeLists* lists = GetListsForSize(buffer.len());
lists->AddFreeBuffer(std::move(buffer));
+ return true;
}
bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page*
page) {
diff --git a/be/src/runtime/bufferpool/buffer_allocator.h
b/be/src/runtime/bufferpool/buffer_allocator.h
index f2ab5e6..19b1c6c 100644
--- a/be/src/runtime/bufferpool/buffer_allocator.h
+++ b/be/src/runtime/bufferpool/buffer_allocator.h
@@ -21,6 +21,7 @@
#include "runtime/bufferpool/buffer_pool_internal.h"
#include "runtime/bufferpool/free_list.h"
#include "util/aligned_new.h"
+#include "runtime/mem_tracker.h"
namespace doris {
@@ -235,6 +236,8 @@ private:
/// all arenas so may fail. The final attempt locks all arenas, which is
expensive
/// but is guaranteed to succeed.
int max_scavenge_attempts_;
+
+ std::shared_ptr<MemTracker> _mem_tracker;
};
} // namespace doris
diff --git a/be/src/runtime/memory/chunk_allocator.cpp
b/be/src/runtime/memory/chunk_allocator.cpp
index 6f1306c..9aec083 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -99,6 +99,7 @@ public:
// Poison this chunk to make asan can detect invalid access
ASAN_POISON_MEMORY_REGION(ptr, size);
std::lock_guard<SpinLock> l(_lock);
+ // TODO(zxy) The memory of vector resize is not recorded in chunk
allocator mem tracker
_chunk_lists[idx].push_back(ptr);
}
@@ -118,9 +119,13 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
_arenas(CpuInfo::get_max_num_cores()) {
_mem_tracker =
MemTracker::create_tracker(-1, "ChunkAllocator", nullptr,
MemTrackerLevel::OVERVIEW);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
for (int i = 0; i < _arenas.size(); ++i) {
_arenas[i].reset(new ChunkArena());
}
+ // After the ChunkAllocator is created in the main thread, the main thread
will not switch to the
+ // chunk allocator mem tracker again, so manually clear the untracked mem
in tls.
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
_chunk_allocator_metric_entity =
DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator");
diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h
index 4e3fbb8..9ba55fd 100644
--- a/be/src/runtime/tcmalloc_hook.h
+++ b/be/src/runtime/tcmalloc_hook.h
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#pragma once
+
#include <gperftools/malloc_hook.h>
#include <gperftools/nallocx.h>
#include <gperftools/tcmalloc.h>
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 1718e8d..7c3beaf 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -25,14 +25,29 @@
#include "runtime/runtime_state.h"
#include "runtime/thread_mem_tracker_mgr.h"
#include "runtime/threadlocal.h"
+#include "util/doris_metrics.h"
// Attach to task when thread starts
#define SCOPED_ATTACH_TASK_THREAD(type, ...) \
auto VARNAME_LINENUM(attach_task_thread) = AttachTaskThread(type,
##__VA_ARGS__)
+// Be careful to stop the thread mem tracker, because the actual order of
malloc and free memory
+// may be different from the order of execution of instructions, which will
cause the position of
+// the memory track to be unexpected.
#define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \
auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true)
#define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \
auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false)
+// Switch thread mem tracker during task execution.
+// After the non-query thread switches the mem tracker, if the thread will not
switch the mem
+// tracker again in the short term, can consider manually clear_untracked_mems.
+// The query thread will automatically clear_untracked_mems when detach_task.
+#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
+ auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker,
false)
+#define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
+ auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker,
true);
+#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \
+ auto VARNAME_LINENUM(witch_tracker_cb) = \
+ SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
namespace doris {
@@ -72,7 +87,7 @@ public:
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _thread_mem_tracker_mgr->attach_task(task_type_string(_type), task_id,
fragment_instance_id,
+ _thread_mem_tracker_mgr->attach_task(TaskTypeStr[_type], task_id,
fragment_instance_id,
mem_tracker);
}
@@ -88,10 +103,6 @@ public:
const std::string& thread_id_str() const { return _thread_id_str; }
const TUniqueId& fragment_instance_id() const { return
_fragment_instance_id; }
- inline static const std::string task_type_string(ThreadContext::TaskType
type) {
- return TaskTypeStr[type];
- }
-
void consume_mem(int64_t size) {
if (start_thread_mem_tracker) {
_thread_mem_tracker_mgr->cache_consume(size);
@@ -166,13 +177,13 @@ public:
explicit AttachTaskThread(const ThreadContext::TaskType& type,
const std::shared_ptr<MemTracker>& mem_tracker) {
- DCHECK(mem_tracker != nullptr);
+ DCHECK(mem_tracker);
thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker);
}
explicit AttachTaskThread(const TQueryType::type& query_type,
const std::shared_ptr<MemTracker>& mem_tracker) {
- DCHECK(mem_tracker != nullptr);
+ DCHECK(mem_tracker);
thread_local_ctx.get()->attach(query_to_task_type(query_type), "",
TUniqueId(),
mem_tracker);
}
@@ -182,7 +193,7 @@ public:
const std::shared_ptr<MemTracker>& mem_tracker) {
DCHECK(task_id != "");
DCHECK(fragment_instance_id != TUniqueId());
- DCHECK(mem_tracker != nullptr);
+ DCHECK(mem_tracker);
thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id,
fragment_instance_id, mem_tracker);
}
@@ -192,7 +203,7 @@ public:
#ifndef BE_TEST
DCHECK(print_id(runtime_state->query_id()) != "");
DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
- DCHECK(mem_tracker != nullptr);
+ DCHECK(mem_tracker);
thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()),
print_id(runtime_state->query_id()),
runtime_state->fragment_instance_id(),
mem_tracker);
@@ -211,7 +222,12 @@ public:
}
}
- ~AttachTaskThread() { thread_local_ctx.get()->detach(); }
+ ~AttachTaskThread() {
+#ifndef BE_TEST
+ thread_local_ctx.get()->detach();
+ DorisMetrics::instance()->attach_task_thread_count->increment(1);
+#endif
+ }
};
class StopThreadMemTracker {
@@ -228,4 +244,49 @@ private:
bool _scope;
};
+class SwitchThreadMemTracker {
+public:
+ explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>&
mem_tracker,
+ bool in_task = true) {
+#ifndef BE_TEST
+ DCHECK(mem_tracker);
+ // The thread tracker must be switched after the attach task,
otherwise switching
+ // in the main thread will cause the cached tracker not be cleaned up
in time.
+ DCHECK(in_task == false ||
+
thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
+ _old_tracker_id =
+
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker(mem_tracker);
+#endif
+ }
+
+ ~SwitchThreadMemTracker() {
+#ifndef BE_TEST
+
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
+
DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
+#endif
+ }
+
+private:
+ std::string _old_tracker_id;
+};
+
+class SwitchThreadMemTrackerErrCallBack {
+public:
+ explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type,
+ bool cancel_work = true,
+ ERRCALLBACK err_call_back_func
= nullptr) {
+ DCHECK(action_type != std::string());
+ _old_tracker_cb =
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(
+ action_type, cancel_work, err_call_back_func);
+ }
+
+ ~SwitchThreadMemTrackerErrCallBack() {
+
thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
+
DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
+ }
+
+private:
+ ConsumeErrCallBackInfo _old_tracker_cb;
+};
+
} // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp
b/be/src/runtime/thread_mem_tracker_mgr.cpp
index 12b64d4..fd06c17 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -22,19 +22,21 @@
namespace doris {
-void ThreadMemTrackerMgr::attach_task(const std::string& action_type, const
std::string& task_id,
+void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const
std::string& task_id,
const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTracker>&
mem_tracker) {
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _consume_err_call_back.update(action_type, true, nullptr);
+ _consume_err_cb.cancel_msg = cancel_msg;
if (mem_tracker == nullptr) {
#ifdef BE_TEST
if (ExecEnv::GetInstance()->task_pool_mem_tracker_registry() ==
nullptr) {
return;
}
#endif
- _temp_task_mem_tracker =
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(task_id);
+ _temp_task_mem_tracker =
+
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
+ task_id);
update_tracker(_temp_task_mem_tracker);
} else {
update_tracker(mem_tracker);
@@ -44,7 +46,7 @@ void ThreadMemTrackerMgr::attach_task(const std::string&
action_type, const std:
void ThreadMemTrackerMgr::detach_task() {
_task_id = "";
_fragment_instance_id = TUniqueId();
- _consume_err_call_back.init();
+ _consume_err_cb.init();
clear_untracked_mems();
_tracker_id = "process";
// The following memory changes for the two map operations of
_untracked_mems and _mem_trackers
@@ -70,12 +72,12 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const
std::string& cancel_details
void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) {
auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
- nullptr, "In TCMalloc Hook, " +
_consume_err_call_back.action_type, mem_usage, st);
- if (_consume_err_call_back.call_back_func != nullptr) {
- _consume_err_call_back.call_back_func();
+ nullptr, "In TCMalloc Hook, " + _consume_err_cb.cancel_msg,
mem_usage, st);
+ if (_consume_err_cb.cb_func != nullptr) {
+ _consume_err_cb.cb_func();
}
- if (_task_id != "") {
- if (_consume_err_call_back.cancel_task == true) {
+ if (is_attach_task()) {
+ if (_consume_err_cb.cancel_task == true) {
exceeded_cancel_task(rst.to_string());
} else {
// TODO(zxy) Need other processing, or log (not too often).
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h
b/be/src/runtime/thread_mem_tracker_mgr.h
index 7401910..2b581de 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -28,27 +28,19 @@ namespace doris {
typedef void (*ERRCALLBACK)();
struct ConsumeErrCallBackInfo {
- std::string action_type;
+ std::string cancel_msg;
bool cancel_task; // Whether to cancel the task when the current tracker
exceeds the limit
- ERRCALLBACK call_back_func;
+ ERRCALLBACK cb_func;
- ConsumeErrCallBackInfo() {
- init();
- }
-
- ConsumeErrCallBackInfo(std::string action_type, bool cancel_task,
ERRCALLBACK call_back_func)
- : action_type(action_type), cancel_task(cancel_task),
call_back_func(call_back_func) {}
+ ConsumeErrCallBackInfo() { init(); }
- void update(std::string new_action_type, bool new_cancel_task, ERRCALLBACK
new_call_back_func) {
- action_type = new_action_type;
- cancel_task = new_cancel_task;
- call_back_func = new_call_back_func;
- }
+ ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task,
ERRCALLBACK cb_func)
+ : cancel_msg(cancel_msg), cancel_task(cancel_task),
cb_func(cb_func) {}
void init() {
- action_type = "";
+ cancel_msg = "";
cancel_task = false;
- call_back_func = nullptr;
+ cb_func = nullptr;
}
};
@@ -80,7 +72,7 @@ public:
}
void clear_untracked_mems() {
- for(auto untracked_mem : _untracked_mems) {
+ for (const auto& untracked_mem : _untracked_mems) {
if (untracked_mem.second != 0) {
DCHECK(_mem_trackers[untracked_mem.first]);
_mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
@@ -91,7 +83,7 @@ public:
}
// After attach, the current thread TCMalloc Hook starts to
consume/release task mem_tracker
- void attach_task(const std::string& action_type, const std::string&
task_id,
+ void attach_task(const std::string& cancel_msg, const std::string& task_id,
const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTracker>& mem_tracker);
@@ -101,6 +93,27 @@ public:
// Thread update_tracker may be called very frequently, adding a memory
copy will be slow.
std::string update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
+ void update_tracker_id(const std::string& tracker_id) {
+ if (tracker_id != _tracker_id) {
+ _untracked_mems[_tracker_id] += _untracked_mem;
+ _untracked_mem = 0;
+ _tracker_id = tracker_id;
+ }
+ }
+
+ inline ConsumeErrCallBackInfo update_consume_err_cb(const std::string&
cancel_msg,
+ bool cancel_task,
ERRCALLBACK cb_func) {
+ _temp_consume_err_cb = _consume_err_cb;
+ _consume_err_cb.cancel_msg = cancel_msg;
+ _consume_err_cb.cancel_task = cancel_task;
+ _consume_err_cb.cb_func = cb_func;
+ return _temp_consume_err_cb;
+ }
+
+ inline void update_consume_err_cb(const ConsumeErrCallBackInfo&
consume_err_cb) {
+ _consume_err_cb = consume_err_cb;
+ }
+
// Note that, If call the memory allocation operation in TCMalloc
new/delete Hook,
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion,
otherwise it may cause crash or stuck,
@@ -108,6 +121,8 @@ public:
void noncache_consume();
+ bool is_attach_task() { return _task_id != ""; }
+
std::shared_ptr<MemTracker> mem_tracker() {
DCHECK(_mem_trackers[_tracker_id]);
return _mem_trackers[_tracker_id];
@@ -137,15 +152,16 @@ private:
// Avoid memory allocation in functions and fall into an infinite loop
std::string _temp_tracker_id;
- ConsumeErrCallBackInfo _temp_consume_err_call_back;
+ ConsumeErrCallBackInfo _temp_consume_err_cb;
std::shared_ptr<MemTracker> _temp_task_mem_tracker;
std::string _task_id;
TUniqueId _fragment_instance_id;
- ConsumeErrCallBackInfo _consume_err_call_back;
+ ConsumeErrCallBackInfo _consume_err_cb;
};
-inline std::string ThreadMemTrackerMgr::update_tracker(const
std::shared_ptr<MemTracker>& mem_tracker) {
+inline std::string ThreadMemTrackerMgr::update_tracker(
+ const std::shared_ptr<MemTracker>& mem_tracker) {
DCHECK(mem_tracker);
_temp_tracker_id = mem_tracker->id();
if (_temp_tracker_id == _tracker_id) {
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 2b06779..df15f9d 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -53,7 +53,6 @@
#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"
#include "runtime/minidump.h"
-#include "runtime/tcmalloc_hook.h"
#include "service/backend_options.h"
#include "service/backend_service.h"
#include "service/brpc_service.h"
@@ -65,6 +64,11 @@
#include "util/thrift_server.h"
#include "util/uid_util.h"
+#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) &&
!defined(LEAK_SANITIZER) && \
+ !defined(THREAD_SANITIZER)
+#include "runtime/tcmalloc_hook.h"
+#endif
+
static void help(const char*);
#include <dlfcn.h>
@@ -336,11 +340,8 @@ int main(int argc, char** argv) {
return -1;
}
- if (doris::config::track_new_delete) {
- init_hook();
- }
-
-#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) &&
!defined(THREAD_SANITIZER)
+#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) &&
!defined(LEAK_SANITIZER) && \
+ !defined(THREAD_SANITIZER)
// Aggressive decommit is required so that unused pages in the TCMalloc
page heap are
// not backed by physical pages and do not contribute towards memory
consumption.
MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
1);
@@ -351,6 +352,9 @@ int main(int argc, char** argv) {
fprintf(stderr, "Failed to change TCMalloc total thread cache
size.\n");
return -1;
}
+ if (doris::config::track_new_delete) {
+ init_hook();
+ }
#endif
if (!doris::Env::init()) {
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 0c93390..a1d781a 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -132,6 +132,10 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes,
MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us,
MetricUnit::MICROSECONDS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count,
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count,
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count,
MetricUnit::NOUNIT);
+
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);
DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num,
MetricUnit::NOUNIT);
DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_fd_num_used,
MetricUnit::NOUNIT);
@@ -275,6 +279,10 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
attach_task_thread_count);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
switch_thread_mem_tracker_count);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
switch_thread_mem_tracker_err_cb_count);
+
_server_metric_entity->register_hook(_s_hook_name,
std::bind(&DorisMetrics::_update, this));
INT_UGAUGE_METRIC_REGISTER(_server_metric_entity,
query_cache_memory_total_byte);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index e99f598..ca5d05c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -125,6 +125,10 @@ public:
IntCounter* memtable_flush_total;
IntCounter* memtable_flush_duration_us;
+ IntCounter* attach_task_thread_count;
+ IntCounter* switch_thread_mem_tracker_count;
+ IntCounter* switch_thread_mem_tracker_err_cb_count;
+
IntGauge* memory_pool_bytes_total;
IntGauge* process_thread_num;
IntGauge* process_fd_num_used;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index aea3ff7..9f2e9fb 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -20,6 +20,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_filter_mgr.h"
+#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr.h"
@@ -921,6 +922,7 @@ Status HashJoinNode::open(RuntimeState* state) {
Status HashJoinNode::_hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while
constructing the hash table.");
SCOPED_TIMER(_build_timer);
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
@@ -936,7 +938,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state)
{
RETURN_IF_ERROR(child(1)->get_next(state, &block, &eos));
_hash_table_mem_tracker->consume(block.allocated_bytes());
_mem_used += block.allocated_bytes();
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while getting
next from the child 1.");
if (block.rows() != 0) { mutable_block.merge(block); }
@@ -947,7 +948,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state)
{
// TODO:: Rethink may we should do the proess after we recevie all
build blocks ?
// which is better.
RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index],
index));
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while
constructing the hash table.");
mutable_block = MutableBlock();
++index;
@@ -957,7 +957,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state)
{
_build_blocks.emplace_back(mutable_block.to_block());
RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing
the hash table.");
return std::visit(
[&](auto&& arg) -> Status {
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 2f8cf06..0788303 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -22,6 +22,7 @@
#include "exec/exec_node.h"
#include "runtime/mem_pool.h"
#include "runtime/row_batch.h"
+#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
@@ -332,6 +333,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
Status AggregationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute
open.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@@ -356,7 +358,6 @@ Status AggregationNode::open(RuntimeState* state) {
}
RETURN_IF_ERROR(_executor.execute(&block));
_executor.update_memusage();
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "aggregator, while execute
open.");
}
return Status::OK();
@@ -366,7 +367,9 @@ Status AggregationNode::get_next(RuntimeState* state,
RowBatch* row_batch, bool*
return Status::NotSupported("Not Implemented Aggregation Node::get_next
scalar");
}
-Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos)
{ SCOPED_TIMER(_runtime_profile->total_time_counter());
+Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos)
{
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute
get_next.");
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
if (_is_streaming_preagg) {
bool child_eos = false;
@@ -395,7 +398,6 @@ Status AggregationNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
}
_executor.update_memusage();
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "aggregator, while execute
get_next.");
return Status::OK();
}
diff --git a/be/src/vec/exec/vcross_join_node.cpp
b/be/src/vec/exec/vcross_join_node.cpp
index 69b45dc..d2fc21d 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -23,6 +23,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
namespace doris::vectorized {
@@ -53,6 +54,7 @@ Status VCrossJoinNode::close(RuntimeState* state) {
Status VCrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Vec Cross join, while
getting next from the child 1");
bool eos = false;
while (true) {
@@ -70,8 +72,6 @@ Status VCrossJoinNode::construct_build_side(RuntimeState*
state) {
_build_blocks.emplace_back(std::move(block));
_block_mem_tracker->consume(mem_usage);
}
- // to prevent use too many memory
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting
next from the child 1.");
if (eos) {
break;
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index 0985599..bd7856f 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -17,6 +17,7 @@
#include "vec/exec/vset_operation_node.h"
+#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "vec/exprs/vexpr.h"
namespace doris {
@@ -228,6 +229,8 @@ void VSetOperationNode::hash_table_init() {
//build a hash table from child(0)
Status VSetOperationNode::hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->open(state));
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(
+ "Vec Set Operation Node, while constructing the hash table");
Block block;
MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
@@ -244,7 +247,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState*
state) {
_hash_table_mem_tracker->consume(allocated_bytes);
_mem_used += allocated_bytes;
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while
getting next from the child 0.");
if (block.rows() != 0) { mutable_block.merge(block); }
// make one block for each 4 gigabytes
@@ -254,7 +256,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState*
state) {
// TODO:: Rethink may we should do the proess after we recevie all
build blocks ?
// which is better.
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node,
while constructing the hash table.");
mutable_block = MutableBlock();
++index;
last_mem_used = _mem_used;
@@ -263,7 +264,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState*
state) {
_build_blocks.emplace_back(mutable_block.to_block());
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
- RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while
constructing the hash table.");
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]