This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4654fe0a748db8696ab7b8d8774290f1b55fec90
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Jun 7 01:27:13 2023 +0800

    [Bug](memleak) Fix emptyoperator may cause node not close (#20525)
---
 be/src/pipeline/exec/empty_source_operator.cpp |  2 +-
 be/src/pipeline/exec/empty_source_operator.h   | 19 ++++++++++++++++---
 be/src/pipeline/pipeline_fragment_context.cpp  |  2 +-
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/empty_source_operator.cpp b/be/src/pipeline/exec/empty_source_operator.cpp
index 5142c0c55a..78f5c94662 100644
--- a/be/src/pipeline/exec/empty_source_operator.cpp
+++ b/be/src/pipeline/exec/empty_source_operator.cpp
@@ -21,7 +21,7 @@
 namespace doris::pipeline {
 
 OperatorPtr EmptySourceOperatorBuilder::build_operator() {
-    return std::make_shared<EmptySourceOperator>(this);
+    return std::make_shared<EmptySourceOperator>(this, _exec_node);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h
index fd12b27dab..4d93a310df 100644
--- a/be/src/pipeline/exec/empty_source_operator.h
+++ b/be/src/pipeline/exec/empty_source_operator.h
@@ -37,8 +37,10 @@ namespace doris::pipeline {
 
 class EmptySourceOperatorBuilder final : public OperatorBuilderBase {
 public:
-    EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor)
-            : OperatorBuilderBase(id, "EmptySourceOperator"), _row_descriptor(row_descriptor) {}
+    EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor, ExecNode* exec_node)
+            : OperatorBuilderBase(id, "EmptySourceOperator"),
+              _row_descriptor(row_descriptor),
+              _exec_node(exec_node) {}
 
     bool is_source() const override { return true; }
 
@@ -48,11 +50,14 @@ public:
 
 private:
     RowDescriptor _row_descriptor;
+    ExecNode* _exec_node = nullptr;
 };
 
 class EmptySourceOperator final : public OperatorBase {
 public:
-    EmptySourceOperator(OperatorBuilderBase* builder) : OperatorBase(builder) {}
+    EmptySourceOperator(OperatorBuilderBase* builder, ExecNode* exec_node)
+            : OperatorBase(builder), _exec_node(exec_node) {}
+
     bool can_read() override { return true; }
     bool is_pending_finish() const override { return false; }
 
@@ -67,6 +72,14 @@ public:
     }
 
     Status sink(RuntimeState*, vectorized::Block*, SourceState) override { return Status::OK(); }
+
+    Status close(RuntimeState* state) override {
+        _exec_node->close(state);
+        return Status::OK();
+    }
+
+private:
+    ExecNode* _exec_node = nullptr;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 32207ad216..ebb30fe384 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -596,7 +596,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
         } else {
             OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>(
-                    next_operator_builder_id(), node->child(1)->row_desc());
+                    next_operator_builder_id(), node->child(1)->row_desc(), node->child(1));
             new_pipe->add_operator(builder);
         }
         OperatorBuilderPtr join_sink =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org