This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3a8ae590ecb [fix](pipelineX) Fix unexpected OOM on pipelineX (#29436) 3a8ae590ecb is described below commit 3a8ae590ecb4ecbfa7c6d0e26cbd75671fb0a4d6 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jan 3 10:40:44 2024 +0800 [fix](pipelineX) Fix unexpected OOM on pipelineX (#29436) --- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/pipeline_x/dependency.h | 2 +- be/src/pipeline/pipeline_x/operator.cpp | 10 ++++------ 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index d02e9a0f52a..c72cbd09201 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -53,7 +53,7 @@ public: ~AggSinkDependency() override = default; void set_ready() override { - if (_is_streaming_agg_state()) { + if (_shared_state && _is_streaming_agg_state()) { if (((SharedState*)Dependency::_shared_state.get()) ->data_queue->has_enough_space_to_push()) { Dependency::set_ready(); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index a0504d17886..5161424d8d1 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -59,7 +59,6 @@ struct BasicSharedState { DependencySPtr source_dep = nullptr; DependencySPtr sink_dep = nullptr; - virtual Status close(RuntimeState* state) { return Status::OK(); } virtual ~BasicSharedState() = default; }; @@ -90,6 +89,7 @@ public: void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) { _shared_state = shared_state; } + void clear_shared_state() { _shared_state.reset(); } virtual std::string debug_string(int indentation_level = 0); // Start the watcher. We use it to count how long this dependency block the current pipeline task. diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 7d40e96f38c..bc16d70b86c 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -376,11 +376,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - if (_shared_state) { - RETURN_IF_ERROR(_shared_state->close(state)); - } if constexpr (!std::is_same_v<DependencyType, FakeDependency>) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); + _dependency->clear_shared_state(); } if (_rows_returned_counter != nullptr) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); @@ -439,11 +437,11 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu if (_closed) { return Status::OK(); } - if (_shared_state) { - RETURN_IF_ERROR(_shared_state->close(state)); - } if constexpr (!std::is_same_v<DependencyType, FakeDependency>) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); + if constexpr (!std::is_same_v<LocalExchangeSinkDependency, DependencyType>) { + _dependency->clear_shared_state(); + } } if (_peak_memory_usage_counter) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org