This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d431d234887c9521933238cea3c0de1c2e4153d5
Author: yiguolei <676222...@qq.com>
AuthorDate: Thu Feb 8 13:59:44 2024 +0800

    [improvement](memtracker) should count memory usage against the query in 
exchange sink buffer rpc callbacks (#30964)
    
    * [improvement](memtracker) should count memory usage against the query in 
rpc callbacks
    
    * f
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp  | 9 +++++++++
 be/src/pipeline/exec/exchange_sink_buffer.h    | 1 +
 be/src/pipeline/pipeline_task.cpp              | 1 -
 be/src/pipeline/pipeline_task.h                | 2 ++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 1 -
 be/src/pipeline/task_scheduler.cpp             | 6 ++++--
 6 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2ea8f6e576e..ed7f18bfcb7 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -93,6 +93,7 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId 
query_id, PlanNodeId de
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
           _be_number(be_number),
+          _state(state),
           _context(state->get_query_ctx()) {}
 
 template <typename Parent>
@@ -281,6 +282,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
                 // This means ExchangeSinkBuffer Ojbect already destroyed, not 
need run failed any more.
                 return;
             }
+            // Attach the task so memory is charged to the query and its query id is set for core dumps.
+            SCOPED_ATTACH_TASK(_state);
             _failed(id, err);
         });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
@@ -293,6 +296,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
                 // This means ExchangeSinkBuffer Ojbect already destroyed, not 
need run failed any more.
                 return;
             }
+            // Attach the task so memory is charged to the query and its query id is set for core dumps.
+            SCOPED_ATTACH_TASK(_state);
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -356,6 +361,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
                 // This means ExchangeSinkBuffer Ojbect already destroyed, not 
need run failed any more.
                 return;
             }
+            // Attach the task so memory is charged to the query and its query id is set for core dumps.
+            SCOPED_ATTACH_TASK(_state);
             _failed(id, err);
         });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
@@ -368,6 +375,8 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
                 // This means ExchangeSinkBuffer Ojbect already destroyed, not 
need run failed any more.
                 return;
             }
+            // Attach the task so memory is charged to the query and its query id is set for core dumps.
+            SCOPED_ATTACH_TASK(_state);
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c04d2973d5e..87fd378df4c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -257,6 +257,7 @@ private:
     int _sender_id;
     int _be_number;
     std::atomic<int64_t> _rpc_count = 0;
+    RuntimeState* _state = nullptr;
     QueryContext* _context = nullptr;
 
     Status _send_rpc(InstanceLoId);
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index f31a39df31a..a6b6329cc0c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -211,7 +211,6 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
 Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
-    SCOPED_ATTACH_TASK(_state);
     int64_t time_spent = 0;
 
     ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index be1531de4e1..b54a6de593d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -286,6 +286,8 @@ public:
         }
     }
 
+    RuntimeState* runtime_state() { return _state; }
+
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index f0b87ce8613..529eff5068f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -214,7 +214,6 @@ Status PipelineXTask::_open() {
 Status PipelineXTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
-    SCOPED_ATTACH_TASK(_state);
     int64_t time_spent = 0;
 
     ThreadCpuStopWatch cpu_time_stop_watch;
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 77cdddfb4a7..02af0991208 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -241,11 +241,13 @@ void TaskScheduler::_do_work(size_t index) {
         task->set_running(true);
         task->set_task_queue(_task_queue.get());
         auto* fragment_ctx = task->fragment_context();
-        signal::query_id_hi = fragment_ctx->get_query_id().hi;
-        signal::query_id_lo = fragment_ctx->get_query_id().lo;
         bool canceled = fragment_ctx->is_canceled();
 
         auto state = task->get_state();
+        // Must attach the memory tracker here, because closing the task also releases memory.
+        // That memory has to be counted against the query, or the query's tracked memory will
+        // not decrease when some of its tasks finish.
+        SCOPED_ATTACH_TASK(task->runtime_state());
         // If the state is PENDING_FINISH, then the task is come from blocked 
queue, its is_pending_finish
         // has to return false. The task is finished and need to close now.
         if (state == PipelineTaskState::PENDING_FINISH) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to