This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d431d234887c9521933238cea3c0de1c2e4153d5 Author: yiguolei <676222...@qq.com> AuthorDate: Thu Feb 8 13:59:44 2024 +0800 [improvement](memtracker) should counter memory usage to query when exchange sink buffer rpc (#30964) * [improvement](memtracker) should counter memory usage to query when rpc callback * f --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 9 +++++++++ be/src/pipeline/exec/exchange_sink_buffer.h | 1 + be/src/pipeline/pipeline_task.cpp | 1 - be/src/pipeline/pipeline_task.h | 2 ++ be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 1 - be/src/pipeline/task_scheduler.cpp | 6 ++++-- 6 files changed, 16 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 2ea8f6e576e..ed7f18bfcb7 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -93,6 +93,7 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId de _dest_node_id(dest_node_id), _sender_id(send_id), _be_number(be_number), + _state(state), _context(state->get_query_ctx()) {} template <typename Parent> @@ -281,6 +282,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. return; } + // attach task for memory tracker and query id when core + SCOPED_ATTACH_TASK(_state); _failed(id, err); }); send_callback->start_rpc_time = GetCurrentTimeNanos(); @@ -293,6 +296,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. return; } + // attach task for memory tracker and query id when core + SCOPED_ATTACH_TASK(_state); set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { @@ -356,6 +361,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. return; } + // attach task for memory tracker and query id when core + SCOPED_ATTACH_TASK(_state); _failed(id, err); }); send_callback->start_rpc_time = GetCurrentTimeNanos(); @@ -368,6 +375,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. return; } + // attach task for memory tracker and query id when core + SCOPED_ATTACH_TASK(_state); set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c04d2973d5e..87fd378df4c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -257,6 +257,7 @@ private: int _sender_id; int _be_number; std::atomic<int64_t> _rpc_count = 0; + RuntimeState* _state = nullptr; QueryContext* _context = nullptr; Status _send_rpc(InstanceLoId); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index f31a39df31a..a6b6329cc0c 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -211,7 +211,6 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) { Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); - SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; ThreadCpuStopWatch cpu_time_stop_watch; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index be1531de4e1..b54a6de593d 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -286,6 +286,8 @@ public: } } + RuntimeState* runtime_state() { return _state; } + protected: void _finish_p_dependency() { for (const auto& p : _pipeline->_parents) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index f0b87ce8613..529eff5068f 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -214,7 +214,6 @@ Status PipelineXTask::_open() { Status PipelineXTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); - SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; ThreadCpuStopWatch cpu_time_stop_watch; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 77cdddfb4a7..02af0991208 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -241,11 +241,13 @@ void TaskScheduler::_do_work(size_t index) { task->set_running(true); task->set_task_queue(_task_queue.get()); auto* fragment_ctx = task->fragment_context(); - signal::query_id_hi = fragment_ctx->get_query_id().hi; - signal::query_id_lo = fragment_ctx->get_query_id().lo; bool canceled = fragment_ctx->is_canceled(); auto state = task->get_state(); + // Has to attach memory tracker here, because the close task will also release some memory. + // Should count the memory to the query or the query's memory will not decrease when part of + // task finished. + SCOPED_ATTACH_TASK(task->runtime_state()); // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish // has to return false. The task is finished and need to close now. if (state == PipelineTaskState::PENDING_FINISH) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org