This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new ad31b6c902 [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
ad31b6c902 is described below

commit ad31b6c902a3704bda5f42e4cec80ab085e094c0
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Jul 22 21:59:36 2022 +0800

    [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
---
 be/src/exec/analytic_eval_node.cpp              | 3 +--
 be/src/exec/es/es_scroll_parser.cpp             | 8 ++------
 be/src/exec/partitioned_aggregation_node.cc     | 6 ++----
 be/src/exec/partitioned_hash_table.cc           | 3 +--
 be/src/exprs/anyval_util.cpp                    | 3 +--
 be/src/exprs/expr_context.cpp                   | 5 ++---
 be/src/runtime/load_channel.cpp                 | 4 ++--
 be/src/runtime/load_channel.h                   | 2 +-
 be/src/runtime/memory/mem_tracker.h             | 3 ++-
 be/src/runtime/memory/mem_tracker_limiter.h     | 9 ++++++---
 be/src/runtime/memory/mem_tracker_task_pool.cpp | 6 +++---
 be/src/runtime/runtime_state.cpp                | 2 +-
 regression-test/conf/regression-conf.groovy     | 4 ++--
 13 files changed, 26 insertions(+), 32 deletions(-)

diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 08d2dc2c35..f0e7f01989 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -202,8 +202,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
                 "Failed to acquire initial read buffer for analytic function "
                 "evaluation. Reducing query concurrency or increasing the memory limit may "
                 "help this query to complete successfully.");
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              state, msg);
+        RETURN_LIMIT_EXCEEDED(state, msg);
     }
 
     DCHECK_EQ(_evaluators.size(), _fn_ctxs.size());
diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp
index 8dd3a2c9e1..a95af7bb51 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -353,9 +353,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
             if (UNLIKELY(buffer == nullptr)) {
                 std::string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED,
                                                           "MaterializeNextRow", len, "string slot");
-                RETURN_LIMIT_EXCEEDED(
-                        thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), nullptr,
-                        details, len, rst);
+                RETURN_LIMIT_EXCEEDED(nullptr, details, len, rst);
             }
             memcpy(buffer, _id.data(), len);
             reinterpret_cast<StringValue*>(slot)->ptr = buffer;
@@ -415,9 +413,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
             if (UNLIKELY(buffer == nullptr)) {
                 std::string details = strings::Substitute(
                         ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
-                RETURN_LIMIT_EXCEEDED(
-                        thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), nullptr,
-                        details, val_size, rst);
+                RETURN_LIMIT_EXCEEDED(nullptr, details, val_size, rst);
             }
             memcpy(buffer, val.data(), val_size);
             reinterpret_cast<StringValue*>(slot)->ptr = buffer;
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 2811375395..ad5f6788d9 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -411,8 +411,7 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_des
                     "Cannot perform aggregation at node with id $0."
                     " Failed to allocate $1 output bytes.",
                     _id, sv->len);
-            RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                                  state_, details, sv->len, rst);
+            RETURN_LIMIT_EXCEEDED(state_, details, sv->len, rst);
         }
         memcpy(new_ptr, sv->ptr, sv->len);
         sv->ptr = new_ptr;
@@ -851,8 +850,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
         // TODO(ml): enable spill
         std::stringstream msg;
         msg << "New partitioned Aggregation in spill";
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              parent->state_, msg.str());
+        RETURN_LIMIT_EXCEEDED(parent->state_, msg.str());
     }
 
     RETURN_IF_ERROR(SerializeStreamForSpilling());
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
index 4f7904cb56..cbcc85070c 100644
--- a/be/src/exec/partitioned_hash_table.cc
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -313,8 +313,7 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
         capacity_ = 0;
         string details = Substitute(
                 "PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes", mem_usage);
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              state, details, mem_usage, st);
+        RETURN_LIMIT_EXCEEDED(state, details, mem_usage, st);
     }
 
     int expr_values_size = expr_values_bytes_per_row_ * capacity_;
diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp
index 754f1f4dbd..a53031fc12 100644
--- a/be/src/exprs/anyval_util.cpp
+++ b/be/src/exprs/anyval_util.cpp
@@ -47,8 +47,7 @@ Status allocate_any_val(RuntimeState* state, MemPool* pool, const TypeDescriptor
     *result = reinterpret_cast<AnyVal*>(
             pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst));
     if (*result == nullptr) {
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              state, mem_limit_exceeded_msg, anyval_size, rst);
+        RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size, rst);
     }
     memset(static_cast<void*>(*result), 0, anyval_size);
     return Status::OK();
diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp
index 7076f1372f..8cd8fe7c1c 100644
--- a/be/src/exprs/expr_context.cpp
+++ b/be/src/exprs/expr_context.cpp
@@ -401,9 +401,8 @@ Status ExprContext::get_const_value(RuntimeState* state, Expr& expr, AnyVal** co
             Status rst;
             char* ptr_copy = reinterpret_cast<char*>(_pool->try_allocate(sv->len, &rst));
             if (ptr_copy == nullptr) {
-                RETURN_LIMIT_EXCEEDED(
-                        thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), state,
-                        "Could not allocate constant string value", sv->len, rst);
+                RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant string value", sv->len,
+                                      rst);
             }
             memcpy(ptr_copy, sv->ptr, sv->len);
             sv->ptr = reinterpret_cast<uint8_t*>(ptr_copy);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 727206e4f0..9a9d1c808f 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -50,7 +50,7 @@ LoadChannel::~LoadChannel() {
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     int64_t index_id = params.index_id();
     std::shared_ptr<TabletsChannel> channel;
     {
@@ -137,7 +137,7 @@ bool LoadChannel::is_finished() {
 
 Status LoadChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     for (auto& it : _tablets_channels) {
         it.second->cancel();
     }
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 299272ae96..4137c7fafc 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -129,7 +129,7 @@ private:
 template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
 Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
                               TabletWriterAddResult* response) {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 9f00c23499..4e4af1723d 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -73,8 +73,9 @@ public:
 public:
     bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); }
 
+    // Return true, no exceeded limit
     bool check_limit(int64_t limit, int64_t bytes) const {
-        return limit >= 0 && limit < consumption() + bytes;
+        return limit >= 0 && limit > consumption() + bytes;
     }
 
     // Usually, a negative values means that the statistics are not accurate,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 7dcfd80dc7..1f3fb89800 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -270,8 +270,11 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
     return Status::OK();
 }
 
-#define RETURN_LIMIT_EXCEEDED(tracker, ...) return tracker->mem_limit_exceeded(__VA_ARGS__);
-#define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \
-    if (tracker->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...)                                                    \
+    return thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->mem_limit_exceeded( \
+            state, msg, ##__VA_ARGS__);
+#define RETURN_IF_LIMIT_EXCEEDED(state, msg)                                                      \
+    if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded())  \
+        RETURN_LIMIT_EXCEEDED(state, msg);
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 8947e019bd..86f2976f18 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -42,7 +42,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
             ctor(task_id, new MemTrackerLimiter(mem_limit, label, parent));
         });
     if (new_emplace) {
-        LOG(INFO) << "Register task memory tracker, task id: " << task_id
+        LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
                   << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
     }
     return _task_mem_trackers[task_id];
@@ -109,11 +109,11 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
     for (auto tid : expired_tasks) {
         if (!_task_mem_trackers[tid]) {
             _task_mem_trackers.erase(tid);
-            LOG(INFO) << "Deregister null task memory tracker, task id: " << tid;
+            LOG(INFO) << "Deregister null query/load memory tracker, query/load id: " << tid;
         } else {
             delete _task_mem_trackers[tid];
             _task_mem_trackers.erase(tid);
-            LOG(INFO) << "Deregister not used task memory tracker, task id: " << tid;
+            LOG(INFO) << "Deregister not used query/load memory tracker, query/load id: " << tid;
         }
     }
 }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 054390d990..02660730c3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -352,7 +352,7 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
 Status RuntimeState::check_query_state(const std::string& msg) {
     // TODO: it would be nice if this also checked for cancellation, but doing so breaks
     // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
-    RETURN_IF_LIMIT_EXCEEDED(_instance_mem_tracker, this, msg);
+    RETURN_IF_LIMIT_EXCEEDED(this, msg);
     return query_status();
 }
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index a4ebc7ea02..6f419aea16 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -20,11 +20,11 @@
 // **Note**: default db will be create if not exist
 defaultDb = "regression_test"
 
-jdbcUrl = "jdbc:mysql://127.0.0.1:9083/?"
+jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
 jdbcUser = "root"
 jdbcPassword = ""
 
-feHttpAddress = "127.0.0.1:8033"
+feHttpAddress = "127.0.0.1:8030"
 feHttpUser = "root"
 feHttpPassword = ""