This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 89fbda1dacc [refactor](join) Refine broadcast controller (#49556)
89fbda1dacc is described below

commit 89fbda1dacc1510327ab559e9460bda20b7c64dd
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Mar 31 11:15:49 2025 +0800

    [refactor](join) Refine broadcast controller (#49556)
    
    The broadcast controller can be removed on the pipelineX engine, since we now
    use operators and fragments to manage global state shared between instances.
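    
    A minimal, self-contained sketch of the new ownership model follows. It uses
    hypothetical, simplified stand-ins for the real Dependency, BasicSharedState
    and HashJoinSharedState classes (the actual types live under be/src/pipeline):
    the fragment context creates one shared state per broadcast-join build node,
    keyed by operator id, plus one sink and one source dependency per instance, so
    a single task can build the hash table once and wake the remaining tasks. This
    is what replaces the query-level SharedHashTableController.
    
    #include <map>
    #include <memory>
    #include <string>
    #include <vector>
    
    // Hypothetical stand-ins, not the real Doris classes.
    struct Dependency {
        Dependency(int op_id, int node_id, std::string name)
                : op_id(op_id), node_id(node_id), name(std::move(name)) {}
        int op_id;
        int node_id;
        std::string name;
    };
    
    struct BasicSharedState {
        std::vector<std::shared_ptr<Dependency>> source_deps;
        std::vector<std::shared_ptr<Dependency>> sink_deps;
    };
    
    // One shared state per broadcast-join build node, shared by all instance tasks.
    struct HashJoinSharedState : BasicSharedState {
        explicit HashJoinSharedState(int num_instances) : hash_table_slots(num_instances) {}
        // Stand-in for hash_table_variant_vector: one slot per instance.
        std::vector<int> hash_table_slots;
    };
    
    int main() {
        constexpr int num_instances = 2;
        constexpr int op_id = 42;   // hypothetical operator id
        constexpr int node_id = 7;  // hypothetical plan node id
    
        auto shared_state = std::make_shared<HashJoinSharedState>(num_instances);
        for (int i = 0; i < num_instances; i++) {
            // One sink dependency per build task and one source dependency per probe
            // task, so the single building task can wake every waiting task later.
            shared_state->sink_deps.push_back(std::make_shared<Dependency>(
                    op_id, node_id, "HASH_JOIN_BUILD_DEPENDENCY"));
            shared_state->source_deps.push_back(std::make_shared<Dependency>(
                    op_id, node_id, "HASH_JOIN_PROBE_DEPENDENCY"));
        }
    
        // The fragment context keeps the state keyed by operator id; each pipeline
        // task looks it up at prepare time instead of asking a query-level
        // broadcast / shared-hash-table controller.
        std::map<int, std::shared_ptr<BasicSharedState>> op_id_to_shared_state;
        op_id_to_shared_state.emplace(op_id, shared_state);
        return 0;
    }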
---
 be/src/pipeline/dependency.cpp                     |  10 ++
 be/src/pipeline/dependency.h                       |  51 ++++----
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 128 +++++++++------------
 be/src/pipeline/exec/hashjoin_build_sink.h         |  22 ++--
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  44 ++++++-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   5 +-
 be/src/pipeline/exec/jdbc_table_sink_operator.cpp  |   3 +-
 .../pipeline/exec/memory_scratch_sink_operator.cpp |   3 +-
 be/src/pipeline/exec/operator.cpp                  |  30 ++++-
 be/src/pipeline/exec/operator.h                    |   8 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |   3 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |   6 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |   3 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  65 ++++++-----
 be/src/pipeline/pipeline_fragment_context.h        |  18 ++-
 be/src/pipeline/pipeline_task.cpp                  |  26 ++---
 be/src/pipeline/pipeline_task.h                    |  11 +-
 be/src/runtime/fragment_mgr.cpp                    |   1 -
 be/src/runtime/query_context.cpp                   |   2 -
 be/src/runtime/query_context.h                     |   6 -
 be/src/runtime_filter/runtime_filter.h             |   2 +
 be/src/runtime_filter/runtime_filter_producer.h    |  10 --
 .../runtime_filter_producer_helper.cpp             |  12 +-
 .../runtime_filter_producer_helper.h               |   7 +-
 .../vec/runtime/shared_hash_table_controller.cpp   |  70 -----------
 be/src/vec/runtime/shared_hash_table_controller.h  |  96 ----------------
 be/test/olap/wal/wal_manager_test.cpp              |   9 +-
 be/test/pipeline/local_exchanger_test.cpp          |  12 +-
 be/test/pipeline/operator/agg_operator_test.cpp    |   4 +-
 ...istinct_streaming_aggregation_operator_test.cpp |   2 +-
 .../operator/exchange_sink_operator_test.cpp       |   2 +-
 .../operator/exchange_source_operator_test.cpp     |   2 +-
 .../local_merge_sort_source_operator_test.cpp      |   2 +-
 .../partitioned_aggregation_sink_operator_test.cpp |  14 +--
 ...artitioned_aggregation_source_operator_test.cpp |  18 +--
 .../partitioned_aggregation_test_helper.cpp        |   7 +-
 .../partitioned_hash_join_probe_operator_test.cpp  |   5 +-
 .../partitioned_hash_join_sink_operator_test.cpp   |   8 +-
 .../operator/partitioned_hash_join_test_helper.cpp |   7 +-
 be/test/pipeline/operator/repeat_operator_test.cpp |   2 +-
 be/test/pipeline/operator/set_operator_test.cpp    |   6 +-
 be/test/pipeline/operator/sort_operator_test.cpp   |   4 +-
 .../operator/spill_sort_sink_operator_test.cpp     |  10 +-
 .../operator/spill_sort_source_operator_test.cpp   |  10 +-
 .../pipeline/operator/spill_sort_test_helper.cpp   |   7 +-
 be/test/pipeline/operator/union_operator_test.cpp  |   6 +-
 be/test/pipeline/pipeline_test.cpp                 |  16 +--
 .../runtime_filter_producer_helper_test.cpp        |  19 ++-
 be/test/vec/exec/vfile_scanner_exception_test.cpp  |   9 +-
 50 files changed, 368 insertions(+), 457 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 76b950f63a2..276d4d0a0c1 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -33,6 +33,7 @@
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
+
 Dependency* BasicSharedState::create_source_dependency(int operator_id, int 
node_id,
                                                        const std::string& 
name) {
     source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, 
name + "_DEPENDENCY"));
@@ -40,6 +41,15 @@ Dependency* BasicSharedState::create_source_dependency(int 
operator_id, int node
     return source_deps.back().get();
 }
 
+void BasicSharedState::create_source_dependencies(int num_sources, int 
operator_id, int node_id,
+                                                  const std::string& name) {
+    source_deps.resize(num_sources, nullptr);
+    for (auto& source_dep : source_deps) {
+        source_dep = std::make_shared<Dependency>(operator_id, node_id, name + 
"_DEPENDENCY");
+        source_dep->set_shared_state(this);
+    }
+}
+
 Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id,
                                                      const std::string& name) {
     sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + 
"_DEPENDENCY", true));
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index e31274a8791..0bf0e404253 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -84,8 +84,13 @@ struct BasicSharedState {
     virtual ~BasicSharedState() = default;
 
     Dependency* create_source_dependency(int operator_id, int node_id, const 
std::string& name);
-
+    void create_source_dependencies(int num_sources, int operator_id, int 
node_id,
+                                    const std::string& name);
     Dependency* create_sink_dependency(int dest_id, int node_id, const 
std::string& name);
+    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
+        DCHECK_LT(channel_id, source_deps.size());
+        return {source_deps[channel_id]};
+    }
 };
 
 class Dependency : public std::enable_shared_from_this<Dependency> {
@@ -110,11 +115,10 @@ public:
     [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task = nullptr);
     // Notify downstream pipeline tasks this dependency is ready.
     virtual void set_ready();
-    void set_ready_to_read() {
-        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
-        _shared_state->source_deps.front()->set_ready();
+    void set_ready_to_read(int channel_id = 0) {
+        DCHECK_LT(channel_id, _shared_state->source_deps.size()) << 
debug_string();
+        _shared_state->source_deps[channel_id]->set_ready();
     }
-
     void set_ready_to_write() {
         DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
         _shared_state->sink_deps.front()->set_ready();
@@ -593,19 +597,32 @@ struct JoinSharedState : public BasicSharedState {
 
 struct HashJoinSharedState : public JoinSharedState {
     ENABLE_FACTORY_CREATOR(HashJoinSharedState)
-    // mark the join column whether support null eq
-    std::vector<bool> is_null_safe_eq_join;
-
-    // mark the build hash table whether it needs to store null value
-    std::vector<bool> serialize_null_into_key;
+    HashJoinSharedState() {
+        
hash_table_variant_vector.push_back(std::make_shared<JoinDataVariants>());
+    }
+    HashJoinSharedState(int num_instances) {
+        source_deps.resize(num_instances, nullptr);
+        hash_table_variant_vector.resize(num_instances, nullptr);
+        for (int i = 0; i < num_instances; i++) {
+            hash_table_variant_vector[i] = 
std::make_shared<JoinDataVariants>();
+        }
+    }
     std::shared_ptr<vectorized::Arena> arena = 
std::make_shared<vectorized::Arena>();
 
-    // maybe share hash table with other fragment instances
-    std::shared_ptr<JoinDataVariants> hash_table_variants = 
std::make_shared<JoinDataVariants>();
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
     std::shared_ptr<vectorized::Block> build_block;
     std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
+
+    // Used by the shared hash table.
+    // For the probe operator, the hash table in `_hash_table_variants` is read-only if visited
+    // flags are not used (visited flags are used only in right / full outer joins).
+    //
+    // For a broadcast join, although the hash table itself is read-only, some states in
+    // `_hash_table_variants` can still be written. For example, serialized keys are written into
+    // a contiguous memory region in `_hash_table_variants`. So before execution, each instance
+    // should use a local `_hash_table_variants` which holds the shared hash table.
+    std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector;
 };
 
 struct PartitionedHashJoinSharedState
@@ -750,13 +767,6 @@ public:
     std::atomic<size_t> _buffer_mem_limit = 
config::local_exchange_buffer_mem_limit;
     // We need to make sure to add mem_usage first and then enqueue, otherwise 
sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
-    void create_dependencies(int local_exchange_id) {
-        for (auto& source_dep : source_deps) {
-            source_dep = std::make_shared<Dependency>(local_exchange_id, 
local_exchange_id,
-                                                      
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
-            source_dep->set_shared_state(this);
-        }
-    }
     void sub_running_sink_operators();
     void sub_running_source_operators();
     void _set_always_ready() {
@@ -770,9 +780,6 @@ public:
         }
     }
 
-    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
-        return {source_deps[channel_id]};
-    }
     Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; }
 
     void set_ready_to_read(int channel_id) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 44f1b00f889..b105985e11a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -267,7 +267,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
         const TDataStreamSink& sink, const 
std::vector<TPlanFragmentDestination>& destinations,
         const std::vector<TUniqueId>& fragment_instance_ids)
-        : DataSinkOperatorX(operator_id, sink.dest_node_id, 0),
+        : DataSinkOperatorX(operator_id, sink.dest_node_id, 
std::numeric_limits<int>::max()),
           _texprs(sink.output_partition.partition_exprs),
           _row_desc(row_desc),
           _part_type(sink.output_partition.type),
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 15153d1df40..56a6a1a3784 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -43,11 +43,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
+    _task_idx = info.task_idx;
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
 
-    _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
-    _shared_state->serialize_null_into_key = p._serialize_null_into_key;
     _build_expr_ctxs.resize(p._build_expr_ctxs.size());
     for (size_t i = 0; i < _build_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._build_expr_ctxs[i]->clone(state, 
_build_expr_ctxs[i]));
@@ -56,24 +55,20 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
 
     _should_build_hash_table = true;
     profile()->add_info_string("BroadcastJoin", 
std::to_string(p._is_broadcast_join));
-    if (p._is_broadcast_join) {
-        if (state->enable_share_hash_table_for_broadcast_join()) {
-            _should_build_hash_table = info.task_idx == 0;
-            if (_should_build_hash_table) {
-                p._shared_hashtable_controller->set_builder_and_consumers(
-                        state->fragment_instance_id(), p.node_id());
-            }
-        }
+    if (p._use_shared_hash_table) {
+        _should_build_hash_table = info.task_idx == 0;
     }
     profile()->add_info_string("BuildShareHashTable", 
std::to_string(_should_build_hash_table));
-    profile()->add_info_string("ShareHashTableEnabled",
-                               
std::to_string(state->enable_share_hash_table_for_broadcast_join()));
+    profile()->add_info_string("ShareHashTableEnabled", 
std::to_string(p._use_shared_hash_table));
     if (!_should_build_hash_table) {
         _dependency->block();
         _finish_dependency->block();
-        p._shared_hashtable_controller->append_dependency(p.node_id(),
-                                                          
_dependency->shared_from_this(),
-                                                          
_finish_dependency->shared_from_this());
+        {
+            std::lock_guard<std::mutex> guard(p._mutex);
+            p._finish_dependencies.push_back(_finish_dependency);
+        }
+    } else {
+        _dependency->set_ready();
     }
 
     _build_blocks_memory_usage =
@@ -187,7 +182,7 @@ size_t 
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
                                                          raw_ptrs, 
block.rows(), true, true,
                                                          bucket_size);
                                              }},
-                       _shared_state->hash_table_variants->method_variant);
+                       
_shared_state->hash_table_variant_vector.front()->method_variant);
         }
     }
     return size_to_reserve;
@@ -197,7 +192,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
     if (_closed) {
         return Status::OK();
     }
-    auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
+    auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
         if (!_should_build_hash_table) {
             return;
@@ -211,12 +206,19 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
 
         if (_shared_state->build_block) {
             // release the memory of unused column in probe stage
-            _shared_state->build_block->clear_column_mem_not_keep(
-                    p._should_keep_column_flags, 
bool(p._shared_hashtable_controller));
+            
_shared_state->build_block->clear_column_mem_not_keep(p._should_keep_column_flags,
+                                                                  
p._use_shared_hash_table);
         }
 
-        if (p._shared_hashtable_controller) {
-            p._shared_hashtable_controller->signal_finish(p.node_id());
+        if (p._use_shared_hash_table) {
+            std::unique_lock(p._mutex);
+            p._signaled = true;
+            for (auto& dep : _shared_state->sink_deps) {
+                dep->set_ready();
+            }
+            for (auto& dep : p._finish_dependencies) {
+                dep->set_ready();
+            }
         }
     }};
 
@@ -226,11 +228,11 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
 
     try {
         RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
-                state, _shared_state->build_block.get(), 
p._shared_hash_table_context));
+                state, _shared_state->build_block.get(), 
p._use_shared_hash_table,
+                p._runtime_filters));
     } catch (Exception& e) {
-        bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
-                                                   
p._shared_hashtable_controller &&
-                                                   
!p._shared_hash_table_context->signaled;
+        bool blocked_by_shared_hash_table_signal =
+                !_should_build_hash_table && p._use_shared_hash_table && 
!p._signaled;
 
         return Status::InternalError(
                 "rf process meet error: {}, wake_up_early: {}, 
should_build_hash_table: "
@@ -309,15 +311,18 @@ std::vector<uint16_t> 
HashJoinBuildSinkLocalState::_convert_block_to_null(
 Status HashJoinBuildSinkLocalState::_extract_join_column(
         vectorized::Block& block, vectorized::ColumnUInt8::MutablePtr& 
null_map,
         vectorized::ColumnRawPtrs& raw_ptrs, const std::vector<int>& 
res_col_ids) {
+    DCHECK(_should_build_hash_table);
     auto& shared_state = *_shared_state;
     for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
         const auto* column = 
block.get_by_position(res_col_ids[i]).column.get();
-        if (!column->is_nullable() && shared_state.serialize_null_into_key[i]) 
{
+        if (!column->is_nullable() &&
+            
_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i]) {
             _key_columns_holder.emplace_back(
                     
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
             raw_ptrs[i] = _key_columns_holder.back().get();
         } else if (const auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column);
-                   !shared_state.serialize_null_into_key[i] && nullable) {
+                   
!_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i] &&
+                   nullable) {
             // update nulllmap and split nested out of ColumnNullable when 
serialize_null_into_key is false and column is nullable
             const auto& col_nested = nullable->get_nested_column();
             const auto& col_nullmap = nullable->get_null_map_data();
@@ -333,6 +338,7 @@ Status HashJoinBuildSinkLocalState::_extract_join_column(
 
 Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                                         vectorized::Block& 
block) {
+    DCHECK(_should_build_hash_table);
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
@@ -399,7 +405,8 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                                       
arg.serialized_keys_size(true)));
                         return st;
                     }},
-            _shared_state->hash_table_variants->method_variant, 
_shared_state->join_op_variants,
+            _shared_state->hash_table_variant_vector.front()->method_variant,
+            _shared_state->join_op_variants,
             
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
             vectorized::make_bool_variant((p._have_other_join_conjunct)));
     return st;
@@ -407,6 +414,7 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
 
 void HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap(
         vectorized::Block& block, const std::vector<int>& res_col_ids) {
+    DCHECK(_should_build_hash_table);
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     if (p._short_circuit_for_null_in_build_side) {
         _build_side_has_external_nullmap = true;
@@ -414,7 +422,7 @@ void 
HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap(
     }
     for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
         const auto* column = 
block.get_by_position(res_col_ids[i]).column.get();
-        if (column->is_nullable() && 
!_shared_state->serialize_null_into_key[i]) {
+        if (column->is_nullable() && !p._serialize_null_into_key[i]) {
             _build_side_has_external_nullmap = true;
             return;
         }
@@ -438,8 +446,10 @@ Status 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
     if (_build_expr_ctxs.size() == 1) {
         p._should_keep_hash_key_column = true;
     }
-    return 
init_hash_method<JoinDataVariants>(_shared_state->hash_table_variants.get(), 
data_types,
-                                              true);
+    return init_hash_method<JoinDataVariants>(
+            _shared_state->hash_table_variant_vector[p._use_shared_hash_table 
? _task_idx : 0]
+                    .get(),
+            data_types, true);
 }
 
 HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int 
operator_id,
@@ -514,13 +524,8 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
 
 Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
     
RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state));
-    if (_is_broadcast_join) {
-        if (state->enable_share_hash_table_for_broadcast_join()) {
-            _shared_hashtable_controller =
-                    state->get_query_ctx()->get_shared_hash_table_controller();
-            _shared_hash_table_context = 
_shared_hashtable_controller->get_context(node_id());
-        }
-    }
+    _use_shared_hash_table =
+            _is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join();
     auto init_keep_column_flags = [&](auto& tuple_descs, auto& 
output_slot_flags) {
         for (const auto& tuple_desc : tuple_descs) {
             for (const auto& slot_desc : tuple_desc->slots()) {
@@ -593,39 +598,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         RETURN_IF_ERROR(
                 local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
-        if (_shared_hashtable_controller) {
-            _shared_hash_table_context->status = Status::OK();
-            // arena will be shared with other instances.
-            _shared_hash_table_context->arena = 
local_state._shared_state->arena;
-            _shared_hash_table_context->hash_table_variants =
-                    local_state._shared_state->hash_table_variants;
-            _shared_hash_table_context->short_circuit_for_null_in_probe_side =
-                    local_state._shared_state->_has_null_in_build_side;
-            _shared_hash_table_context->block = 
local_state._shared_state->build_block;
-            _shared_hash_table_context->build_indexes_null =
-                    local_state._shared_state->build_indexes_null;
-        }
+        local_state.init_short_circuit_for_probe();
     } else if (!local_state._should_build_hash_table) {
-        DCHECK(_shared_hashtable_controller != nullptr);
-        DCHECK(_shared_hash_table_context != nullptr);
         // the instance which is not build hash table, it's should wait the 
signal of hash table build finished.
         // but if it's running and signaled == false, maybe the source 
operator have closed caused by some short circuit
         // return eof will make task marked as wake_up_early
         // todo: remove signaled after we can guarantee that wake up eraly is 
always set accurately
-        if (!_shared_hash_table_context->signaled || 
state->get_task()->wake_up_early()) {
+        if (!_signaled || state->get_task()->wake_up_early()) {
             return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
         }
 
-        if (!_shared_hash_table_context->status.ok()) {
-            return _shared_hash_table_context->status;
-        }
-
-        local_state.profile()->add_info_string(
-                "SharedHashTableFrom",
-                print_id(
-                        
_shared_hashtable_controller->get_builder_fragment_instance_id(node_id())));
-        local_state._shared_state->_has_null_in_build_side =
-                
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
+        DCHECK_LE(local_state._task_idx,
+                  local_state._shared_state->hash_table_variant_vector.size());
         std::visit(
                 [](auto&& dst, auto&& src) {
                     if constexpr (!std::is_same_v<std::monostate, 
std::decay_t<decltype(dst)>> &&
@@ -634,20 +618,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                         dst.hash_table = src.hash_table;
                     }
                 },
-                local_state._shared_state->hash_table_variants->method_variant,
-                std::static_pointer_cast<JoinDataVariants>(
-                        _shared_hash_table_context->hash_table_variants)
-                        ->method_variant);
-
-        local_state._shared_state->build_block = 
_shared_hash_table_context->block;
-        local_state._shared_state->build_indexes_null =
-                _shared_hash_table_context->build_indexes_null;
+                
local_state._shared_state->hash_table_variant_vector[local_state._task_idx]
+                        ->method_variant,
+                
local_state._shared_state->hash_table_variant_vector.front()->method_variant);
     }
 
     if (eos) {
         local_state._eos = true;
-        local_state.init_short_circuit_for_probe();
-        local_state._dependency->set_ready_to_read();
+        // If a shared hash table is used, states are shared by all tasks.
+        // Sinks and sources have an n-to-n relationship if a shared hash table is used,
+        // otherwise a 1-to-1 relationship.
+        // So we should notify the source task at `_task_idx` if a shared hash table is used.
+        local_state._dependency->set_ready_to_read(_use_shared_hash_table ? 
local_state._task_idx
+                                                                          : 0);
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 7ac62160bbd..ca743069726 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -54,8 +54,6 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
-    Status disable_runtime_filters(RuntimeState* state);
-
     [[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* 
state, bool eos);
 
 protected:
@@ -82,6 +80,7 @@ protected:
 
     size_t _evaluate_mem_usage = 0;
     size_t _build_side_rows = 0;
+    int _task_idx;
 
     vectorized::MutableBlock _build_side_mutable_block;
     std::shared_ptr<RuntimeFilterProducerHelper> 
_runtime_filter_producer_helper;
@@ -154,6 +153,7 @@ public:
         return _join_distribution != TJoinDistributionType::BROADCAST &&
                _join_distribution != TJoinDistributionType::NONE;
     }
+    std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
 
 private:
     friend class HashJoinBuildSinkLocalState;
@@ -168,9 +168,6 @@ private:
     std::vector<bool> _is_null_safe_eq_join;
 
     bool _is_broadcast_join = false;
-    std::shared_ptr<vectorized::SharedHashTableController> 
_shared_hashtable_controller;
-
-    vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
     const std::vector<TExpr> _partition_exprs;
 
     std::vector<SlotId> _hash_output_slot_ids;
@@ -179,6 +176,12 @@ private:
     // if build side has variant column and need output variant column
     // need to finalize variant column to speed up the join op
     bool _need_finalize_variant_column = false;
+
+    bool _use_shared_hash_table = false;
+    std::atomic<bool> _signaled = false;
+    std::mutex _mutex;
+    std::vector<std::shared_ptr<pipeline::Dependency>> _finish_dependencies;
+    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters;
 };
 
 template <class HashTableContext>
@@ -226,8 +229,13 @@ struct ProcessHashTableBuild {
             with_other_conjuncts) {
             // null aware join with other conjuncts
             keep_null_key = true;
-        } else if (_parent->_shared_state->is_null_safe_eq_join.size() == 1 &&
-                   _parent->_shared_state->is_null_safe_eq_join[0]) {
+        } else if (_parent->parent()
+                                   ->cast<HashJoinBuildSinkOperatorX>()
+                                   .is_null_safe_eq_join()
+                                   .size() == 1 &&
+                   _parent->parent()
+                           ->cast<HashJoinBuildSinkOperatorX>()
+                           .is_null_safe_eq_join()[0]) {
             // single null safe eq
             keep_null_key = true;
         }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 9d8f3544ba2..ab52f01fa5b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -38,6 +38,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
+    _task_idx = info.task_idx;
     auto& p = _parent->cast<HashJoinProbeOperatorX>();
     _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
     for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
@@ -124,7 +125,8 @@ bool 
HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block,
                                                    const std::vector<int>& 
res_col_ids) {
     for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
         const auto* column = 
block.get_by_position(res_col_ids[i]).column.get();
-        if (column->is_nullable() && 
!_shared_state->serialize_null_into_key[i]) {
+        if (column->is_nullable() &&
+            
!_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
             return true;
         }
     }
@@ -239,7 +241,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                         st = Status::InternalError("uninited hash table 
probe");
                     }
                 },
-                local_state._shared_state->hash_table_variants->method_variant,
+                local_state._shared_state->hash_table_variant_vector.size() == 
1
+                        ? 
local_state._shared_state->hash_table_variant_vector[0]->method_variant
+                        : local_state._shared_state
+                                  
->hash_table_variant_vector[local_state._task_idx]
+                                  ->method_variant,
                 *local_state._process_hashtable_ctx_variants);
     } else if (local_state._probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != 
TJoinOp::LEFT_OUTER_JOIN)) {
@@ -258,7 +264,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                             st = Status::InternalError("uninited hash table 
probe");
                         }
                     },
-                    
local_state._shared_state->hash_table_variants->method_variant,
+                    
local_state._shared_state->hash_table_variant_vector.size() == 1
+                            ? 
local_state._shared_state->hash_table_variant_vector[0]
+                                      ->method_variant
+                            : local_state._shared_state
+                                      
->hash_table_variant_vector[local_state._task_idx]
+                                      ->method_variant,
                     *local_state._process_hashtable_ctx_variants);
         } else {
             *eos = true;
@@ -311,12 +322,14 @@ Status 
HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
     auto& shared_state = *_shared_state;
     for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
         const auto* column = 
block.get_by_position(res_col_ids[i]).column.get();
-        if (!column->is_nullable() && shared_state.serialize_null_into_key[i]) 
{
+        if (!column->is_nullable() &&
+            
_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
             _key_columns_holder.emplace_back(
                     
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
             _probe_columns[i] = _key_columns_holder.back().get();
         } else if (const auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column);
-                   nullable && !shared_state.serialize_null_into_key[i]) {
+                   nullable &&
+                   
!_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
             // update nulllmap and split nested out of ColumnNullable when 
serialize_null_into_key is false and column is nullable
             const auto& col_nested = nullable->get_nested_column();
             const auto& col_nullmap = nullable->get_null_map_data();
@@ -429,6 +442,27 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
         vectorized::VExprContextSPtr ctx;
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, 
ctx));
         _probe_expr_ctxs.push_back(ctx);
+
+        /// Null-safe equal means null = null is true; the corresponding SQL operator is <=>.
+        const bool is_null_safe_equal =
+                eq_join_conjunct.__isset.opcode &&
+                (eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL) &&
+                // For a null-safe equal join, the FE may generate a plan in which
+                // both sides of the conjunct are not nullable; we just treat it
+                // as a normal equal join conjunct.
+                (eq_join_conjunct.right.nodes[0].is_nullable ||
+                 eq_join_conjunct.left.nodes[0].is_nullable);
+
+        if (eq_join_conjuncts.size() == 1) {
+            // The single-column key serialize method must use the null map to represent
+            // null instead of serializing null into the key.
+            _serialize_null_into_key.emplace_back(false);
+        } else if (is_null_safe_equal) {
+            // Serialize null into the key to represent a multi-column null value.
+            _serialize_null_into_key.emplace_back(true);
+        } else {
+            // Under normal conditions, because null != null, null can be represented
+            // directly with the null map.
+            _serialize_null_into_key.emplace_back(false);
+        }
     }
 
     if (tnode.hash_join_node.__isset.other_join_conjuncts &&
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index eaa0b9376ca..3914fc0d58d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -106,7 +106,7 @@ private:
     std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
             std::make_unique<HashTableCtxVariants>();
 
-    ssize_t _estimated_mem_in_push = -1;
+    int _task_idx;
 
     RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
     RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
@@ -142,6 +142,7 @@ public:
                                                      _partition_exprs)
                                   : 
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
     }
+    bool is_broadcast_join() const { return _is_broadcast_join; }
 
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
@@ -172,6 +173,8 @@ private:
     vectorized::VExprContextSPtrs _other_join_conjuncts;
 
     vectorized::VExprContextSPtrs _mark_join_conjuncts;
+    // mark the build hash table whether it needs to store null value
+    std::vector<bool> _serialize_null_into_key;
 
     // probe expr
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp 
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index d0abe6aa0d2..c213616e25c 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -28,7 +28,8 @@ namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc, 
int operator_id,
                                                const std::vector<TExpr>& 
t_output_expr)
-        : DataSinkOperatorX(operator_id, 0, 0),
+        : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+                            std::numeric_limits<int>::max()),
           _row_desc(row_desc),
           _t_output_expr(t_output_expr) {}
 
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp 
b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 86afd607432..fe67703bfe9 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -66,7 +66,8 @@ Status MemoryScratchSinkLocalState::close(RuntimeState* 
state, Status exec_statu
 MemoryScratchSinkOperatorX::MemoryScratchSinkOperatorX(const RowDescriptor& 
row_desc,
                                                        int operator_id,
                                                        const 
std::vector<TExpr>& t_output_expr)
-        : DataSinkOperatorX(operator_id, 0, 0),
+        : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+                            std::numeric_limits<int>::max()),
           _row_desc(row_desc),
           _t_output_expr(t_output_expr) {}
 
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index a82207c92c1..1f0d40069ed 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -470,14 +470,18 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false, 
nullptr);
     constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, 
FakeSharedState>;
     if constexpr (!is_fake_shared) {
-        if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedStateArg>) {
-            DCHECK(info.le_state_map.find(_parent->operator_id()) != 
info.le_state_map.end());
-            _shared_state = 
info.le_state_map.at(_parent->operator_id()).first.get();
+        if (info.shared_state_map.find(_parent->operator_id()) != 
info.shared_state_map.end()) {
+            _shared_state = info.shared_state_map.at(_parent->operator_id())
+                                    .first.get()
+                                    ->template cast<SharedStateArg>();
 
             _dependency = 
_shared_state->get_dep_by_channel_id(info.task_idx).front().get();
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
         } else if (info.shared_state) {
+            if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedStateArg>) {
+                DCHECK(false);
+            }
             // For UnionSourceOperator without children, there is no shared 
state.
             _shared_state = info.shared_state->template cast<SharedStateArg>();
 
@@ -485,6 +489,10 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
                     _parent->operator_id(), _parent->node_id(), 
_parent->get_name());
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
+        } else {
+            if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedStateArg>) {
+                DCHECK(false);
+            }
         }
     }
 
@@ -543,11 +551,21 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _wait_for_finish_dependency_timer = ADD_TIMER(_profile, 
"PendingFinishDependency");
     constexpr auto is_fake_shared = std::is_same_v<SharedState, 
FakeSharedState>;
     if constexpr (!is_fake_shared) {
-        if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
-            DCHECK(info.le_state_map.find(_parent->dests_id().front()) != 
info.le_state_map.end());
-            _dependency = 
info.le_state_map.at(_parent->dests_id().front()).second.get();
+        if (info.shared_state_map.find(_parent->dests_id().front()) !=
+            info.shared_state_map.end()) {
+            if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedState>) {
+                
DCHECK(info.shared_state_map.at(_parent->dests_id().front()).second.size() == 
1);
+            }
+            _dependency = info.shared_state_map.at(_parent->dests_id().front())
+                                  
.second[std::is_same_v<LocalExchangeSharedState, SharedState>
+                                                  ? 0
+                                                  : info.task_idx]
+                                  .get();
             _shared_state = _dependency->shared_state()->template 
cast<SharedState>();
         } else {
+            if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedState>) {
+                DCHECK(false);
+            }
             _shared_state = info.shared_state->template cast<SharedState>();
             _dependency = _shared_state->create_sink_dependency(
                     _parent->dests_id().front(), _parent->node_id(), 
_parent->get_name());
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index fa8849640b6..ad982e509f2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -69,8 +69,8 @@ struct LocalStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const std::vector<TScanRangeParams>& scan_ranges;
     BasicSharedState* shared_state;
-    const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
-                                  std::shared_ptr<Dependency>>>& le_state_map;
+    const std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                  std::vector<std::shared_ptr<Dependency>>>>& 
shared_state_map;
     const int task_idx;
 };
 
@@ -80,8 +80,8 @@ struct LocalSinkStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const int sender_id;
     BasicSharedState* shared_state;
-    const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
-                                  std::shared_ptr<Dependency>>>& le_state_map;
+    const std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                  std::vector<std::shared_ptr<Dependency>>>>& 
shared_state_map;
     const TDataSink& tsink;
 };
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index c6a024d99a3..79b74d4c313 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -180,7 +180,8 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
     if (auto* tmp_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state()) {
         inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
     }
-    _shared_state->inner_shared_state->hash_table_variants.reset();
+    
DCHECK_EQ(_shared_state->inner_shared_state->hash_table_variant_vector.size(), 
1);
+    
_shared_state->inner_shared_state->hash_table_variant_vector.front().reset();
     if (inner_sink_state) {
         COUNTER_UPDATE(_memory_used_counter,
                        -(inner_sink_state->_hash_table_memory_usage->value() +
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index de2e2922b26..0e2667c752f 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -36,7 +36,8 @@ 
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent
 
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const 
RowDescriptor& row_desc,
                                                  const std::vector<TExpr>& 
t_output_expr)
-        : DataSinkOperatorX(operator_id, 0, 0),
+        : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+                            std::numeric_limits<int>::max()),
           _row_desc(row_desc),
           _t_output_expr(t_output_expr) {}
 
@@ -44,7 +45,8 @@ ResultFileSinkOperatorX::ResultFileSinkOperatorX(
         int operator_id, const RowDescriptor& row_desc, const TResultFileSink& 
sink,
         const std::vector<TPlanFragmentDestination>& destinations,
         const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
-        : DataSinkOperatorX(operator_id, 0, 0),
+        : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+                            std::numeric_limits<int>::max()),
           _row_desc(row_desc),
           _t_output_expr(t_output_expr),
           _dests(destinations),
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 9f3647b93fb..6ad81a2673f 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -101,7 +101,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
 ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& 
row_desc,
                                          const std::vector<TExpr>& 
t_output_expr,
                                          const TResultSink& sink)
-        : DataSinkOperatorX(operator_id, 0, 0),
+        : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+                            std::numeric_limits<int>::max()),
           _sink_type(!sink.__isset.type || sink.type == 
TResultSinkType::MYSQL_PROTOCAL
                              ? TResultSinkType::MYSQL_PROTOCAL
                              : sink.type),
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index e62addd95ad..b782474a3a8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -159,7 +159,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     _runtime_state.reset();
     _runtime_filter_states.clear();
     _runtime_filter_mgr_map.clear();
-    _op_id_to_le_state.clear();
+    _op_id_to_shared_state.clear();
 }
 
 bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -381,23 +381,26 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
                 std::make_unique<RuntimeFilterMgr>(request.query_id, 
_runtime_filter_states[i],
                                                    
_query_ctx->query_mem_tracker(), false);
         std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
-        auto get_local_exchange_state = [&](PipelinePtr pipeline)
-                -> std::map<int, 
std::pair<std::shared_ptr<LocalExchangeSharedState>,
-                                           std::shared_ptr<Dependency>>> {
-            std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
-                                    std::shared_ptr<Dependency>>>
-                    le_state_map;
-            auto source_id = pipeline->operators().front()->operator_id();
-            if (auto iter = _op_id_to_le_state.find(source_id); iter != 
_op_id_to_le_state.end()) {
-                le_state_map.insert({source_id, iter->second});
+        auto get_shared_state = [&](PipelinePtr pipeline)
+                -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                           
std::vector<std::shared_ptr<Dependency>>>> {
+            std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                    std::vector<std::shared_ptr<Dependency>>>>
+                    shared_state_map;
+            for (auto& op : pipeline->operators()) {
+                auto source_id = op->operator_id();
+                if (auto iter = _op_id_to_shared_state.find(source_id);
+                    iter != _op_id_to_shared_state.end()) {
+                    shared_state_map.insert({source_id, iter->second});
+                }
             }
             for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
-                if (auto iter = _op_id_to_le_state.find(sink_to_source_id);
-                    iter != _op_id_to_le_state.end()) {
-                    le_state_map.insert({sink_to_source_id, iter->second});
+                if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
+                    iter != _op_id_to_shared_state.end()) {
+                    shared_state_map.insert({sink_to_source_id, iter->second});
                 }
             }
-            return le_state_map;
+            return shared_state_map;
         };
 
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
@@ -449,10 +452,9 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
                 auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
                 task_runtime_state->set_task_num(pipeline->num_tasks());
-                auto task = std::make_unique<PipelineTask>(pipeline, 
cur_task_id,
-                                                           
task_runtime_state.get(), this,
-                                                           
pipeline_id_to_profile[pip_idx].get(),
-                                                           
get_local_exchange_state(pipeline), i);
+                auto task = std::make_unique<PipelineTask>(
+                        pipeline, cur_task_id, task_runtime_state.get(), this,
+                        pipeline_id_to_profile[pip_idx].get(), 
get_shared_state(pipeline), i);
                 pipeline->incr_created_tasks(i, task.get());
                 task_runtime_state->set_task(task.get());
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
@@ -552,7 +554,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const 
doris::TPipelineFrag
         }
     }
     _pipeline_parent_map.clear();
-    _op_id_to_le_state.clear();
+    _op_id_to_shared_state.clear();
 
     return Status::OK();
 }
@@ -821,11 +823,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
         return Status::InternalError("Unsupported local exchange type : " +
                                      
std::to_string((int)data_distribution.distribution_type));
     }
-    auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
-                                                 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
-    sink_dep->set_shared_state(shared_state.get());
-    shared_state->sink_deps.push_back(sink_dep);
-    _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
+    shared_state->create_source_dependencies(_num_instances, 
local_exchange_id, local_exchange_id,
+                                             "LOCAL_EXCHANGE_OPERATOR");
+    shared_state->create_sink_dependency(sink_id, local_exchange_id, 
"LOCAL_EXCHANGE_SINK");
+    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, 
shared_state->sink_deps}});
 
     // 3. Set two pipelines' operator list. For example, split pipeline [Scan 
- AggSink] to
     // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource 
- AggSink].
@@ -848,8 +849,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     }
     operators.insert(operators.begin(), source_op);
 
-    shared_state->create_dependencies(local_exchange_id);
-
     // 5. Set children for two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;
     std::vector<PipelineId> edges_with_source;
@@ -1431,6 +1430,20 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
             op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
         }
+        if (is_broadcast_join && 
_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+            std::shared_ptr<HashJoinSharedState> shared_state =
+                    HashJoinSharedState::create_shared(_num_instances);
+            for (int i = 0; i < _num_instances; i++) {
+                auto sink_dep = 
std::make_shared<Dependency>(op->operator_id(), op->node_id(),
+                                                             
"HASH_JOIN_BUILD_DEPENDENCY");
+                sink_dep->set_shared_state(shared_state.get());
+                shared_state->sink_deps.push_back(sink_dep);
+            }
+            shared_state->create_source_dependencies(_num_instances, 
op->operator_id(),
+                                                     op->node_id(), 
"HASH_JOIN_PROBE");
+            _op_id_to_shared_state.insert(
+                    {op->operator_id(), {shared_state, 
shared_state->sink_deps}});
+        }
         _require_bucket_distribution =
                 _require_bucket_distribution || 
op->require_data_distribution();
         break;
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 992f86bc76d..587a1989fa7 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -279,8 +279,22 @@ private:
 
     int _operator_id = 0;
     int _sink_operator_id = 0;
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            _op_id_to_le_state;
+    /**
+     * Some states are shared by tasks in different instances (e.g. local exchange, broadcast join).
+     *
+     * local exchange sink 0 ->                               -> local exchange source 0
+     *                            LocalExchangeSharedState
+     * local exchange sink 1 ->                               -> local exchange source 1
+     *
+     * hash join build sink 0 ->                              -> hash join build source 0
+     *                              HashJoinSharedState
+     * hash join build sink 1 ->                              -> hash join build source 1
+     *
+     * So we should keep states here.
+     */
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            _op_id_to_shared_state;
 
     std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
     std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 0db8a26efca..bfcfa03070e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -53,13 +53,13 @@ class RuntimeState;
 
 namespace doris::pipeline {
 
-PipelineTask::PipelineTask(
-        PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
-        PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile,
-        std::map<int,
-                 std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-                le_state_map,
-        int task_idx)
+PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, 
RuntimeState* state,
+                           PipelineFragmentContext* fragment_context,
+                           RuntimeProfile* parent_profile,
+                           std::map<int, 
std::pair<std::shared_ptr<BasicSharedState>,
+                                                   
std::vector<std::shared_ptr<Dependency>>>>
+                                   shared_state_map,
+                           int task_idx)
         : _index(task_id),
           _pipeline(pipeline),
           _opened(false),
@@ -70,7 +70,7 @@ PipelineTask::PipelineTask(
           _source(_operators.front().get()),
           _root(_operators.back().get()),
           _sink(pipeline->sink_shared_pointer()),
-          _le_state_map(std::move(le_state_map)),
+          _shared_state_map(std::move(shared_state_map)),
           _task_idx(task_idx),
           _execution_dep(state->get_query_ctx()->get_execution_dependency()),
           _memory_sufficient_dependency(
@@ -96,9 +96,9 @@ Status PipelineTask::prepare(const 
std::vector<TScanRangeParams>& scan_range, co
     });
     {
         // set sink local state
-        LocalSinkStateInfo info {_task_idx,     _task_profile.get(),
-                                 sender_id,     get_sink_shared_state().get(),
-                                 _le_state_map, tsink};
+        LocalSinkStateInfo info {_task_idx,         _task_profile.get(),
+                                 sender_id,         get_sink_shared_state().get(),
+                                 _shared_state_map, tsink};
         RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
     }
 
@@ -108,7 +108,7 @@ Status PipelineTask::prepare(const 
std::vector<TScanRangeParams>& scan_range, co
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
         LocalStateInfo info {parent_profile, _scan_ranges, 
get_op_shared_state(op->operator_id()),
-                             _le_state_map, _task_idx};
+                             _shared_state_map, _task_idx};
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
     }
@@ -604,7 +604,7 @@ Status PipelineTask::finalize() {
     RETURN_IF_ERROR(_state_transition(State::FINALIZED));
     _sink_shared_state.reset();
     _op_shared_states.clear();
-    _le_state_map.clear();
+    _shared_state_map.clear();
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 0c39f09c90b..a4fd3b4eac9 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -48,9 +48,9 @@ class PipelineTask {
 public:
     PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
                  PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile,
-                 std::map<int, 
std::pair<std::shared_ptr<LocalExchangeSharedState>,
-                                         std::shared_ptr<Dependency>>>
-                         le_state_map,
+                 std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                         std::vector<std::shared_ptr<Dependency>>>>
+                         shared_state_map,
                  int task_idx);
 
     Status prepare(const std::vector<TScanRangeParams>& scan_range, const int 
sender_id,
@@ -284,8 +284,9 @@ private:
     std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
     std::shared_ptr<BasicSharedState> _sink_shared_state;
     std::vector<TScanRangeParams> _scan_ranges;
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            _le_state_map;
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            _shared_state_map;
     int _task_idx;
     bool _dry_run = false;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9b0cbb4665c..233175005d4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -90,7 +90,6 @@
 #include "util/threadpool.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
-#include "vec/runtime/shared_hash_table_controller.h"
 
 namespace doris {
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index fb6eee2bfd6..76fd17e7d94 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -125,7 +125,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
     _init_resource_context();
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
     _query_watcher.start();
-    _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
     _execution_dependency =
             pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", 
false);
     _memory_sufficient_dependency =
@@ -264,7 +263,6 @@ QueryContext::~QueryContext() {
 #endif
     _runtime_filter_mgr.reset();
     _execution_dependency.reset();
-    _shared_hash_table_controller.reset();
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
     obj_pool.clear();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 84128875176..53f008438ec 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -41,7 +41,6 @@
 #include "util/hash_util.hpp"
 #include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
-#include "vec/runtime/shared_hash_table_controller.h"
 #include "workload_group/workload_group.h"
 
 namespace doris {
@@ -185,10 +184,6 @@ public:
 
     void set_ready_to_execute_only();
 
-    std::shared_ptr<vectorized::SharedHashTableController> 
get_shared_hash_table_controller() {
-        return _shared_hash_table_controller;
-    }
-
     bool has_runtime_predicate(int source_node_id) {
         return _runtime_predicates.contains(source_node_id);
     }
@@ -414,7 +409,6 @@ private:
     void _init_resource_context();
     void _init_query_mem_tracker();
 
-    std::shared_ptr<vectorized::SharedHashTableController> 
_shared_hash_table_controller;
     std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
 
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
diff --git a/be/src/runtime_filter/runtime_filter.h 
b/be/src/runtime_filter/runtime_filter.h
index 0a66bc02ba0..06b12a69835 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -82,6 +82,8 @@ public:
     }
 
     virtual std::string debug_string() const = 0;
+    std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; }
+    void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper = wrapper; }
 
 protected:
     RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* 
desc)
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index bfb9239a0b4..7a56d97189d 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -22,7 +22,6 @@
 #include "pipeline/dependency.h"
 #include "runtime/query_context.h"
 #include "runtime_filter/runtime_filter.h"
-#include "vec/runtime/shared_hash_table_controller.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -109,15 +108,6 @@ public:
         }
     }
 
-    void copy_to_shared_context(const vectorized::SharedHashTableContextPtr& 
context) {
-        DCHECK(!context->runtime_filters.contains(_wrapper->filter_id()));
-        context->runtime_filters[_wrapper->filter_id()] = _wrapper;
-    }
-    void copy_from_shared_context(const vectorized::SharedHashTableContextPtr& 
context) {
-        DCHECK(context->runtime_filters.contains(_wrapper->filter_id()));
-        _wrapper = context->runtime_filters[_wrapper->filter_id()];
-    }
-
     bool set_state(State state) {
         std::unique_lock<std::mutex> l(_mtx);
         if (_rf_state == State::PUBLISHED ||
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 24861b61b92..b9f3caca364 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -85,8 +85,8 @@ Status RuntimeFilterProducerHelper::_publish(RuntimeState* 
state) {
 }
 
 Status RuntimeFilterProducerHelper::process(
-        RuntimeState* state, const vectorized::Block* block,
-        const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) {
+        RuntimeState* state, const vectorized::Block* block, bool use_shared_table,
+        std::map<int, std::shared_ptr<RuntimeFilterWrapper>>& runtime_filters) {
     if (_skip_runtime_filters_process) {
         return Status::OK();
     }
@@ -106,12 +106,14 @@ Status RuntimeFilterProducerHelper::process(
     }
 
     for (const auto& filter : _producers) {
-        if (shared_hash_table_ctx && !wake_up_early) {
+        if (use_shared_table && !wake_up_early) {
             DCHECK(_is_broadcast_join);
             if (_should_build_hash_table) {
-                filter->copy_to_shared_context(shared_hash_table_ctx);
+                DCHECK(!runtime_filters.contains(filter->wrapper()->filter_id()));
+                runtime_filters[filter->wrapper()->filter_id()] = filter->wrapper();
             } else {
-                filter->copy_from_shared_context(shared_hash_table_ctx);
+                DCHECK(runtime_filters.contains(filter->wrapper()->filter_id()));
+                filter->set_wrapper(runtime_filters[filter->wrapper()->filter_id()]);
             }
         }
         filter->set_wrapper_state_and_ready_to_publish(wrapper_state);
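
    For illustration, the sharing semantics above in isolation, with simplified
    stand-in types rather than the real classes: the instance that actually
    builds the broadcast hash table publishes its filter wrappers into a map
    keyed by filter id, and the other instances adopt the published wrappers
    instead of building their own.

        #include <cassert>
        #include <map>
        #include <memory>
        #include <vector>

        struct Wrapper {             // stand-in for RuntimeFilterWrapper
            int filter_id;
        };
        struct Producer {            // stand-in for RuntimeFilterProducer
            std::shared_ptr<Wrapper> wrapper;
        };

        void share_filters(std::vector<Producer>& producers, bool should_build_hash_table,
                           std::map<int, std::shared_ptr<Wrapper>>& shared) {
            for (auto& p : producers) {
                if (should_build_hash_table) {
                    // Build side: publish the finished wrapper.
                    assert(!shared.contains(p.wrapper->filter_id));
                    shared[p.wrapper->filter_id] = p.wrapper;
                } else {
                    // Non-build side: adopt the wrapper published by the builder.
                    assert(shared.contains(p.wrapper->filter_id));
                    p.wrapper = shared[p.wrapper->filter_id];
                }
            }
        }
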
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h 
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index 3a703c9f591..fc8944ea3cb 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -25,13 +25,12 @@
 #include "runtime_filter/runtime_filter_producer.h"
 #include "vec/core/block.h" // IWYU pragma: keep
 #include "vec/exprs/vexpr_context.h"
-#include "vec/runtime/shared_hash_table_controller.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
 // this class used in hash join node
 /**
- * init -> (skip_runtime_filters ->) send_filter_size -> process
+ * init -> (skip_process ->) send_filter_size -> (share_filters ->) process
  */
 class RuntimeFilterProducerHelper {
 public:
@@ -64,8 +63,8 @@ public:
     MOCK_FUNCTION Status skip_process(RuntimeState* state);
 
     // build rf's predicate and publish rf
-    Status process(RuntimeState* state, const vectorized::Block* block,
-                   const vectorized::SharedHashTableContextPtr& 
shared_hash_table_ctx);
+    Status process(RuntimeState* state, const vectorized::Block* block, bool use_shared_table,
+                   std::map<int, std::shared_ptr<RuntimeFilterWrapper>>& runtime_filters);
 
 protected:
     virtual void _init_expr(const vectorized::VExprContextSPtrs& 
build_expr_ctxs,
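
    A hedged usage outline of that lifecycle on the build side; the names
    helper, state, build_block and shared_wrappers are placeholders, and only
    process()'s new signature is taken from this patch (send_filter_size is
    referenced in a comment because its signature does not appear here).

        // Sketch only: how a broadcast hash join build sink might drive the helper.
        auto publish_filters = [](RuntimeFilterProducerHelper& helper, RuntimeState* state,
                                  const vectorized::Block* build_block,
                                  std::map<int, std::shared_ptr<RuntimeFilterWrapper>>&
                                          shared_wrappers) -> Status {
            // init(...) and send_filter_size(...) have already run at this point.
            // With use_shared_table = true, the instance that built the hash table
            // publishes its wrappers into shared_wrappers, the other instances adopt
            // them, and every instance then builds its predicates and publishes.
            return helper.process(state, build_block, /*use_shared_table=*/true,
                                  shared_wrappers);
        };
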
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp 
b/be/src/vec/runtime/shared_hash_table_controller.cpp
deleted file mode 100644
index 286f32bb38b..00000000000
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "shared_hash_table_controller.h"
-
-#include <glog/logging.h>
-#include <runtime/runtime_state.h>
-// IWYU pragma: no_include <bits/chrono.h>
-#include <chrono> // IWYU pragma: keep
-#include <utility>
-
-#include "pipeline/exec/hashjoin_build_sink.h"
-
-namespace doris::vectorized {
-#include "common/compile_check_begin.h"
-
-void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, 
int node_id) {
-    // Only need to set builder and consumers with pipeline engine enabled.
-    std::lock_guard<std::mutex> lock(_mutex);
-    DCHECK(_builder_fragment_ids.find(node_id) == 
_builder_fragment_ids.cend());
-    _builder_fragment_ids.insert({node_id, builder});
-}
-
-SharedHashTableContextPtr SharedHashTableController::get_context(int 
my_node_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    if (!_shared_contexts.contains(my_node_id)) {
-        _shared_contexts.insert({my_node_id, 
std::make_shared<SharedHashTableContext>()});
-    }
-    return _shared_contexts[my_node_id];
-}
-
-void SharedHashTableController::signal_finish(int my_node_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    auto it = _shared_contexts.find(my_node_id);
-    if (it != _shared_contexts.cend()) {
-        it->second->signaled = true;
-        _shared_contexts.erase(it);
-    }
-    for (auto& dep : _dependencies[my_node_id]) {
-        dep->set_ready();
-    }
-    for (auto& dep : _finish_dependencies[my_node_id]) {
-        dep->set_ready();
-    }
-}
-
-TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int 
my_node_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    auto it = _builder_fragment_ids.find(my_node_id);
-    if (it == _builder_fragment_ids.cend()) {
-        return TUniqueId {};
-    }
-    return it->second;
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
deleted file mode 100644
index 51f4cfda3b8..00000000000
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/Types_types.h>
-
-#include <condition_variable>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include "common/status.h"
-#include "runtime_filter/runtime_filter_definitions.h"
-#include "runtime_filter/runtime_filter_wrapper.h"
-#include "vec/core/block.h"
-
-namespace doris {
-#include "common/compile_check_begin.h"
-
-class RuntimeState;
-class MinMaxFuncBase;
-class HybridSetBase;
-class BloomFilterFuncBase;
-class BitmapFilterFuncBase;
-
-namespace pipeline {
-class Dependency;
-}
-namespace vectorized {
-
-class Arena;
-
-struct SharedHashTableContext {
-    SharedHashTableContext()
-            : hash_table_variants(nullptr), 
block(std::make_shared<vectorized::Block>()) {}
-
-    Status status;
-    std::shared_ptr<Arena> arena;
-    std::shared_ptr<void> hash_table_variants;
-    std::shared_ptr<Block> block;
-    std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
-    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
-    std::atomic<bool> signaled = false;
-    bool short_circuit_for_null_in_probe_side = false;
-};
-
-using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
-
-class SharedHashTableController {
-public:
-    /// set hash table builder's fragment instance id and consumers' fragment 
instance id
-    void set_builder_and_consumers(TUniqueId builder, int node_id);
-    TUniqueId get_builder_fragment_instance_id(int my_node_id);
-    SharedHashTableContextPtr get_context(int my_node_id);
-    void signal_finish(int my_node_id);
-    void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> 
dep,
-                           std::shared_ptr<pipeline::Dependency> finish_dep) {
-        std::lock_guard<std::mutex> lock(_mutex);
-        if (!_dependencies.contains(node_id)) {
-            _dependencies.insert({node_id, {}});
-            _finish_dependencies.insert({node_id, {}});
-        }
-        _dependencies[node_id].push_back(dep);
-        _finish_dependencies[node_id].push_back(finish_dep);
-    }
-
-private:
-    std::mutex _mutex;
-    // For pipelineX, we update all dependencies once hash table is built;
-    std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
-    std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::Dependency>>>
-            _finish_dependencies;
-    std::map<int /*node id*/, TUniqueId /*fragment instance id*/> 
_builder_fragment_ids;
-    std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
-};
-
-} // namespace vectorized
-} // namespace doris
-
-#include "common/compile_check_end.h"
diff --git a/be/test/olap/wal/wal_manager_test.cpp 
b/be/test/olap/wal/wal_manager_test.cpp
index 459a61d4b76..2229ee73cf7 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -282,10 +282,11 @@ void WalManagerTest::init() {
     auto local_state =
             pipeline::FileScanLocalState::create_unique(&_runtime_state, 
_scan_node.get());
     std::vector<TScanRangeParams> scan_ranges;
-    std::map<int, 
std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>,
-                            std::shared_ptr<pipeline::Dependency>>>
-            le_state_map;
-    pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr, 
le_state_map, 0};
+    pipeline::LocalStateInfo info {.parent_profile = &_global_profile,
+                                   .scan_ranges = scan_ranges,
+                                   .shared_state = nullptr,
+                                   .shared_state_map = {},
+                                   .task_idx = 0};
     WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init 
local_state");
     _runtime_state.emplace_local_state(_scan_node->operator_id(), 
std::move(local_state));
 
diff --git a/be/test/pipeline/local_exchanger_test.cpp 
b/be/test/pipeline/local_exchanger_test.cpp
index 9a6c4acb9f3..d3a9b0e2d5d 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -95,7 +95,7 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) {
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
-    shared_state->create_dependencies(0);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
 
     auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
     for (size_t i = 0; i < num_sink; i++) {
@@ -338,7 +338,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) {
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
-    shared_state->create_dependencies(0);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
 
     auto* exchanger = (PassthroughExchanger*)shared_state->exchanger.get();
     for (size_t i = 0; i < num_sink; i++) {
@@ -532,7 +532,7 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) {
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
-    shared_state->create_dependencies(0);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
 
     auto* exchanger = (PassToOneExchanger*)shared_state->exchanger.get();
     for (size_t i = 0; i < num_sink; i++) {
@@ -734,7 +734,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) {
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
-    shared_state->create_dependencies(0);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
 
     auto* exchanger = (BroadcastExchanger*)shared_state->exchanger.get();
     for (size_t i = 0; i < num_sink; i++) {
@@ -931,7 +931,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) {
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
-    shared_state->create_dependencies(0);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
 
     auto* exchanger = 
(AdaptivePassthroughExchanger*)shared_state->exchanger.get();
     for (size_t i = 0; i < num_sink; i++) {
@@ -1140,7 +1140,7 @@ TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) {
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
-    shared_state->create_dependencies(0);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
 
     auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
     auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
diff --git a/be/test/pipeline/operator/agg_operator_test.cpp 
b/be/test/pipeline/operator/agg_operator_test.cpp
index de9af15c1ba..1cb002c85d9 100644
--- a/be/test/pipeline/operator/agg_operator_test.cpp
+++ b/be/test/pipeline/operator/agg_operator_test.cpp
@@ -42,7 +42,7 @@ auto static 
init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op,
                                  .parent_profile = &ctx.profile,
                                  .sender_id = 0,
                                  .shared_state = shared_state.get(),
-                                 .le_state_map = {},
+                                 .shared_state_map = {},
                                  .tsink = TDataSink {}};
         EXPECT_TRUE(local_state->init(&ctx.state, info).ok());
         ctx.state.emplace_sink_local_state(0, std::move(local_state));
@@ -54,7 +54,7 @@ auto static 
init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op,
         LocalStateInfo info {.parent_profile = &ctx.profile,
                              .scan_ranges = {},
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .task_idx = 0};
 
         EXPECT_TRUE(local_state->init(&ctx.state, info).ok());
diff --git 
a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp 
b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
index c1e92272739..1377ee03328 100644
--- a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
+++ b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
@@ -57,7 +57,7 @@ struct DistinctStreamingAggOperatorTest : public 
::testing::Test {
         LocalStateInfo info {.parent_profile = &profile,
                              .scan_ranges = {},
                              .shared_state = nullptr,
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .task_idx = 0};
         EXPECT_TRUE(local_state->init(state.get(), info));
         state->resize_op_id_to_local_state(-100);
diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp 
b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
index 945eab84ec5..aa3fc6f7877 100644
--- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
@@ -92,7 +92,7 @@ auto create_exchange_sink(std::vector<ChannelInfo> 
channel_info) {
                              .parent_profile = &ctx->profile,
                              .sender_id = 0,
                              .shared_state = nullptr,
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink {}};
     EXPECT_TRUE(local_state->init(&ctx->state, info).ok());
     ctx->state.emplace_sink_local_state(0, std::move(local_state));
diff --git a/be/test/pipeline/operator/exchange_source_operator_test.cpp 
b/be/test/pipeline/operator/exchange_source_operator_test.cpp
index db71bc6982a..709921ac48c 100644
--- a/be/test/pipeline/operator/exchange_source_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_source_operator_test.cpp
@@ -88,7 +88,7 @@ struct ExchangeSourceOperatorXTest : public ::testing::Test {
         LocalStateInfo info {.parent_profile = &profile,
                              .scan_ranges = {},
                              .shared_state = nullptr,
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .task_idx = 0};
         auto st = local_state->init(state.get(), info);
         state->resize_op_id_to_local_state(-100);
diff --git 
a/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp 
b/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
index 6264c1c52ab..ee653a950cf 100644
--- a/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
+++ b/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
@@ -66,7 +66,7 @@ struct LocalMergeSOrtSourceOperatorTest : public 
testing::Test {
             LocalStateInfo info {.parent_profile = &profile,
                                  .scan_ranges = {},
                                  .shared_state = shared_states[i].get(),
-                                 .le_state_map = {},
+                                 .shared_state_map = {},
                                  .task_idx = i};
             EXPECT_TRUE(local_state->init(runtime_states[i].get(), info));
             runtime_states[i]->resize_op_id_to_local_state(-100);
diff --git 
a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
index ba24d77d5d7..c4b04260b4c 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -62,7 +62,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, Init) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -101,7 +101,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, Sink) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -152,7 +152,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, 
SinkWithEmptyEOS) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -204,7 +204,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, 
SinkWithSpill) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -274,7 +274,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, 
SinkWithSpillAndEmptyEOS) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -341,7 +341,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, 
SinkWithSpillLargeData) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -424,7 +424,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, 
SinkWithSpilError) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
diff --git 
a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
index 8daff47ead5..67d49eb24c3 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
@@ -68,7 +68,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, Init) {
             .parent_profile = _helper.runtime_profile.get(),
             .scan_ranges = {},
             .shared_state = shared_state.get(),
-            .le_state_map = {},
+            .shared_state_map = {},
             .task_idx = 0,
     };
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -113,7 +113,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
GetBlockEmpty) {
                                   .parent_profile = 
_helper.runtime_profile.get(),
                                   .sender_id = 0,
                                   .shared_state = shared_state.get(),
-                                  .le_state_map = {},
+                                  .shared_state_map = {},
                                   .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), 
sink_info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -128,7 +128,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
GetBlockEmpty) {
             .parent_profile = _helper.runtime_profile.get(),
             .scan_ranges = {},
             .shared_state = shared_state.get(),
-            .le_state_map = {},
+            .shared_state_map = {},
             .task_idx = 0,
     };
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -184,7 +184,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
                                   .parent_profile = 
_helper.runtime_profile.get(),
                                   .sender_id = 0,
                                   .shared_state = shared_state.get(),
-                                  .le_state_map = {},
+                                  .shared_state_map = {},
                                   .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), 
sink_info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -218,7 +218,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
             .parent_profile = _helper.runtime_profile.get(),
             .scan_ranges = {},
             .shared_state = shared_state.get(),
-            .le_state_map = {},
+            .shared_state_map = {},
             .task_idx = 0,
     };
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -276,7 +276,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
GetBlockWithSpill) {
                                   .parent_profile = 
_helper.runtime_profile.get(),
                                   .sender_id = 0,
                                   .shared_state = shared_state.get(),
-                                  .le_state_map = {},
+                                  .shared_state_map = {},
                                   .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), 
sink_info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -323,7 +323,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
GetBlockWithSpill) {
             .parent_profile = _helper.runtime_profile.get(),
             .scan_ranges = {},
             .shared_state = shared_state.get(),
-            .le_state_map = {},
+            .shared_state_map = {},
             .task_idx = 0,
     };
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -389,7 +389,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
GetBlockWithSpillError) {
                                   .parent_profile = 
_helper.runtime_profile.get(),
                                   .sender_id = 0,
                                   .shared_state = shared_state.get(),
-                                  .le_state_map = {},
+                                  .shared_state_map = {},
                                   .tsink = TDataSink()};
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), 
sink_info);
     ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -436,7 +436,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
GetBlockWithSpillError) {
             .parent_profile = _helper.runtime_profile.get(),
             .scan_ranges = {},
             .shared_state = shared_state.get(),
-            .le_state_map = {},
+            .shared_state_map = {},
             .task_idx = 0,
     };
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
index 0d2a653c6c3..c8480cbb2ec 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -181,10 +181,11 @@ PartitionedAggregationTestHelper::create_operators() {
     EXPECT_TRUE(sink_operator->set_child(child_operator));
 
     // Setup task and state
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
     pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, 
runtime_state.get(), nullptr,
-                                                   nullptr, le_state_map, 0);
+                                                   nullptr, shared_state_map, 
0);
     runtime_state->set_task(pipeline_task.get());
     return {std::move(source_operator), std::move(sink_operator)};
 }
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index 9f2c3b48790..b9b1a1329df 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -67,9 +67,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) {
 TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) {
     auto [probe_operator, sink_operator] = _helper.create_operators();
 
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
-
     auto local_state = PartitionedHashJoinProbeLocalState::create_shared(
             _helper.runtime_state.get(), probe_operator.get());
 
@@ -77,7 +74,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) {
     LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
                          .scan_ranges = {},
                          .shared_state = shared_state.get(),
-                         .le_state_map = le_state_map,
+                         .shared_state_map = {},
                          .task_idx = 0};
     auto st = local_state->init(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st) << "init failed: " << st.to_string();
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index 29cb355b0a0..a9dae776a2d 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -118,12 +118,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, 
InitLocalState) {
     ASSERT_TRUE(st.ok()) << "Prepare failed: " << st.to_string();
 
     RuntimeProfile runtime_profile("test");
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
     TDataSink t_sink;
     LocalSinkStateInfo info {.parent_profile = &runtime_profile,
                              .shared_state = shared_state.get(),
-                             .le_state_map = le_state_map,
+                             .shared_state_map = {},
                              .tsink = t_sink};
     st = local_state->init(_helper.runtime_state.get(), info);
     ASSERT_TRUE(st) << "init failed: " << st.to_string();
@@ -222,12 +220,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, 
SinkEosAndSpill) {
 
     auto shared_state = std::make_shared<MockPartitionedHashJoinSharedState>();
 
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
     LocalSinkStateInfo sink_info {.task_idx = 0,
                                   .parent_profile = 
_helper.runtime_profile.get(),
                                   .shared_state = shared_state.get(),
-                                  .le_state_map = le_state_map,
+                                  .shared_state_map = {},
                                   .tsink = TDataSink()};
     auto st = sink_operator->setup_local_state(_helper.runtime_state.get(), 
sink_info);
     ASSERT_TRUE(st.ok()) << "Setup local state failed: " << st.to_string();
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
index 001bcd8e224..7d15e80fa67 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
@@ -154,10 +154,11 @@ PartitionedHashJoinTestHelper::create_operators() {
     sink_operator->set_inner_operators(inner_sink_operator, 
inner_probe_operator);
 
     // Setup task and state
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
     pipeline_task = std::make_shared<PipelineTask>(probe_pipeline, 0, 
runtime_state.get(), nullptr,
-                                                   nullptr, le_state_map, 0);
+                                                   nullptr, shared_state_map, 
0);
     runtime_state->set_task(pipeline_task.get());
     return {probe_operator, sink_operator};
 }
diff --git a/be/test/pipeline/operator/repeat_operator_test.cpp 
b/be/test/pipeline/operator/repeat_operator_test.cpp
index a1c120e95f3..e49c7c452df 100644
--- a/be/test/pipeline/operator/repeat_operator_test.cpp
+++ b/be/test/pipeline/operator/repeat_operator_test.cpp
@@ -50,7 +50,7 @@ struct RepeatOperatorTest : public ::testing::Test {
         LocalStateInfo info {.parent_profile = &profile,
                              .scan_ranges = {},
                              .shared_state = nullptr,
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .task_idx = 0};
         EXPECT_TRUE(local_state->init(state.get(), info));
         state->resize_op_id_to_local_state(-100);
diff --git a/be/test/pipeline/operator/set_operator_test.cpp 
b/be/test/pipeline/operator/set_operator_test.cpp
index 78bc2c02a36..62675144ffa 100644
--- a/be/test/pipeline/operator/set_operator_test.cpp
+++ b/be/test/pipeline/operator/set_operator_test.cpp
@@ -78,7 +78,7 @@ struct SetOperatorTest : public ::testing::Test {
             LocalStateInfo info {.parent_profile = &profile,
                                  .scan_ranges = {},
                                  .shared_state = shared_state_sptr.get(),
-                                 .le_state_map = {},
+                                 .shared_state_map = {},
                                  .task_idx = 0};
             EXPECT_TRUE(source_local_state->init(state.get(), info));
             state->resize_op_id_to_local_state(-100);
@@ -91,7 +91,7 @@ struct SetOperatorTest : public ::testing::Test {
                                      .parent_profile = &profile,
                                      .sender_id = 0,
                                      .shared_state = shared_state_sptr.get(),
-                                     .le_state_map = {},
+                                     .shared_state_map = {},
                                      .tsink = TDataSink {}};
             EXPECT_TRUE(sink_local_state->init(state.get(), info));
             state->emplace_sink_local_state(sink_op->operator_id(),
@@ -112,7 +112,7 @@ struct SetOperatorTest : public ::testing::Test {
                                      .parent_profile = &profile,
                                      .sender_id = 0,
                                      .shared_state = shared_state_sptr.get(),
-                                     .le_state_map = {},
+                                     .shared_state_map = {},
                                      .tsink = TDataSink {}};
             EXPECT_TRUE(probe_sink_local_state[i]->init(states[i].get(), 
info));
             
states[i]->emplace_sink_local_state(probe_sink_ops[i]->operator_id(),
diff --git a/be/test/pipeline/operator/sort_operator_test.cpp 
b/be/test/pipeline/operator/sort_operator_test.cpp
index 1867f31dbf1..cd1a4c35d85 100644
--- a/be/test/pipeline/operator/sort_operator_test.cpp
+++ b/be/test/pipeline/operator/sort_operator_test.cpp
@@ -90,7 +90,7 @@ struct SortOperatorTest : public ::testing::Test {
                                      .parent_profile = &profile,
                                      .sender_id = 0,
                                      .shared_state = shared_state.get(),
-                                     .le_state_map = {},
+                                     .shared_state_map = {},
                                      .tsink = TDataSink {}};
             EXPECT_TRUE(sink_local_state_uptr->init(state.get(), info).ok());
             state->emplace_sink_local_state(0, 
std::move(sink_local_state_uptr));
@@ -102,7 +102,7 @@ struct SortOperatorTest : public ::testing::Test {
             LocalStateInfo info {.parent_profile = &profile,
                                  .scan_ranges = {},
                                  .shared_state = shared_state.get(),
-                                 .le_state_map = {},
+                                 .shared_state_map = {},
                                  .task_idx = 0};
 
             EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok());
diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp 
b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
index 75101ae4edb..ac5da984389 100644
--- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
@@ -60,7 +60,7 @@ TEST_F(SpillSortSinkOperatorTest, Basic) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = {}};
 
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -112,7 +112,7 @@ TEST_F(SpillSortSinkOperatorTest, Sink) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = {}};
 
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -209,7 +209,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = {}};
 
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -287,7 +287,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill2) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = {}};
 
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -349,7 +349,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpillError) {
                              .parent_profile = _helper.runtime_profile.get(),
                              .sender_id = 0,
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .tsink = {}};
 
     st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
diff --git a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp 
b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
index aa9340a7d9b..87e5c9d1bf5 100644
--- a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
@@ -62,7 +62,7 @@ TEST_F(SpillSortSourceOperatorTest, Basic) {
     LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
                          .scan_ranges = {},
                          .shared_state = shared_state.get(),
-                         .le_state_map = {},
+                         .shared_state_map = {},
                          .task_idx = 0};
 
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -109,7 +109,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlock) {
     LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
                          .scan_ranges = {},
                          .shared_state = shared_state.get(),
-                         .le_state_map = {},
+                         .shared_state_map = {},
                          .task_idx = 0};
 
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -186,7 +186,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) {
     LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
                          .scan_ranges = {},
                          .shared_state = shared_state.get(),
-                         .le_state_map = {},
+                         .shared_state_map = {},
                          .task_idx = 0};
 
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -332,7 +332,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) {
     LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
                          .scan_ranges = {},
                          .shared_state = shared_state.get(),
-                         .le_state_map = {},
+                         .shared_state_map = {},
                          .task_idx = 0};
 
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -482,7 +482,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) 
{
     LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
                          .scan_ranges = {},
                          .shared_state = shared_state.get(),
-                         .le_state_map = {},
+                         .shared_state_map = {},
                          .task_idx = 0};
 
     st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp 
b/be/test/pipeline/operator/spill_sort_test_helper.cpp
index ca93291d273..0e291343737 100644
--- a/be/test/pipeline/operator/spill_sort_test_helper.cpp
+++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp
@@ -159,10 +159,11 @@ SpillSortTestHelper::create_operators() {
     EXPECT_TRUE(sink_operator->set_child(child_operator));
 
     // Setup task and state
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
     pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, 
runtime_state.get(), nullptr,
-                                                   nullptr, le_state_map, 0);
+                                                   nullptr, shared_state_map, 
0);
     runtime_state->set_task(pipeline_task.get());
     return {std::move(source_operator), std::move(sink_operator)};
 }
diff --git a/be/test/pipeline/operator/union_operator_test.cpp 
b/be/test/pipeline/operator/union_operator_test.cpp
index d655b7bf4ed..2c57d10b89e 100644
--- a/be/test/pipeline/operator/union_operator_test.cpp
+++ b/be/test/pipeline/operator/union_operator_test.cpp
@@ -96,7 +96,7 @@ TEST_F(UnionOperatorTest, test_all_const_expr) {
     LocalStateInfo info {.parent_profile = &profile,
                          .scan_ranges = {},
                          .shared_state = nullptr,
-                         .le_state_map = {},
+                         .shared_state_map = {},
                          .task_idx = 0};
     EXPECT_TRUE(source_local_state->init(state.get(), info));
     state->resize_op_id_to_local_state(-100);
@@ -190,7 +190,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
         LocalStateInfo info {.parent_profile = &profile,
                              .scan_ranges = {},
                              .shared_state = shared_state.get(),
-                             .le_state_map = {},
+                             .shared_state_map = {},
                              .task_idx = 0};
         EXPECT_TRUE(source_local_state->init(state.get(), info));
         state->resize_op_id_to_local_state(-100);
@@ -209,7 +209,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
                                      .parent_profile = &profile,
                                      .sender_id = 0,
                                      .shared_state = shared_state.get(),
-                                     .le_state_map = {},
+                                     .shared_state_map = {},
                                      .tsink = TDataSink {}};
             EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info));
             sink_state[i]->resize_op_id_to_local_state(-100);
diff --git a/be/test/pipeline/pipeline_test.cpp 
b/be/test/pipeline/pipeline_test.cpp
index 337b214b62f..1f9d345087a 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -272,12 +272,12 @@ TEST_F(PipelineTest, HAPPY_PATH) {
 
         _pipeline_profiles[cur_pipe->id()] =
                 std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(cur_pipe->id()));
-        std::map<int,
-                 std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-                le_state_map;
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
         auto task = std::make_unique<PipelineTask>(
                 cur_pipe, task_id, local_runtime_state.get(), 
_context.back().get(),
-                _pipeline_profiles[cur_pipe->id()].get(), le_state_map, 
task_id);
+                _pipeline_profiles[cur_pipe->id()].get(), shared_state_map, 
task_id);
         cur_pipe->incr_created_tasks(task_id, task.get());
         local_runtime_state->set_task(task.get());
         task->set_task_queue(_task_queue.get());
@@ -936,12 +936,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                         
std::static_pointer_cast<TaskExecutionContext>(_context.back()));
                 
local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get());
                 
_runtime_filter_mgrs[j]->_state->set_state(local_runtime_state.get());
-                std::map<int, 
std::pair<std::shared_ptr<LocalExchangeSharedState>,
-                                        std::shared_ptr<Dependency>>>
-                        le_state_map;
+                std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                        
std::vector<std::shared_ptr<Dependency>>>>
+                        shared_state_map;
                 auto task = std::make_unique<PipelineTask>(
                         _pipelines[i], task_id, local_runtime_state.get(), 
_context.back().get(),
-                        _pipeline_profiles[_pipelines[i]->id()].get(), 
le_state_map, j);
+                        _pipeline_profiles[_pipelines[i]->id()].get(), 
shared_state_map, j);
                 _pipelines[i]->incr_created_tasks(j, task.get());
                 local_runtime_state->set_task(task.get());
                 task->set_task_queue(_task_queue.get());
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
index af48faffcf4..63f10a69838 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -76,9 +76,9 @@ TEST_F(RuntimeFilterProducerHelperTest, basic) {
     column->insert(2);
     block.insert({std::move(column), 
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
 
-    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            helper.process(_runtime_states[0].get(), &block, 
shared_hash_table_ctx));
+            helper.process(_runtime_states[0].get(), &block, false, 
runtime_filters));
 }
 
 TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
@@ -101,10 +101,10 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
     column->insert(2);
     block.insert({std::move(column), 
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
 
-    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
     _tasks[0]->set_wake_up_early();
+    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+            helper.process(_runtime_states[0].get(), &block, false, runtime_filters));
 }
 
 TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
@@ -132,9 +132,9 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
     column->insert(2);
     block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"});
 
-    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+            helper.process(_runtime_states[0].get(), &block, false, runtime_filters));
 }
 
 TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
@@ -156,16 +156,15 @@ TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
     column->insert(2);
     block.insert({std::move(column), 
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
 
-    vectorized::SharedHashTableContextPtr shared_hash_table_ctx =
-            std::make_shared<vectorized::SharedHashTableContext>();
+    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+            helper.process(_runtime_states[0].get(), &block, true, runtime_filters));
 
     auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
             helper2.init(_runtime_states[1].get(), build_expr_ctxs, runtime_filter_descs));
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            helper2.process(_runtime_states[1].get(), &block, shared_hash_table_ctx));
+            helper2.process(_runtime_states[1].get(), &block, true, runtime_filters));
 }
 
 } // namespace doris
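
For context on the signature exercised by the tests above: the SharedHashTableContextPtr argument to process() is gone, filters produced by the first instance are handed back through a plain map, and a bool selects the shared (broadcast) path. A rough sketch of the broadcast pattern, reusing the fixtures from the test above; the /*is_broadcast*/ label is my naming for that bool, not a parameter name taken from the patch.

    // First producer builds the filters and publishes them into the map.
    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
            helper.process(_runtime_states[0].get(), &block, /*is_broadcast*/ true, runtime_filters));

    // A second instance of the same broadcast join reuses the published filters
    // instead of rebuilding them.
    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
            helper2.process(_runtime_states[1].get(), &block, /*is_broadcast*/ true, runtime_filters));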
diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp b/be/test/vec/exec/vfile_scanner_exception_test.cpp
index 0a0dee944da..8fa37c26278 100644
--- a/be/test/vec/exec/vfile_scanner_exception_test.cpp
+++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp
@@ -252,10 +252,11 @@ void VfileScannerExceptionTest::init() {
     auto local_state =
             pipeline::FileScanLocalState::create_unique(&_runtime_state, _scan_node.get());
     std::vector<TScanRangeParams> scan_ranges;
-    std::map<int, std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>,
-                            std::shared_ptr<pipeline::Dependency>>>
-            le_state_map;
-    pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr, le_state_map, 0};
+    pipeline::LocalStateInfo info {.parent_profile = &_global_profile,
+                                   .scan_ranges = scan_ranges,
+                                   .shared_state = nullptr,
+                                   .shared_state_map = {},
+                                   .task_idx = 0};
     WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init local_state");
     _runtime_state.emplace_local_state(_scan_node->operator_id(), std::move(local_state));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
