This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a8ae590ecb [fix](pipelineX) Fix unexpected OOM on pipelineX (#29436)
3a8ae590ecb is described below

commit 3a8ae590ecb4ecbfa7c6d0e26cbd75671fb0a4d6
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Jan 3 10:40:44 2024 +0800

    [fix](pipelineX) Fix unexpected OOM on pipelineX (#29436)
---
 be/src/pipeline/exec/aggregation_sink_operator.h |  2 +-
 be/src/pipeline/pipeline_x/dependency.h          |  2 +-
 be/src/pipeline/pipeline_x/operator.cpp          | 10 ++++------
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index d02e9a0f52a..c72cbd09201 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -53,7 +53,7 @@ public:
     ~AggSinkDependency() override = default;
 
     void set_ready() override {
-        if (_is_streaming_agg_state()) {
+        if (_shared_state && _is_streaming_agg_state()) {
             if (((SharedState*)Dependency::_shared_state.get())
                         ->data_queue->has_enough_space_to_push()) {
                 Dependency::set_ready();
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index a0504d17886..5161424d8d1 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -59,7 +59,6 @@ struct BasicSharedState {
     DependencySPtr source_dep = nullptr;
     DependencySPtr sink_dep = nullptr;
 
-    virtual Status close(RuntimeState* state) { return Status::OK(); }
     virtual ~BasicSharedState() = default;
 };
 
@@ -90,6 +89,7 @@ public:
     void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
         _shared_state = shared_state;
     }
+    void clear_shared_state() { _shared_state.reset(); }
     virtual std::string debug_string(int indentation_level = 0);
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 7d40e96f38c..bc16d70b86c 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -376,11 +376,9 @@ Status 
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    if (_shared_state) {
-        RETURN_IF_ERROR(_shared_state->close(state));
-    }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
+        _dependency->clear_shared_state();
     }
     if (_rows_returned_counter != nullptr) {
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
@@ -439,11 +437,11 @@ Status 
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
     if (_closed) {
         return Status::OK();
     }
-    if (_shared_state) {
-        RETURN_IF_ERROR(_shared_state->close(state));
-    }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
+        if constexpr (!std::is_same_v<LocalExchangeSinkDependency, 
DependencyType>) {
+            _dependency->clear_shared_state();
+        }
     }
     if (_peak_memory_usage_counter) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to