This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 276625d17d4 [pipelineX](refactor) refine relationship between shared state and dependency (#30294)
276625d17d4 is described below

commit 276625d17d4b5e189250d15b8b8865b0a19819b8
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Jan 24 17:00:48 2024 +0800

    [pipelineX](refactor) refine relationship between shared state and dependency (#30294)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   | 14 ++--
 be/src/pipeline/exec/aggregation_source_operator.h |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  4 -
 be/src/pipeline/exec/analytic_source_operator.h    |  4 +-
 .../pipeline/exec/multi_cast_data_stream_sink.cpp  | 11 ---
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |  3 +-
 .../exec/multi_cast_data_stream_source.cpp         |  7 --
 .../pipeline/exec/multi_cast_data_stream_source.h  |  1 -
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  | 32 ++-----
 be/src/pipeline/exec/multi_cast_data_streamer.h    | 13 +--
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_sink_operator.h           |  2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |  9 +-
 be/src/pipeline/exec/union_source_operator.cpp     | 28 ++-----
 be/src/pipeline/exec/union_source_operator.h       |  1 -
 be/src/pipeline/pipeline_x/dependency.cpp          |  6 +-
 be/src/pipeline/pipeline_x/dependency.h            | 97 +++++-----------------
 .../local_exchange_source_operator.cpp             | 14 +---
 .../local_exchange_source_operator.h               |  1 -
 be/src/pipeline/pipeline_x/operator.cpp            | 76 +++++++++++------
 be/src/pipeline/pipeline_x/operator.h              | 23 +++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 12 ++-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 20 +++--
 be/src/pipeline/pipeline_x/pipeline_x_task.h       | 26 +++++-
 24 files changed, 164 insertions(+), 244 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index bf41fe30536..a7b30d46117 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -53,9 +53,8 @@ public:
     ~AggSinkDependency() override = default;
 
     void set_ready() override {
-        std::shared_ptr<BasicSharedState> shared_state = _shared_state;
-        if (shared_state && _is_streaming_agg_state(shared_state)) {
-            if 
(((SharedState*)shared_state.get())->data_queue->has_enough_space_to_push()) {
+        if (_shared_state && _is_streaming_agg_state(_shared_state)) {
+            if 
(((SharedState*)_shared_state)->data_queue->has_enough_space_to_push()) {
                 Dependency::set_ready();
             }
         } else {
@@ -64,9 +63,8 @@ public:
     }
 
     void block() override {
-        std::shared_ptr<BasicSharedState> shared_state = _shared_state;
-        if (_is_streaming_agg_state(shared_state)) {
-            if 
(!((SharedState*)shared_state.get())->data_queue->has_enough_space_to_push()) {
+        if (_is_streaming_agg_state(_shared_state)) {
+            if 
(!((SharedState*)_shared_state)->data_queue->has_enough_space_to_push()) {
                 Dependency::block();
             }
         } else {
@@ -75,8 +73,8 @@ public:
     }
 
 private:
-    static bool _is_streaming_agg_state(const 
std::shared_ptr<BasicSharedState>& shared_state) {
-        return ((SharedState*)shared_state.get())->data_queue != nullptr;
+    static bool _is_streaming_agg_state(const BasicSharedState* shared_state) {
+        return ((SharedState*)shared_state)->data_queue != nullptr;
     }
 };
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index a7125c42ff6..c03aefcc04b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -63,7 +63,7 @@ public:
 
 private:
     bool _is_streaming_agg_state() {
-        return ((SharedState*)Dependency::_shared_state.get())->data_queue != 
nullptr;
+        return ((SharedState*)Dependency::_shared_state)->data_queue != 
nullptr;
     }
 };
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 0214b22c006..064d68f189a 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -63,10 +63,6 @@ public:
             : PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status close(RuntimeState* state, Status exec_status) override {
-        _shared_state->release_sink_dep();
-        return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state, 
exec_status);
-    }
 
 private:
     friend class AnalyticSinkOperatorX;
diff --git a/be/src/pipeline/exec/analytic_source_operator.h 
b/be/src/pipeline/exec/analytic_source_operator.h
index dc86bc95062..f4e2f10a719 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -84,9 +84,7 @@ private:
         if (need_more_input) {
             _dependency->block();
             _dependency->set_ready_to_write();
-            if (!_shared_state->sink_released_flag) {
-                _shared_state->sink_dep->set_ready();
-            }
+            _shared_state->sink_dep->set_ready();
         } else {
             _dependency->set_block_to_write();
             _dependency->set_ready();
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index 8a45634027f..6b5506be269 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -35,15 +35,4 @@ std::string MultiCastDataStreamSinkLocalState::name_suffix() 
{
     return id_name;
 }
 
-Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
-    auto multi_cast_data_streamer = 
static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)
-                                            
->create_multi_cast_data_streamer();
-    auto& deps = info.dependencys;
-    for (auto dep : deps) {
-        
((MultiCastSinkDependency*)dep.get())->set_shared_state(multi_cast_data_streamer);
-    }
-    RETURN_IF_ERROR(Base::init(state, info));
-    return Status::OK();
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index 965605fd3c7..84a75720a66 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -59,7 +59,6 @@ class MultiCastDataStreamSinkLocalState final
     friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
     using Base = PipelineXSinkLocalState<MultiCastSinkDependency>;
     using Parent = MultiCastDataStreamSinkOperatorX;
-    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     std::string name_suffix() override;
 
 private:
@@ -113,7 +112,7 @@ private:
     friend class MultiCastDataStreamSinkLocalState;
     ObjectPool* _pool;
     RowDescriptor _row_desc;
-    int _cast_sender_count;
+    const int _cast_sender_count;
     const TMultiCastDataStreamSink& _sink;
     friend class MultiCastDataStreamSinkLocalState;
 };
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index d360e2eb5dd..6ac06ee5f10 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -152,13 +152,6 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     return Status::OK();
 }
 
-Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
-    _shared_state->multi_cast_data_streamer.released_dependency(
-            _parent->cast<Parent>()._consumer_id);
-    RETURN_IF_ERROR(Base::close(state));
-    return Status::OK();
-}
-
 Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                                                        vectorized::Block* 
block,
                                                        SourceState& 
source_state) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 73c506f9cb8..5dae2be31f7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -117,7 +117,6 @@ public:
         RETURN_IF_ERROR(_acquire_runtime_filter());
         return Status::OK();
     }
-    Status close(RuntimeState* state) override;
     friend class MultiCastDataStreamerSourceOperatorX;
 
     RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); }
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp 
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index f3e44731aef..175a21469b8 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -104,37 +104,15 @@ void MultiCastDataStreamer::_set_ready_for_read(int 
sender_idx) {
     if (_dependencies.empty()) {
         return;
     }
-    if (_dependencies_release_flag[sender_idx]) {
-        return;
-    }
-    {
-        std::unique_lock<std::mutex> lc(_release_lock);
-        if (_dependencies_release_flag[sender_idx]) {
-            return;
-        }
-        auto* dep = _dependencies[sender_idx];
-        DCHECK(dep);
-        dep->set_ready();
-    }
+    auto* dep = _dependencies[sender_idx];
+    DCHECK(dep);
+    dep->set_ready();
 }
 
 void MultiCastDataStreamer::_set_ready_for_read() {
-    size_t i = 0;
     for (auto* dep : _dependencies) {
-        if (_dependencies_release_flag[i]) {
-            i++;
-            continue;
-        }
-        {
-            std::unique_lock<std::mutex> lc(_release_lock);
-            if (_dependencies_release_flag[i]) {
-                i++;
-                continue;
-            }
-            DCHECK(dep);
-            dep->set_ready();
-            i++;
-        }
+        DCHECK(dep);
+        dep->set_ready();
     }
 }
 
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 7f221d622c0..5e4179e0cad 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -38,14 +38,10 @@ public:
                           bool with_dependencies = false)
             : _row_desc(row_desc),
               _profile(pool->add(new 
RuntimeProfile("MultiCastDataStreamSink"))),
-              _cast_sender_count(cast_sender_count),
-              _dependencies_release_flag(cast_sender_count) {
+              _cast_sender_count(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, 
_multi_cast_blocks.end());
         if (with_dependencies) {
             _dependencies.resize(cast_sender_count, nullptr);
-            for (size_t i = 0; i < cast_sender_count; i++) {
-                _dependencies_release_flag[i] = false;
-            }
         }
 
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
@@ -83,11 +79,6 @@ public:
         _block_reading(sender_idx);
     }
 
-    void released_dependency(int sender_idx) {
-        std::unique_lock<std::mutex> lc(_release_lock);
-        _dependencies_release_flag[sender_idx] = true;
-    }
-
 private:
     void _set_ready_for_read(int sender_idx);
     void _set_ready_for_read();
@@ -106,8 +97,6 @@ private:
     RuntimeProfile::Counter* _process_rows = nullptr;
     RuntimeProfile::Counter* _peak_mem_usage = nullptr;
 
-    std::mutex _release_lock;
-    std::vector<std::atomic<bool>> _dependencies_release_flag;
     std::vector<MultiCastSourceDependency*> _dependencies;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 6ac09ccfd9d..b8687376059 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -76,7 +76,7 @@ public:
 
     void set_cur_child_id(int id) {
         _child_idx = id;
-        
((SetSharedState*)_shared_state.get())->probe_finished_children_dependency[id] 
= this;
+        
((SetSharedState*)_shared_state)->probe_finished_children_dependency[id] = this;
         block();
     }
 
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index b851e18ce5b..946720cd179 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -68,7 +68,7 @@ public:
     ~SetSinkDependency() override = default;
 
     void set_cur_child_id(int id) {
-        
((SetSharedState*)_shared_state.get())->probe_finished_children_dependency[id] 
= this;
+        
((SetSharedState*)_shared_state)->probe_finished_children_dependency[id] = this;
     }
 };
 
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 03cd67477db..1e64bf8a50d 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -51,13 +51,14 @@ template class SetSourceOperator<false>;
 
 template <bool is_intersect>
 Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, 
LocalStateInfo& info) {
-    std::shared_ptr<typename SetSourceDependency::SharedState> ss = nullptr;
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
     auto& deps = info.upstream_dependencies;
-    ss.reset(new typename SetSourceDependency::SharedState(deps.size()));
+    _shared_state->probe_finished_children_dependency.resize(deps.size(), 
nullptr);
     for (auto& dep : deps) {
-        ((SetSourceDependency*)dep.get())->set_shared_state(ss);
+        dep->set_shared_state(_dependency->shared_state());
     }
-    RETURN_IF_ERROR(Base::init(state, info));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index e8ef1ba7207..709e89368a8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -105,28 +105,19 @@ Status UnionSourceOperator::get_block(RuntimeState* 
state, vectorized::Block* bl
 }
 
 Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
     int child_count = p.get_child_count();
-    auto ss = create_shared_state();
     if (child_count != 0) {
         auto& deps = info.upstream_dependencies;
         for (auto& dep : deps) {
-            ((UnionSinkDependency*)dep.get())->set_shared_state(ss);
+            dep->set_shared_state(_dependency->shared_state());
         }
-    } else {
-        auto& deps = info.upstream_dependencies;
-        DCHECK(child_count == 0);
-        DCHECK(deps.size() == 1);
-        DCHECK(deps.front() == nullptr);
-        //child_count == 0 , we need to creat a  UnionDependency
-        deps.front() = std::make_shared<UnionSourceDependency>(
-                _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
-        ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
     }
-    RETURN_IF_ERROR(Base::init(state, info));
-    ss->data_queue.set_source_dependency(info.dependency);
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    ((UnionSharedState*)_dependency->shared_state())
+            ->data_queue.set_source_dependency(info.dependency);
     // Const exprs materialized by this node. These exprs don't refer to any 
children.
     // Only materialized by the first fragment instance to avoid duplication.
     if (state->per_fragment_instance_idx() == 0) {
@@ -151,13 +142,6 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     return Status::OK();
 }
 
-std::shared_ptr<UnionSharedState> UnionSourceLocalState::create_shared_state() 
{
-    auto& p = _parent->cast<Parent>();
-    std::shared_ptr<UnionSharedState> data_queue =
-            std::make_shared<UnionSharedState>(p._child_size);
-    return data_queue;
-}
-
 std::string UnionSourceLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 0c150a072b6..887c0cb9639 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -86,7 +86,6 @@ public:
     UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {};
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
-    std::shared_ptr<UnionSharedState> create_shared_state();
 
     [[nodiscard]] std::string debug_string(int indentation_level = 0) const 
override;
 
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index bb27a688820..9cb6e99b44f 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -189,13 +189,9 @@ void 
LocalExchangeSharedState::sub_running_sink_operators() {
     }
 }
 
-LocalExchangeSharedState::LocalExchangeSharedState(int num_instances)
-        : dependencies_release_flag(num_instances) {
+LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
     source_dependencies.resize(num_instances, nullptr);
     mem_trackers.resize(num_instances, nullptr);
-    for (size_t i = 0; i < num_instances; i++) {
-        dependencies_release_flag[i] = false;
-    }
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 172f7383f3e..59f9fee3775 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,22 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L 
* 1000L * 1000L;
 static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
 
 struct BasicSharedState {
-    Dependency* source_dep = nullptr;
-    Dependency* sink_dep = nullptr;
-
-    std::atomic_bool source_released_flag {false};
-    std::atomic_bool sink_released_flag {false};
-    std::mutex source_release_lock;
-    std::mutex sink_release_lock;
-
-    void release_source_dep() {
-        std::unique_lock<std::mutex> lc(source_release_lock);
-        source_released_flag = true;
+    template <class TARGET>
+    TARGET* cast() {
+        DCHECK(dynamic_cast<TARGET*>(this))
+                << " Mismatch type! Current type is " << typeid(*this).name()
+                << " and expect type is" << typeid(TARGET).name();
+        return reinterpret_cast<TARGET*>(this);
     }
-    void release_sink_dep() {
-        std::unique_lock<std::mutex> lc(sink_release_lock);
-        sink_released_flag = true;
+    template <class TARGET>
+    const TARGET* cast() const {
+        DCHECK(dynamic_cast<const TARGET*>(this))
+                << " Mismatch type! Current type is " << typeid(*this).name()
+                << " and expect type is" << typeid(TARGET).name();
+        return reinterpret_cast<const TARGET*>(this);
     }
+    DependencySPtr source_dep = nullptr;
+    DependencySPtr sink_dep = nullptr;
     virtual ~BasicSharedState() = default;
 };
 
@@ -99,11 +99,8 @@ public:
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
     void add_child(std::shared_ptr<Dependency> child) { 
_children.push_back(child); }
-    std::shared_ptr<BasicSharedState> shared_state() { return _shared_state; }
-    void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
-        _shared_state = shared_state;
-    }
-    void clear_shared_state() { _shared_state.reset(); }
+    BasicSharedState* shared_state() { return _shared_state; }
+    void set_shared_state(BasicSharedState* shared_state) { _shared_state = 
shared_state; }
     virtual std::string debug_string(int indentation_level = 0);
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
@@ -121,47 +118,19 @@ public:
     virtual void set_ready();
     void set_ready_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
-        if (_shared_state->source_released_flag) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
-        if (_shared_state->source_released_flag) {
-            return;
-        }
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->set_ready();
     }
     void set_block_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
-        if (_shared_state->source_released_flag) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
-        if (_shared_state->source_released_flag) {
-            return;
-        }
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->block();
     }
     void set_ready_to_write() {
-        if (_shared_state->sink_released_flag) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
-        if (_shared_state->sink_released_flag) {
-            return;
-        }
         DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
         _shared_state->sink_dep->set_ready();
     }
     void set_block_to_write() {
-        if (_shared_state->sink_released_flag) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
-        if (_shared_state->sink_released_flag) {
-            return;
-        }
         DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
         _shared_state->sink_dep->block();
     }
@@ -180,7 +149,7 @@ protected:
     std::atomic<bool> _ready;
     const QueryContext* _query_ctx = nullptr;
 
-    std::shared_ptr<BasicSharedState> _shared_state = nullptr;
+    BasicSharedState* _shared_state = nullptr;
     MonotonicStopWatch _watcher;
     std::list<std::shared_ptr<Dependency>> _children;
 
@@ -524,7 +493,6 @@ public:
 
 struct SetSharedState : public BasicSharedState {
 public:
-    SetSharedState(int num_deps) { 
probe_finished_children_dependency.resize(num_deps, nullptr); }
     /// default init
     vectorized::Block build_block; // build to source
     //record element size in hashtable
@@ -666,45 +634,24 @@ public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     LocalExchangeSharedState(int num_instances);
     std::unique_ptr<Exchanger> exchanger {};
-    std::vector<Dependency*> source_dependencies;
-    std::vector<std::atomic_bool> dependencies_release_flag;
-    Dependency* sink_dependency;
+    std::vector<DependencySPtr> source_dependencies;
+    DependencySPtr sink_dependency;
     std::vector<MemTracker*> mem_trackers;
     std::atomic<size_t> mem_usage = 0;
     std::mutex le_lock;
     void sub_running_sink_operators();
     void _set_ready_for_read() {
-        size_t i = 0;
         for (auto& dep : source_dependencies) {
-            if (dependencies_release_flag[i]) {
-                i++;
-                continue;
-            }
-            {
-                std::unique_lock<std::mutex> lc(source_release_lock);
-                if (dependencies_release_flag[i]) {
-                    i++;
-                    continue;
-                }
-                DCHECK(dep);
-                dep->set_ready();
-                i++;
-            }
+            DCHECK(dep);
+            dep->set_ready();
         }
     }
 
-    void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+    void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
         source_dependencies[channel_id] = dep;
     }
 
     void set_ready_to_read(int channel_id) {
-        if (dependencies_release_flag[channel_id]) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(source_release_lock);
-        if (dependencies_release_flag[channel_id]) {
-            return;
-        }
         auto& dep = source_dependencies[channel_id];
         DCHECK(dep) << channel_id;
         dep->set_ready();
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 9e98e3b6e8f..edfb8114811 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -22,11 +22,11 @@
 namespace doris::pipeline {
 
 void LocalExchangeSourceDependency::block() {
-    if 
(((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators
 == 0) {
+    if 
(((LocalExchangeSharedState*)_shared_state)->exchanger->_running_sink_operators 
== 0) {
         return;
     }
-    std::unique_lock<std::mutex> 
lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
-    if 
(((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators
 == 0) {
+    std::unique_lock<std::mutex> 
lc(((LocalExchangeSharedState*)_shared_state)->le_lock);
+    if 
(((LocalExchangeSharedState*)_shared_state)->exchanger->_running_sink_operators 
== 0) {
         return;
     }
     Dependency::block();
@@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _channel_id = info.task_idx;
-    _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
+    _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
     _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
@@ -61,12 +61,6 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
     return fmt::to_string(debug_string_buffer);
 }
 
-Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
-    _shared_state->dependencies_release_flag[_channel_id] = true;
-    RETURN_IF_ERROR(Base::close(state));
-    return Status::OK();
-}
-
 Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                SourceState& source_state) {
     auto& local_state = get_local_state(state);
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 4c95a84b533..63d71bbe08b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -48,7 +48,6 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     std::string debug_string(int indentation_level) const override;
-    Status close(RuntimeState* state) override;
 
 private:
     friend class LocalExchangeSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 21453dfbc3c..db687865657 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -280,18 +280,32 @@ template <>
 inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
 
 template <typename LocalStateType>
-void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& 
dependency,
-                                                       QueryContext* ctx) {
-    std::shared_ptr<typename LocalStateType::DependencyType::SharedState> ss = 
nullptr;
+void DataSinkOperatorX<LocalStateType>::get_dependency(
+        vector<DependencySPtr>& dependency,
+        std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, 
QueryContext* ctx) {
+    std::shared_ptr<BasicSharedState> ss = nullptr;
     if constexpr (NeedToCreate<typename 
LocalStateType::DependencyType::SharedState>) {
         ss.reset(new typename LocalStateType::DependencyType::SharedState());
+        DCHECK(!shared_states.contains(dests_id().front()));
+        if constexpr (!std::is_same_v<typename 
LocalStateType::DependencyType::SharedState,
+                                      FakeSharedState>) {
+            shared_states.insert({dests_id().front(), ss});
+        }
+    } else if constexpr (std::is_same_v<typename 
LocalStateType::DependencyType::SharedState,
+                                        MultiCastSharedState>) {
+        ss = 
((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer();
+        auto& dests = dests_id();
+        for (auto& dest_id : dests) {
+            DCHECK(!shared_states.contains(dest_id));
+            shared_states.insert({dest_id, ss});
+        }
     }
     if constexpr (!std::is_same_v<typename LocalStateType::DependencyType, 
FakeDependency>) {
         auto& dests = dests_id();
         for (auto& dest_id : dests) {
             dependency.push_back(std::make_shared<typename 
LocalStateType::DependencyType>(
                     dest_id, _node_id, ctx));
-            dependency.back()->set_shared_state(ss);
+            dependency.back()->set_shared_state(ss.get());
         }
     } else {
         dependency.push_back(nullptr);
@@ -299,8 +313,23 @@ void 
DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
 }
 
 template <typename LocalStateType>
-DependencySPtr OperatorX<LocalStateType>::get_dependency(QueryContext* ctx) {
-    return std::make_shared<typename 
LocalStateType::DependencyType>(_operator_id, _node_id, ctx);
+DependencySPtr OperatorX<LocalStateType>::get_dependency(
+        QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& 
shared_states) {
+    std::shared_ptr<BasicSharedState> ss = nullptr;
+    if constexpr (std::is_same_v<typename 
LocalStateType::DependencyType::SharedState,
+                                 SetSharedState>) {
+        ss.reset(new typename LocalStateType::DependencyType::SharedState());
+        shared_states.insert({operator_id(), ss});
+    } else if constexpr (std::is_same_v<typename 
LocalStateType::DependencyType::SharedState,
+                                        UnionSharedState>) {
+        ss.reset(new typename LocalStateType::DependencyType::SharedState(
+                ((UnionSourceOperatorX*)this)->get_child_count()));
+        shared_states.insert({operator_id(), ss});
+    }
+    auto dep =
+            std::make_shared<typename 
LocalStateType::DependencyType>(_operator_id, _node_id, ctx);
+    dep->set_shared_state(ss.get());
+    return dep;
 }
 
 template <typename LocalStateType>
@@ -338,19 +367,20 @@ Status 
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
                 _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
         auto& deps = info.upstream_dependencies;
         if constexpr (std::is_same_v<LocalExchangeSourceDependency, 
DependencyType>) {
-            
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
-            _shared_state =
-                    (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+            
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get());
+            _shared_state = _dependency->shared_state()
+                                    ->template cast<typename 
DependencyType::SharedState>();
 
-            _shared_state->source_dep = info.dependency.get();
-            _shared_state->sink_dep = deps.front().get();
+            _shared_state->source_dep = info.dependency;
         } else if constexpr (!is_fake_shared) {
-            _dependency->set_shared_state(deps.front()->shared_state());
-            _shared_state =
-                    (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+            _dependency->set_shared_state(info.shared_state);
+            _shared_state = _dependency->shared_state()
+                                    ->template cast<typename 
DependencyType::SharedState>();
 
-            _shared_state->source_dep = info.dependency.get();
-            _shared_state->sink_dep = deps.front().get();
+            _shared_state->source_dep = info.dependency;
+            if (!deps.empty()) {
+                _shared_state->sink_dep = deps.front();
+            }
         }
     }
 
@@ -382,12 +412,8 @@ Status 
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    if (_shared_state) {
-        _shared_state->release_source_dep();
-    }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
-        _dependency->clear_shared_state();
     }
     if (_rows_returned_counter != nullptr) {
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
@@ -410,22 +436,21 @@ Status 
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
     constexpr auto is_fake_shared =
             std::is_same_v<typename DependencyType::SharedState, 
FakeSharedState>;
     if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
-        auto& deps = info.dependencys;
+        auto& deps = info.dependencies;
         _dependency = (DependencyType*)deps.front().get();
         if constexpr (std::is_same_v<LocalExchangeSinkDependency, 
DependencyType>) {
             _dependency = 
info.le_state_map[_parent->dests_id().front()].second.get();
         }
         if (_dependency) {
             if constexpr (!is_fake_shared) {
-                _shared_state =
-                        (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+                _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state();
             }
 
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
         }
     } else {
-        auto& deps = info.dependencys;
+        auto& deps = info.dependencies;
         deps.front() = std::make_shared<FakeDependency>(0, 0, 
state->get_query_ctx());
         _dependency = (DependencyType*)deps.front().get();
     }
@@ -448,9 +473,6 @@ Status 
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
     }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
-        if constexpr (!std::is_same_v<LocalExchangeSinkDependency, 
DependencyType>) {
-            _dependency->clear_shared_state();
-        }
     }
     if (_peak_memory_usage_counter) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index e40c7849c09..1c076bd5a69 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -34,6 +34,7 @@ struct LocalStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const std::vector<TScanRangeParams> scan_ranges;
     std::vector<DependencySPtr>& upstream_dependencies;
+    BasicSharedState* shared_state;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
                             std::shared_ptr<LocalExchangeSinkDependency>>>
             le_state_map;
@@ -47,7 +48,7 @@ struct LocalSinkStateInfo {
     const int task_idx;
     RuntimeProfile* parent_profile = nullptr;
     const int sender_id;
-    std::vector<DependencySPtr>& dependencys;
+    std::vector<DependencySPtr>& dependencies;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
                             std::shared_ptr<LocalExchangeSinkDependency>>>
             le_state_map;
@@ -187,7 +188,8 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
+    [[nodiscard]] virtual DependencySPtr get_dependency(
+            QueryContext* ctx, std::map<int, 
std::shared_ptr<BasicSharedState>>& shared_states) = 0;
     [[nodiscard]] virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution() && !is_source()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -340,7 +342,9 @@ public:
         return state->get_local_state(operator_id())->template 
cast<LocalState>();
     }
 
-    DependencySPtr get_dependency(QueryContext* ctx) override;
+    DependencySPtr get_dependency(
+            QueryContext* ctx,
+            std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) 
override;
 };
 
 template <typename DependencyArg = FakeDependency>
@@ -448,10 +452,7 @@ protected:
 class DataSinkOperatorXBase : public OperatorBase {
 public:
     DataSinkOperatorXBase(const int operator_id, const int node_id)
-            : OperatorBase(nullptr),
-              _operator_id(operator_id),
-              _node_id(node_id),
-              _dests_id({operator_id}) {}
+            : OperatorBase(nullptr), _operator_id(operator_id), 
_node_id(node_id), _dests_id({1}) {}
 
     DataSinkOperatorXBase(const int operator_id, const int node_id, const int 
dest_id)
             : OperatorBase(nullptr),
@@ -498,7 +499,9 @@ public:
         return reinterpret_cast<const TARGET&>(*this);
     }
 
-    virtual void get_dependency(std::vector<DependencySPtr>& dependency, 
QueryContext* ctx) = 0;
+    virtual void get_dependency(std::vector<DependencySPtr>& dependency,
+                                std::map<int, 
std::shared_ptr<BasicSharedState>>& shared_states,
+                                QueryContext* ctx) = 0;
     [[nodiscard]] virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -595,7 +598,9 @@ public:
     ~DataSinkOperatorX() override = default;
 
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
-    void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* 
ctx) override;
+    void get_dependency(std::vector<DependencySPtr>& dependency,
+                        std::map<int, std::shared_ptr<BasicSharedState>>& 
shared_states,
+                        QueryContext* ctx) override;
 
     using LocalState = LocalStateType;
     [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index aa580b49d48..de0a544f18c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -618,10 +618,16 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                     for (auto& dep : deps) {
                         if (pipeline_id_to_task.contains(dep)) {
                             task->add_upstream_dependency(
-                                    
pipeline_id_to_task[dep]->get_downstream_dependency());
+                                    
pipeline_id_to_task[dep]->get_downstream_dependency(),
+                                    
pipeline_id_to_task[dep]->get_shared_states());
                         }
                     }
                 }
+            }
+        }
+        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+            if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
+                auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
                 RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
             }
         }
@@ -773,8 +779,8 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     }
     auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, 
local_exchange_id,
                                                                   
_runtime_state->get_query_ctx());
-    sink_dep->set_shared_state(shared_state);
-    shared_state->sink_dependency = sink_dep.get();
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_dependency = sink_dep;
     _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
 
     // 3. Set two pipelines' operator list. For example, split pipeline [Scan 
- AggSink] to
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index fece30296c9..9a88c417be0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -60,9 +60,10 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t 
task_id, RuntimeSta
           _task_idx(task_idx),
           _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
     _pipeline_task_watcher.start();
-    _sink->get_dependency(_downstream_dependency, state->get_query_ctx());
+    _sink->get_dependency(_downstream_dependency, _shared_states, 
state->get_query_ctx());
     for (auto& op : _operators) {
-        _source_dependency.insert({op->operator_id(), 
op->get_dependency(state->get_query_ctx())});
+        _source_dependency.insert(
+                {op->operator_id(), op->get_dependency(state->get_query_ctx(), 
_shared_states)});
     }
     pipeline->incr_created_tasks();
 }
@@ -94,8 +95,13 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& 
local_params, const
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
         auto& deps = get_upstream_dependency(op->operator_id());
-        LocalStateInfo info {parent_profile, scan_ranges, deps,
-                             _le_state_map,  _task_idx,   
_source_dependency[op->operator_id()]};
+        LocalStateInfo info {parent_profile,
+                             scan_ranges,
+                             deps,
+                             get_shared_state(op->operator_id()),
+                             _le_state_map,
+                             _task_idx,
+                             _source_dependency[op->operator_id()]};
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
         query_ctx->register_query_statistics(
@@ -297,9 +303,9 @@ void PipelineXTask::finalize() {
     std::unique_lock<std::mutex> lc(_release_lock);
     _finished = true;
     std::vector<DependencySPtr> {}.swap(_downstream_dependency);
-    DependencyMap {}.swap(_upstream_dependency);
-    std::map<int, DependencySPtr> {}.swap(_source_dependency);
-
+    _upstream_dependency.clear();
+    _source_dependency.clear();
+    _shared_states.clear();
     _le_state_map.clear();
 }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 96069cbbea2..a558dbeb40e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -95,8 +95,10 @@ public:
     bool is_pending_finish() override { return _finish_blocked_dependency() != 
nullptr; }
 
     std::vector<DependencySPtr>& get_downstream_dependency() { return 
_downstream_dependency; }
+    std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() { 
return _shared_states; }
 
-    void add_upstream_dependency(std::vector<DependencySPtr>& 
multi_upstream_dependency) {
+    void add_upstream_dependency(std::vector<DependencySPtr>& 
multi_upstream_dependency,
+                                 std::map<int, 
std::shared_ptr<BasicSharedState>>& shared_states) {
         for (auto dep : multi_upstream_dependency) {
             int dst_id = dep->id();
             if (!_upstream_dependency.contains(dst_id)) {
@@ -104,16 +106,31 @@ public:
             } else {
                 _upstream_dependency[dst_id].push_back(dep);
             }
+
+            if (shared_states.contains(dst_id) && 
!_shared_states.contains(dst_id)) {
+                // Shared state is created by upstream task's sink operator 
and shared by source operator of this task.
+                _shared_states.insert({dst_id, shared_states[dst_id]});
+            } else if (_shared_states.contains(dst_id) && 
!shared_states.contains(dst_id)) {
+                // Shared state is created by this task's source operator and 
shared by upstream task's sink operator.
+                shared_states.insert({dst_id, _shared_states[dst_id]});
+            }
         }
     }
 
     std::vector<DependencySPtr>& get_upstream_dependency(int id) {
         if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
-            _upstream_dependency.insert({id, {DependencySPtr {}}});
+            _upstream_dependency.insert({id, {}});
         }
         return _upstream_dependency[id];
     }
 
+    BasicSharedState* get_shared_state(int id) {
+        if (!_shared_states.contains(id)) {
+            return nullptr;
+        }
+        return _shared_states[id].get();
+    }
+
     bool is_pipelineX() const override { return true; }
 
     void wake_up();
@@ -190,9 +207,14 @@ private:
     std::vector<Dependency*> _finish_dependencies;
     RuntimeFilterDependency* _filter_dependency;
 
+    // Write dependencies of upstream pipeline tasks.
     DependencyMap _upstream_dependency;
+    // Read dependencies of this pipeline task.
     std::map<int, DependencySPtr> _source_dependency;
+    // Write dependencies of this pipeline task.
     std::vector<DependencySPtr> _downstream_dependency;
+    // All shared states of this pipeline task.
+    std::map<int, std::shared_ptr<BasicSharedState>> _shared_states;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
                             std::shared_ptr<LocalExchangeSinkDependency>>>
             _le_state_map;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to