This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 28aad017d1c [refactor](minor) delete unused shared hash table construction (#35413)
28aad017d1c is described below

commit 28aad017d1c093e1ad4fda956ea9f94ed8561cfb
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon May 27 14:40:08 2024 +0800

    [refactor](minor) delete unused shared hash table construction (#35413)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |  80 ----------
 be/src/runtime/fragment_mgr.h                      |  10 --
 be/src/vec/exec/join/vhash_join_node.cpp           | 176 ++++++---------------
 be/src/vec/exec/join/vhash_join_node.h             |  14 --
 .../vec/runtime/shared_hash_table_controller.cpp   |  55 +------
 be/src/vec/runtime/shared_hash_table_controller.h  |   9 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |   6 -
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 9 files changed, 56 insertions(+), 300 deletions(-)
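
Context for the diff below, sketched in code: under the pipeline engine the
local state whose task_idx == 0 registers itself as the sole hash table
builder, every other task parks on a pipeline::Dependency, and the builder
wakes them all via signal() once the table is built. The polling
wait_for_signal()/should_build_hash_table() path deleted here served the
legacy non-pipeline engine. A minimal, self-contained sketch of the retained
flow, with simplified stand-in types (not the actual Doris sources):

    #include <cstdio>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct Dependency {                      // stand-in for pipeline::Dependency
        bool ready = false;
        void set_ready() { ready = true; }   // consumer task becomes runnable
    };

    class SharedHashTableController {
    public:
        // Called once by the task with task_idx == 0 (the builder).
        void set_builder_and_consumers(int builder_instance, int node_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            _builders.insert({node_id, builder_instance});
        }
        // Each consumer task registers a dependency to be woken on.
        void append_dependency(int node_id, std::shared_ptr<Dependency> dep) {
            std::lock_guard<std::mutex> lock(_mutex);
            _deps[node_id].push_back(std::move(dep));
        }
        // The builder publishes the finished hash table and wakes consumers.
        void signal(int node_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            for (auto& dep : _deps[node_id]) {
                dep->set_ready();
            }
        }

    private:
        std::mutex _mutex;
        std::map<int, int> _builders;        // node id -> builder instance
        std::map<int, std::vector<std::shared_ptr<Dependency>>> _deps;
    };

    int main() {
        SharedHashTableController controller;
        controller.set_builder_and_consumers(/*builder_instance=*/0, /*node_id=*/1);
        auto dep = std::make_shared<Dependency>();
        controller.append_dependency(1, dep);
        controller.signal(1);                // hash table built
        std::printf("consumer ready: %s\n", dep->ready ? "yes" : "no");
    }

Because readiness is propagated through dependencies instead of a condition
variable, no instance ever blocks a thread waiting for the table, which is
what lets the wait/notify machinery below be deleted outright.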

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 90aeb7070b6..7c486fd4b45 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -57,8 +57,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
             _should_build_hash_table = info.task_idx == 0;
             if (_should_build_hash_table) {
                 profile()->add_info_string("ShareHashTableEnabled", "true");
-                CHECK(p._shared_hashtable_controller->should_build_hash_table(
-                        state->fragment_instance_id(), p.node_id()));
+                p._shared_hashtable_controller->set_builder_and_consumers(
+                        state->fragment_instance_id(), p.node_id());
             }
         } else {
             profile()->add_info_string("ShareHashTableEnabled", "false");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index cd54edd5f54..8534638f681 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -674,7 +674,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
             query_ctx->set_rsc_info = true;
         }
 
-        query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
         _set_scan_concurrency(params, query_ctx.get());
     const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
 
@@ -851,7 +850,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             params.query_options.enable_pipeline_x_engine) ||
            (params.query_options.__isset.enable_pipeline_engine &&
             params.query_options.enable_pipeline_engine));
-    _setup_shared_hashtable_for_broadcast_join(params, query_ctx.get());
     int64_t duration_ns = 0;
     std::shared_ptr<pipeline::PipelineFragmentContext> context =
             std::make_shared<pipeline::PipelineFragmentContext>(
@@ -1440,84 +1438,6 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
     return merge_status;
 }
 
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
-                                                             QueryContext* query_ctx) {
-    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
-        !params.query_options.enable_share_hash_table_for_broadcast_join) {
-        return;
-    }
-
-    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
-        params.fragment.plan.nodes.empty()) {
-        return;
-    }
-    for (auto& node : params.fragment.plan.nodes) {
-        if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
-            !node.hash_join_node.__isset.is_broadcast_join ||
-            !node.hash_join_node.is_broadcast_join) {
-            continue;
-        }
-
-        if (params.build_hash_table_for_broadcast_join) {
-            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
-                    params.params.fragment_instance_id, node.node_id);
-        }
-    }
-}
-
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
-        const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
-        QueryContext* query_ctx) {
-    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
-        !params.query_options.enable_share_hash_table_for_broadcast_join) {
-        return;
-    }
-
-    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
-        params.fragment.plan.nodes.empty()) {
-        return;
-    }
-    for (auto& node : params.fragment.plan.nodes) {
-        if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
-            !node.hash_join_node.__isset.is_broadcast_join ||
-            !node.hash_join_node.is_broadcast_join) {
-            continue;
-        }
-
-        if (local_params.build_hash_table_for_broadcast_join) {
-            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
-                    local_params.fragment_instance_id, node.node_id);
-        }
-    }
-}
-
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
-                                                             QueryContext* query_ctx) {
-    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
-        !params.query_options.enable_share_hash_table_for_broadcast_join) {
-        return;
-    }
-
-    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
-        params.fragment.plan.nodes.empty()) {
-        return;
-    }
-    for (auto& node : params.fragment.plan.nodes) {
-        if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
-            !node.hash_join_node.__isset.is_broadcast_join ||
-            !node.hash_join_node.is_broadcast_join) {
-            continue;
-        }
-
-        for (auto& local_param : params.local_params) {
-            if (local_param.build_hash_table_for_broadcast_join) {
-                query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
-                        local_param.fragment_instance_id, node.node_id);
-            }
-        }
-    }
-}
-
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
     {
         std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 717b6813abc..c8298ee67b7 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -162,16 +162,6 @@ private:
     template <typename Param>
     void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
 
-    void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
-                                                    QueryContext* query_ctx);
-
-    void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
-                                                    const TPipelineInstanceParams& local_params,
-                                                    QueryContext* query_ctx);
-
-    void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
-                                                    QueryContext* query_ctx);
-
     template <typename Params>
     Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
                           std::shared_ptr<QueryContext>& query_ctx);
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 9fec942d161..bee0c67c4b3 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -218,19 +218,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status HashJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
-    _should_build_hash_table = true;
     if (_is_broadcast_join) {
         runtime_profile()->add_info_string("BroadcastJoin", "true");
-        if (state->enable_share_hash_table_for_broadcast_join()) {
-            runtime_profile()->add_info_string("ShareHashTableEnabled", "true");
-            _shared_hashtable_controller =
-                    state->get_query_ctx()->get_shared_hash_table_controller();
-            _shared_hash_table_context = _shared_hashtable_controller->get_context(id());
-            _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table(
-                    state->fragment_instance_id(), id());
-        } else {
-            runtime_profile()->add_info_string("ShareHashTableEnabled", "false");
-        }
     }
 
     _memory_usage_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
@@ -245,8 +234,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
             "ProbeKeyArena", TUnit::BYTES, "MemoryUsage");
 
     // Build phase
-    auto* record_profile =
-            _should_build_hash_table ? _build_phase_profile : faker_runtime_profile();
+    auto* record_profile = _build_phase_profile;
     _build_get_next_timer = ADD_TIMER(record_profile, "BuildGetNextTime");
     _build_timer = ADD_TIMER(record_profile, "BuildTime");
     _build_rows_counter = ADD_COUNTER(record_profile, "BuildRows", TUnit::UNIT);
@@ -691,31 +679,26 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
         SCOPED_TIMER(_build_get_next_timer);
         RETURN_IF_ERROR(child(1)->open(state));
     }
-    if (_should_build_hash_table) {
-        bool eos = false;
-        Block block;
-        // If eos or have already met a null value using short-circuit strategy, we do not need to pull
-        // data from data.
-        while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) &&
-               (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) {
-            release_block_memory(block, 1);
-            RETURN_IF_CANCELLED(state);
-            {
-                SCOPED_TIMER(_build_get_next_timer);
-                RETURN_IF_ERROR(child(1)->get_next_after_projects(
-                        state, &block, &eos,
-                        std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
-                                          ExecNode::get_next,
-                                  _children[1], std::placeholders::_1, std::placeholders::_2,
-                                  std::placeholders::_3)));
-            }
-            RETURN_IF_ERROR(sink(state, &block, eos));
+    bool eos = false;
+    Block block;
+    // If eos or have already met a null value using short-circuit strategy, we do not need to pull
+    // data from data.
+    while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) &&
+           (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) {
+        release_block_memory(block, 1);
+        RETURN_IF_CANCELLED(state);
+        {
+            SCOPED_TIMER(_build_get_next_timer);
+            RETURN_IF_ERROR(child(1)->get_next_after_projects(
+                    state, &block, &eos,
+                    std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
+                                      ExecNode::get_next,
+                              _children[1], std::placeholders::_1, std::placeholders::_2,
+                              std::placeholders::_3)));
+                              std::placeholders::_3)));
         }
-        RETURN_IF_ERROR(child(1)->close(state));
-    } else {
-        RETURN_IF_ERROR(child(1)->close(state));
-        RETURN_IF_ERROR(sink(state, nullptr, true));
+        RETURN_IF_ERROR(sink(state, &block, eos));
     }
+    RETURN_IF_ERROR(child(1)->close(state));
     return Status::OK();
 }
 
@@ -723,38 +706,36 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_build_timer);
 
-    if (_should_build_hash_table) {
-        // If eos or have already met a null value using short-circuit strategy, we do not need to pull
-        // data from probe side.
-        _build_side_mem_used += in_block->allocated_bytes();
-
-        if (_build_side_mutable_block.empty()) {
-            auto tmp_build_block =
-                    VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc());
-            tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
-            _build_col_ids.resize(_build_expr_ctxs.size());
-            RETURN_IF_ERROR(_do_evaluate(tmp_build_block, _build_expr_ctxs, *_build_expr_call_timer,
-                                         _build_col_ids));
-            _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block);
-        }
+    // If eos or have already met a null value using short-circuit strategy, we do not need to pull
+    // data from probe side.
+    _build_side_mem_used += in_block->allocated_bytes();
+
+    if (_build_side_mutable_block.empty()) {
+        auto tmp_build_block =
+                VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc());
+        tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
+        _build_col_ids.resize(_build_expr_ctxs.size());
+        RETURN_IF_ERROR(_do_evaluate(tmp_build_block, _build_expr_ctxs, *_build_expr_call_timer,
+                                     _build_col_ids));
+        _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block);
+    }
 
-        if (in_block->rows() != 0) {
-            std::vector<int> res_col_ids(_build_expr_ctxs.size());
-            RETURN_IF_ERROR(_do_evaluate(*in_block, _build_expr_ctxs, *_build_expr_call_timer,
-                                         res_col_ids));
-
-            SCOPED_TIMER(_build_side_merge_block_timer);
-            RETURN_IF_ERROR(_build_side_mutable_block.merge_ignore_overflow(*in_block));
-            if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) {
-                return Status::NotSupported(
-                        "Hash join do not support build table rows"
-                        " over:" +
-                        std::to_string(JOIN_BUILD_SIZE_LIMIT));
-            }
+    if (in_block->rows() != 0) {
+        std::vector<int> res_col_ids(_build_expr_ctxs.size());
+        RETURN_IF_ERROR(
+                _do_evaluate(*in_block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids));
+
+        SCOPED_TIMER(_build_side_merge_block_timer);
+        RETURN_IF_ERROR(_build_side_mutable_block.merge_ignore_overflow(*in_block));
+        if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) {
+            return Status::NotSupported(
+                    "Hash join do not support build table rows"
+                    " over:" +
+                    std::to_string(JOIN_BUILD_SIZE_LIMIT));
         }
     }
 
-    if (_should_build_hash_table && eos) {
+    if (eos) {
         DCHECK(!_build_side_mutable_block.empty());
         _build_block = std::make_shared<Block>(_build_side_mutable_block.to_block());
         COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
@@ -762,65 +743,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                 std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, runtime_filters());
         RETURN_IF_ERROR(_process_build_block(state, *_build_block));
         RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this));
-        if (_shared_hashtable_controller) {
-            _shared_hash_table_context->status = Status::OK();
-            // arena will be shared with other instances.
-            _shared_hash_table_context->arena = _arena;
-            _shared_hash_table_context->block = _build_block;
-            _shared_hash_table_context->hash_table_variants = _hash_table_variants;
-            _shared_hash_table_context->short_circuit_for_null_in_probe_side =
-                    _has_null_in_build_side;
-            if (_runtime_filter_slots) {
-                _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
-            }
-            _shared_hashtable_controller->signal(id());
-        }
-    } else if (!_should_build_hash_table) {
-        DCHECK(_shared_hashtable_controller != nullptr);
-        DCHECK(_shared_hash_table_context != nullptr);
-        auto* wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime");
-        SCOPED_TIMER(wait_timer);
-        RETURN_IF_ERROR(
-                _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context));
-
-        _build_phase_profile->add_info_string(
-                "SharedHashTableFrom",
-                print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
-        _has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side;
-        std::visit(
-                [](auto&& dst, auto&& src) {
-                    if constexpr (!std::is_same_v<std::monostate, std::decay_t<decltype(dst)>> &&
-                                  std::is_same_v<std::decay_t<decltype(src)>,
-                                                 std::decay_t<decltype(dst)>>) {
-                        dst.hash_table = src.hash_table;
-                    }
-                },
-                *_hash_table_variants,
-                *std::static_pointer_cast<HashTableVariants>(
-                        _shared_hash_table_context->hash_table_variants));
-        _build_block = _shared_hash_table_context->block;
-
-        if (!_shared_hash_table_context->runtime_filters.empty()) {
-            auto ret = std::visit(
-                    Overload {[&](std::monostate&) -> Status {
-                                  LOG(FATAL) << "FATAL: uninited hash table";
-                                  __builtin_unreachable();
-                              },
-                              [&](auto&& arg) -> Status {
-                                  if (_runtime_filters.empty()) {
-                                      return Status::OK();
-                                  }
-                                  _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
-                                          _build_expr_ctxs, _runtime_filters);
-
-                                  RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
-                                          _shared_hash_table_context));
-                                  RETURN_IF_ERROR(_runtime_filter_slots->publish(true));
-                                  return Status::OK();
-                              }},
-                    *_hash_table_variants);
-            RETURN_IF_ERROR(ret);
-        }
     }
 
     if (eos) {
@@ -1104,12 +1026,7 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) {
     return results;
 }
 
-HashJoinNode::~HashJoinNode() {
-    if (_shared_hashtable_controller && _should_build_hash_table) {
-        // signal at here is abnormal
-        _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor"));
-    }
-}
+HashJoinNode::~HashJoinNode() = default;
 
 void HashJoinNode::_release_mem() {
     _arena = nullptr;
@@ -1118,7 +1035,6 @@ void HashJoinNode::_release_mem() {
     _null_map_column = nullptr;
     _tuple_is_null_left_flag_column = nullptr;
     _tuple_is_null_right_flag_column = nullptr;
-    _shared_hash_table_context = nullptr;
     _probe_block.clear();
 }
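
A note on the destructor change just above: the removed block was a failsafe
so that a builder destroyed before signalling (e.g. on query cancellation)
would wake consumers blocked in wait_for_signal() with Status::Cancelled
rather than leave them hanging. With the blocking wait gone nothing can
block, so the defaulted destructor suffices. A minimal sketch of that
destructor-failsafe pattern in isolation (hypothetical names, not the Doris
sources):

    #include <condition_variable>
    #include <mutex>

    // Wakes waiters with an error from the destructor if the owner dies
    // before publishing -- the pattern the removed ~HashJoinNode implemented.
    class Publisher {
    public:
        ~Publisher() {
            std::lock_guard<std::mutex> lock(_mutex);
            if (!_signaled) {          // abnormal exit: publish() never ran
                _signaled = true;
                _ok = false;           // waiters observe a cancelled status
                _cv.notify_all();
            }
        }
        void publish() {
            std::lock_guard<std::mutex> lock(_mutex);
            _signaled = true;
            _ok = true;
            _cv.notify_all();
        }
        bool wait() {                  // returns false when cancelled
            std::unique_lock<std::mutex> lock(_mutex);
            _cv.wait(lock, [&] { return _signaled; });
            return _ok;
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        bool _signaled = false;
        bool _ok = false;
    };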
 
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 20245be06c2..5a10e691d06 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -46,7 +46,6 @@
 #include "vec/core/types.h"
 #include "vec/exec/join/join_op.h" // IWYU pragma: keep
 #include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/shared_hash_table_controller.h"
 #include "vjoin_node_base.h"
 
 template <typename T>
@@ -217,15 +216,6 @@ public:
 
     void debug_string(int indentation_level, std::stringstream* out) const override;
 
-    bool can_sink_write() const {
-        if (_should_build_hash_table) {
-            return true;
-        }
-        return _shared_hash_table_context && _shared_hash_table_context->signaled;
-    }
-
-    bool should_build_hash_table() const { return _should_build_hash_table; }
-
     bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
     bool is_right_semi_anti() const { return _is_right_semi_anti; }
     bool is_outer_join() const { return _is_outer_join; }
@@ -346,8 +336,6 @@ private:
     bool _build_side_ignore_null = false;
 
     bool _is_broadcast_join = false;
-    bool _should_build_hash_table = true;
-    std::shared_ptr<SharedHashTableController> _shared_hashtable_controller;
     std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
 
     std::vector<SlotId> _hash_output_slot_ids;
@@ -358,8 +346,6 @@ private:
     int64_t _build_side_last_mem_used = 0;
     MutableBlock _build_side_mutable_block;
 
-    SharedHashTableContextPtr _shared_hash_table_context = nullptr;
-
     Status _materialize_build_side(RuntimeState* state) override;
 
     Status _process_build_block(RuntimeState* state, Block& block);
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 4b1203d4822..a416ba6349e 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -25,35 +25,13 @@
 
 #include "pipeline/exec/hashjoin_build_sink.h"
 
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
 
 void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) {
     // Only need to set builder and consumers with pipeline engine enabled.
-    DCHECK(_pipeline_engine_enabled);
     std::lock_guard<std::mutex> lock(_mutex);
     DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend());
     _builder_fragment_ids.insert({node_id, builder});
-    _dependencies.insert({node_id, {}});
-    _finish_dependencies.insert({node_id, {}});
-}
-
-bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id,
-                                                        int my_node_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    auto it = _builder_fragment_ids.find(my_node_id);
-    if (_pipeline_engine_enabled) {
-        if (it != _builder_fragment_ids.cend()) {
-            return it->second == fragment_instance_id;
-        }
-        return false;
-    }
-
-    if (it == _builder_fragment_ids.cend()) {
-        _builder_fragment_ids.insert({my_node_id, fragment_instance_id});
-        return true;
-    }
-    return false;
 }
 
 SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) {
@@ -64,20 +42,6 @@ SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id)
     return _shared_contexts[my_node_id];
 }
 
-void SharedHashTableController::signal(int my_node_id, Status status) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    auto it = _shared_contexts.find(my_node_id);
-    if (it != _shared_contexts.cend()) {
-        it->second->signaled = true;
-        it->second->status = status;
-        _shared_contexts.erase(it);
-    }
-    for (auto& dep : _dependencies[my_node_id]) {
-        dep->set_ready();
-    }
-    _cv.notify_all();
-}
-
 void SharedHashTableController::signal(int my_node_id) {
     std::lock_guard<std::mutex> lock(_mutex);
     auto it = _shared_contexts.find(my_node_id);
@@ -108,19 +72,4 @@ TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_nod
     return it->second;
 }
 
-Status SharedHashTableController::wait_for_signal(RuntimeState* state,
-                                                  const SharedHashTableContextPtr& context) {
-    std::unique_lock<std::mutex> lock(_mutex);
-    // maybe builder signaled before other instances waiting,
-    // so here need to check value of `signaled`
-    while (!context->signaled) {
-        _cv.wait_for(lock, std::chrono::milliseconds(400),
-                     [&]() { return context->signaled.load(); });
-        // return if the instances is cancelled(eg. query timeout)
-        RETURN_IF_CANCELLED(state);
-    }
-    return context->status;
-}
-
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index 8fe46b97b85..35f4e9334ea 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -78,19 +78,18 @@ public:
     SharedHashTableContextPtr get_context(int my_node_id);
     void signal(int my_node_id);
     void signal_finish(int my_node_id);
-    void signal(int my_node_id, Status status);
-    Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context);
-    bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id);
-    void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; }
     void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep,
                            std::shared_ptr<pipeline::Dependency> finish_dep) {
         std::lock_guard<std::mutex> lock(_mutex);
+        if (!_dependencies.contains(node_id)) {
+            _dependencies.insert({node_id, {}});
+            _finish_dependencies.insert({node_id, {}});
+        }
         _dependencies[node_id].push_back(dep);
         _finish_dependencies[node_id].push_back(finish_dep);
     }
 
 private:
-    bool _pipeline_engine_enabled = false;
     std::mutex _mutex;
     // For pipelineX, we update all dependencies once hash table is built;
     std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
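
One behavioural detail in the header change above: set_builder_and_consumers()
no longer seeds empty vectors into _dependencies and _finish_dependencies, so
append_dependency() now creates both entries on first use. A minimal sketch of
that lazy-initialization idiom (simplified stand-in types, not the Doris
sources):

    #include <map>
    #include <memory>
    #include <vector>

    struct Dep {};  // stand-in for pipeline::Dependency

    int main() {
        std::map<int, std::vector<std::shared_ptr<Dep>>> deps;
        // operator[] default-constructs the vector on first access, so the
        // map needs no up-front seeding when the builder registers; the
        // explicit contains() check in the diff spells the same intent out
        // for both maps under one lock.
        deps[/*node_id=*/1].push_back(std::make_shared<Dep>());
        return deps[1].size() == 1 ? 0 : 1;
    }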
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef071e78d39..265d3afdb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1678,7 +1678,6 @@ public class Coordinator implements CoordInterface {
                             destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
                         } else {
                             destHosts.put(param.host, param);
-                            param.buildHashTableForBroadcastJoin = true;
                             TPlanFragmentDestination dest = new TPlanFragmentDestination();
                             param.recvrId = params.destinations.size();
                             dest.fragment_instance_id = param.instanceId;
@@ -1802,7 +1801,6 @@ public class Coordinator implements CoordInterface {
                             destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
                         } else {
                             destHosts.put(param.host, param);
-                            param.buildHashTableForBroadcastJoin = true;
                             TPlanFragmentDestination dest = new TPlanFragmentDestination();
                             dest.fragment_instance_id = param.instanceId;
                             param.recvrId = params.destinations.size();
@@ -3689,7 +3687,6 @@ public class Coordinator implements CoordInterface {
                 params.setFragment(fragment.toThrift());
                 params.setDescTbl(descTable);
                 params.setParams(new TPlanFragmentExecParams());
-                params.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
                 params.params.setQueryId(queryId);
                 params.params.setFragmentInstanceId(instanceExecParam.instanceId);
 
@@ -3848,7 +3845,6 @@ public class Coordinator implements CoordInterface {
                         params.getLocalParams().size());
                 TPipelineInstanceParams localParams = new TPipelineInstanceParams();
 
-                localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
                 localParams.setFragmentInstanceId(instanceExecParam.instanceId);
                 localParams.setPerNodeScanRanges(scanRanges);
                 localParams.setPerNodeSharedScans(perNodeSharedScans);
@@ -4002,8 +3998,6 @@ public class Coordinator implements CoordInterface {
 
         FragmentExecParams fragmentExecParams;
 
-        boolean buildHashTableForBroadcastJoin = false;
-
         int recvrId = -1;
 
         List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 20e2dbe5e6e..586d40b8648 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -501,6 +501,7 @@ struct TExecPlanFragmentParams {
   // Otherwise, the fragment will start executing directly on the BE side.
   20: optional bool need_wait_execution_trigger = false;
 
+  // deprecated
   21: optional bool build_hash_table_for_broadcast_join = false;
 
   22: optional list<Types.TUniqueId> instances_sharing_hash_table;
@@ -705,6 +706,7 @@ struct TExportStatusResult {
 
 struct TPipelineInstanceParams {
   1: required Types.TUniqueId fragment_instance_id
+  // deprecated
   2: optional bool build_hash_table_for_broadcast_join = false;
  3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
   4: optional i32 sender_id

