This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new c58b3e934da branch-3.1: (memtracker) memory not consumed by memtracker #55796 (#55823)
c58b3e934da is described below
commit c58b3e934daabc1262fbff26df74859d716e8d5d
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 11 10:46:11 2025 +0800
branch-3.1: (memtracker) memory not consumed by memtracker #55796 (#55823)
picked from #55796
---
be/src/pipeline/pipeline_task.cpp | 19 ++++++++++++++++++-
be/src/pipeline/pipeline_task.h | 3 +++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index dfda613af52..04f869007da 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -74,13 +74,30 @@ PipelineTask::PipelineTask(
_task_idx(task_idx),
_pipeline_name(_pipeline->name()) {
_pipeline_task_watcher.start();
+#ifndef BE_TEST
+ _query_mem_tracker = fragment_context->get_query_ctx()->query_mem_tracker;
+#endif
_execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
auto shared_state = _sink->create_shared_state();
if (shared_state) {
_sink_shared_state = shared_state;
}
}
-
+PipelineTask::~PipelineTask() {
+// PipelineTask is also hold by task queue( https://github.com/apache/doris/pull/49753),
+// so that it maybe the last one to be destructed.
+// But pipeline task hold some objects, like operators, shared state, etc. So that should release
+// memory manually.
+#ifndef BE_TEST
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+#endif
+ _sink_shared_state.reset();
+ _op_shared_states.clear();
+ _sink.reset();
+ _operators.clear();
+ _block.reset();
+ _pipeline.reset();
+}
Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
QueryContext* query_ctx) {
DCHECK(_sink);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 3a50436280d..645879b7043 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -54,6 +54,7 @@ public:
std::shared_ptr<Dependency>>>
le_state_map,
int task_idx);
+ ~PipelineTask();
Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
QueryContext* query_ctx);
@@ -316,6 +317,8 @@ private:
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
const std::string _pipeline_name;
+ // PipelineTask maybe hold by TaskQueue
+ std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
};
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]