This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 28aad017d1c [refactor](minor) delete unused shared hash table construction (#35413) 28aad017d1c is described below commit 28aad017d1c093e1ad4fda956ea9f94ed8561cfb Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon May 27 14:40:08 2024 +0800 [refactor](minor) delete unused shared hash table construction (#35413) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +- be/src/runtime/fragment_mgr.cpp | 80 ---------- be/src/runtime/fragment_mgr.h | 10 -- be/src/vec/exec/join/vhash_join_node.cpp | 176 ++++++--------------- be/src/vec/exec/join/vhash_join_node.h | 14 -- .../vec/runtime/shared_hash_table_controller.cpp | 55 +------ be/src/vec/runtime/shared_hash_table_controller.h | 9 +- .../main/java/org/apache/doris/qe/Coordinator.java | 6 - gensrc/thrift/PaloInternalService.thrift | 2 + 9 files changed, 56 insertions(+), 300 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 90aeb7070b6..7c486fd4b45 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -57,8 +57,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _should_build_hash_table = info.task_idx == 0; if (_should_build_hash_table) { profile()->add_info_string("ShareHashTableEnabled", "true"); - CHECK(p._shared_hashtable_controller->should_build_hash_table( - state->fragment_instance_id(), p.node_id())); + p._shared_hashtable_controller->set_builder_and_consumers( + state->fragment_instance_id(), p.node_id()); } } else { profile()->add_info_string("ShareHashTableEnabled", "false"); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index cd54edd5f54..8534638f681 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -674,7 +674,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->set_rsc_info = true; } - query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline); _set_scan_concurrency(params, query_ctx.get()); const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>; @@ -851,7 +850,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, params.query_options.enable_pipeline_x_engine) || (params.query_options.__isset.enable_pipeline_engine && params.query_options.enable_pipeline_engine)); - _setup_shared_hashtable_for_broadcast_join(params, query_ctx.get()); int64_t duration_ns = 0; std::shared_ptr<pipeline::PipelineFragmentContext> context = std::make_shared<pipeline::PipelineFragmentContext>( @@ -1440,84 +1438,6 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, return merge_status; } -void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - QueryContext* query_ctx) { - if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || - !params.query_options.enable_share_hash_table_for_broadcast_join) { - return; - } - - if (!params.__isset.fragment || !params.fragment.__isset.plan || - params.fragment.plan.nodes.empty()) { - return; - } - for (auto& node : params.fragment.plan.nodes) { - if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || - !node.hash_join_node.__isset.is_broadcast_join || - !node.hash_join_node.is_broadcast_join) { - continue; - } - - if (params.build_hash_table_for_broadcast_join) { - query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( - params.params.fragment_instance_id, node.node_id); - } - } -} - -void FragmentMgr::_setup_shared_hashtable_for_broadcast_join( - const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params, - QueryContext* query_ctx) { - if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || - !params.query_options.enable_share_hash_table_for_broadcast_join) { - return; - } - - if (!params.__isset.fragment || !params.fragment.__isset.plan || - params.fragment.plan.nodes.empty()) { - return; - } - for (auto& node : params.fragment.plan.nodes) { - if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || - !node.hash_join_node.__isset.is_broadcast_join || - !node.hash_join_node.is_broadcast_join) { - continue; - } - - if (local_params.build_hash_table_for_broadcast_join) { - query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( - local_params.fragment_instance_id, node.node_id); - } - } -} - -void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, - QueryContext* query_ctx) { - if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || - !params.query_options.enable_share_hash_table_for_broadcast_join) { - return; - } - - if (!params.__isset.fragment || !params.fragment.__isset.plan || - params.fragment.plan.nodes.empty()) { - return; - } - for (auto& node : params.fragment.plan.nodes) { - if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || - !node.hash_join_node.__isset.is_broadcast_join || - !node.hash_join_node.is_broadcast_join) { - continue; - } - - for (auto& local_param : params.local_params) { - if (local_param.build_hash_table_for_broadcast_join) { - query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( - local_param.fragment_instance_id, node.node_id); - } - } - } -} - void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) { { std::lock_guard<std::mutex> lock(_lock); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 717b6813abc..c8298ee67b7 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -162,16 +162,6 @@ private: template <typename Param> void _set_scan_concurrency(const Param& params, QueryContext* query_ctx); - void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - QueryContext* query_ctx); - - void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, - const TPipelineInstanceParams& local_params, - QueryContext* query_ctx); - - void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, - QueryContext* query_ctx); - template <typename Params> Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, std::shared_ptr<QueryContext>& query_ctx); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 9fec942d161..bee0c67c4b3 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -218,19 +218,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - _should_build_hash_table = true; if (_is_broadcast_join) { runtime_profile()->add_info_string("BroadcastJoin", "true"); - if (state->enable_share_hash_table_for_broadcast_join()) { - runtime_profile()->add_info_string("ShareHashTableEnabled", "true"); - _shared_hashtable_controller = - state->get_query_ctx()->get_shared_hash_table_controller(); - _shared_hash_table_context = _shared_hashtable_controller->get_context(id()); - _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table( - state->fragment_instance_id(), id()); - } else { - runtime_profile()->add_info_string("ShareHashTableEnabled", "false"); - } } _memory_usage_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage"); @@ -245,8 +234,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { "ProbeKeyArena", TUnit::BYTES, "MemoryUsage"); // Build phase - auto* record_profile = - _should_build_hash_table ? _build_phase_profile : faker_runtime_profile(); + auto* record_profile = _build_phase_profile; _build_get_next_timer = ADD_TIMER(record_profile, "BuildGetNextTime"); _build_timer = ADD_TIMER(record_profile, "BuildTime"); _build_rows_counter = ADD_COUNTER(record_profile, "BuildRows", TUnit::UNIT); @@ -691,31 +679,26 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { SCOPED_TIMER(_build_get_next_timer); RETURN_IF_ERROR(child(1)->open(state)); } - if (_should_build_hash_table) { - bool eos = false; - Block block; - // If eos or have already met a null value using short-circuit strategy, we do not need to pull - // data from data. - while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) && - (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) { - release_block_memory(block, 1); - RETURN_IF_CANCELLED(state); - { - SCOPED_TIMER(_build_get_next_timer); - RETURN_IF_ERROR(child(1)->get_next_after_projects( - state, &block, &eos, - std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & - ExecNode::get_next, - _children[1], std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3))); - } - RETURN_IF_ERROR(sink(state, &block, eos)); + bool eos = false; + Block block; + // If eos or have already met a null value using short-circuit strategy, we do not need to pull + // data from data. + while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) && + (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) { + release_block_memory(block, 1); + RETURN_IF_CANCELLED(state); + { + SCOPED_TIMER(_build_get_next_timer); + RETURN_IF_ERROR(child(1)->get_next_after_projects( + state, &block, &eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[1], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3))); } - RETURN_IF_ERROR(child(1)->close(state)); - } else { - RETURN_IF_ERROR(child(1)->close(state)); - RETURN_IF_ERROR(sink(state, nullptr, true)); + RETURN_IF_ERROR(sink(state, &block, eos)); } + RETURN_IF_ERROR(child(1)->close(state)); return Status::OK(); } @@ -723,38 +706,36 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_build_timer); - if (_should_build_hash_table) { - // If eos or have already met a null value using short-circuit strategy, we do not need to pull - // data from probe side. - _build_side_mem_used += in_block->allocated_bytes(); - - if (_build_side_mutable_block.empty()) { - auto tmp_build_block = - VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc()); - tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false)); - _build_col_ids.resize(_build_expr_ctxs.size()); - RETURN_IF_ERROR(_do_evaluate(tmp_build_block, _build_expr_ctxs, *_build_expr_call_timer, - _build_col_ids)); - _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block); - } + // If eos or have already met a null value using short-circuit strategy, we do not need to pull + // data from probe side. + _build_side_mem_used += in_block->allocated_bytes(); + + if (_build_side_mutable_block.empty()) { + auto tmp_build_block = + VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc()); + tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false)); + _build_col_ids.resize(_build_expr_ctxs.size()); + RETURN_IF_ERROR(_do_evaluate(tmp_build_block, _build_expr_ctxs, *_build_expr_call_timer, + _build_col_ids)); + _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block); + } - if (in_block->rows() != 0) { - std::vector<int> res_col_ids(_build_expr_ctxs.size()); - RETURN_IF_ERROR(_do_evaluate(*in_block, _build_expr_ctxs, *_build_expr_call_timer, - res_col_ids)); - - SCOPED_TIMER(_build_side_merge_block_timer); - RETURN_IF_ERROR(_build_side_mutable_block.merge_ignore_overflow(*in_block)); - if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) { - return Status::NotSupported( - "Hash join do not support build table rows" - " over:" + - std::to_string(JOIN_BUILD_SIZE_LIMIT)); - } + if (in_block->rows() != 0) { + std::vector<int> res_col_ids(_build_expr_ctxs.size()); + RETURN_IF_ERROR( + _do_evaluate(*in_block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids)); + + SCOPED_TIMER(_build_side_merge_block_timer); + RETURN_IF_ERROR(_build_side_mutable_block.merge_ignore_overflow(*in_block)); + if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) { + return Status::NotSupported( + "Hash join do not support build table rows" + " over:" + + std::to_string(JOIN_BUILD_SIZE_LIMIT)); } } - if (_should_build_hash_table && eos) { + if (eos) { DCHECK(!_build_side_mutable_block.empty()); _build_block = std::make_shared<Block>(_build_side_mutable_block.to_block()); COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes()); @@ -762,65 +743,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, runtime_filters()); RETURN_IF_ERROR(_process_build_block(state, *_build_block)); RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this)); - if (_shared_hashtable_controller) { - _shared_hash_table_context->status = Status::OK(); - // arena will be shared with other instances. - _shared_hash_table_context->arena = _arena; - _shared_hash_table_context->block = _build_block; - _shared_hash_table_context->hash_table_variants = _hash_table_variants; - _shared_hash_table_context->short_circuit_for_null_in_probe_side = - _has_null_in_build_side; - if (_runtime_filter_slots) { - _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); - } - _shared_hashtable_controller->signal(id()); - } - } else if (!_should_build_hash_table) { - DCHECK(_shared_hashtable_controller != nullptr); - DCHECK(_shared_hash_table_context != nullptr); - auto* wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); - SCOPED_TIMER(wait_timer); - RETURN_IF_ERROR( - _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); - - _build_phase_profile->add_info_string( - "SharedHashTableFrom", - print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); - _has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; - std::visit( - [](auto&& dst, auto&& src) { - if constexpr (!std::is_same_v<std::monostate, std::decay_t<decltype(dst)>> && - std::is_same_v<std::decay_t<decltype(src)>, - std::decay_t<decltype(dst)>>) { - dst.hash_table = src.hash_table; - } - }, - *_hash_table_variants, - *std::static_pointer_cast<HashTableVariants>( - _shared_hash_table_context->hash_table_variants)); - _build_block = _shared_hash_table_context->block; - - if (!_shared_hash_table_context->runtime_filters.empty()) { - auto ret = std::visit( - Overload {[&](std::monostate&) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - if (_runtime_filters.empty()) { - return Status::OK(); - } - _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>( - _build_expr_ctxs, _runtime_filters); - - RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( - _shared_hash_table_context)); - RETURN_IF_ERROR(_runtime_filter_slots->publish(true)); - return Status::OK(); - }}, - *_hash_table_variants); - RETURN_IF_ERROR(ret); - } } if (eos) { @@ -1104,12 +1026,7 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) { return results; } -HashJoinNode::~HashJoinNode() { - if (_shared_hashtable_controller && _should_build_hash_table) { - // signal at here is abnormal - _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor")); - } -} +HashJoinNode::~HashJoinNode() = default; void HashJoinNode::_release_mem() { _arena = nullptr; @@ -1118,7 +1035,6 @@ void HashJoinNode::_release_mem() { _null_map_column = nullptr; _tuple_is_null_left_flag_column = nullptr; _tuple_is_null_right_flag_column = nullptr; - _shared_hash_table_context = nullptr; _probe_block.clear(); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 20245be06c2..5a10e691d06 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -46,7 +46,6 @@ #include "vec/core/types.h" #include "vec/exec/join/join_op.h" // IWYU pragma: keep #include "vec/exprs/vexpr_fwd.h" -#include "vec/runtime/shared_hash_table_controller.h" #include "vjoin_node_base.h" template <typename T> @@ -217,15 +216,6 @@ public: void debug_string(int indentation_level, std::stringstream* out) const override; - bool can_sink_write() const { - if (_should_build_hash_table) { - return true; - } - return _shared_hash_table_context && _shared_hash_table_context->signaled; - } - - bool should_build_hash_table() const { return _should_build_hash_table; } - bool have_other_join_conjunct() const { return _have_other_join_conjunct; } bool is_right_semi_anti() const { return _is_right_semi_anti; } bool is_outer_join() const { return _is_outer_join; } @@ -346,8 +336,6 @@ private: bool _build_side_ignore_null = false; bool _is_broadcast_join = false; - bool _should_build_hash_table = true; - std::shared_ptr<SharedHashTableController> _shared_hashtable_controller; std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots; std::vector<SlotId> _hash_output_slot_ids; @@ -358,8 +346,6 @@ private: int64_t _build_side_last_mem_used = 0; MutableBlock _build_side_mutable_block; - SharedHashTableContextPtr _shared_hash_table_context = nullptr; - Status _materialize_build_side(RuntimeState* state) override; Status _process_build_block(RuntimeState* state, Block& block); diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 4b1203d4822..a416ba6349e 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -25,35 +25,13 @@ #include "pipeline/exec/hashjoin_build_sink.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) { // Only need to set builder and consumers with pipeline engine enabled. - DCHECK(_pipeline_engine_enabled); std::lock_guard<std::mutex> lock(_mutex); DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend()); _builder_fragment_ids.insert({node_id, builder}); - _dependencies.insert({node_id, {}}); - _finish_dependencies.insert({node_id, {}}); -} - -bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id, - int my_node_id) { - std::lock_guard<std::mutex> lock(_mutex); - auto it = _builder_fragment_ids.find(my_node_id); - if (_pipeline_engine_enabled) { - if (it != _builder_fragment_ids.cend()) { - return it->second == fragment_instance_id; - } - return false; - } - - if (it == _builder_fragment_ids.cend()) { - _builder_fragment_ids.insert({my_node_id, fragment_instance_id}); - return true; - } - return false; } SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) { @@ -64,20 +42,6 @@ SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) return _shared_contexts[my_node_id]; } -void SharedHashTableController::signal(int my_node_id, Status status) { - std::lock_guard<std::mutex> lock(_mutex); - auto it = _shared_contexts.find(my_node_id); - if (it != _shared_contexts.cend()) { - it->second->signaled = true; - it->second->status = status; - _shared_contexts.erase(it); - } - for (auto& dep : _dependencies[my_node_id]) { - dep->set_ready(); - } - _cv.notify_all(); -} - void SharedHashTableController::signal(int my_node_id) { std::lock_guard<std::mutex> lock(_mutex); auto it = _shared_contexts.find(my_node_id); @@ -108,19 +72,4 @@ TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_nod return it->second; } -Status SharedHashTableController::wait_for_signal(RuntimeState* state, - const SharedHashTableContextPtr& context) { - std::unique_lock<std::mutex> lock(_mutex); - // maybe builder signaled before other instances waiting, - // so here need to check value of `signaled` - while (!context->signaled) { - _cv.wait_for(lock, std::chrono::milliseconds(400), - [&]() { return context->signaled.load(); }); - // return if the instances is cancelled(eg. query timeout) - RETURN_IF_CANCELLED(state); - } - return context->status; -} - -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 8fe46b97b85..35f4e9334ea 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -78,19 +78,18 @@ public: SharedHashTableContextPtr get_context(int my_node_id); void signal(int my_node_id); void signal_finish(int my_node_id); - void signal(int my_node_id, Status status); - Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context); - bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id); - void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; } void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep, std::shared_ptr<pipeline::Dependency> finish_dep) { std::lock_guard<std::mutex> lock(_mutex); + if (!_dependencies.contains(node_id)) { + _dependencies.insert({node_id, {}}); + _finish_dependencies.insert({node_id, {}}); + } _dependencies[node_id].push_back(dep); _finish_dependencies[node_id].push_back(finish_dep); } private: - bool _pipeline_engine_enabled = false; std::mutex _mutex; // For pipelineX, we update all dependencies once hash table is built; std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ef071e78d39..265d3afdb35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1678,7 +1678,6 @@ public class Coordinator implements CoordInterface { destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId); } else { destHosts.put(param.host, param); - param.buildHashTableForBroadcastJoin = true; TPlanFragmentDestination dest = new TPlanFragmentDestination(); param.recvrId = params.destinations.size(); dest.fragment_instance_id = param.instanceId; @@ -1802,7 +1801,6 @@ public class Coordinator implements CoordInterface { destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId); } else { destHosts.put(param.host, param); - param.buildHashTableForBroadcastJoin = true; TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = param.instanceId; param.recvrId = params.destinations.size(); @@ -3689,7 +3687,6 @@ public class Coordinator implements CoordInterface { params.setFragment(fragment.toThrift()); params.setDescTbl(descTable); params.setParams(new TPlanFragmentExecParams()); - params.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin); params.params.setQueryId(queryId); params.params.setFragmentInstanceId(instanceExecParam.instanceId); @@ -3848,7 +3845,6 @@ public class Coordinator implements CoordInterface { params.getLocalParams().size()); TPipelineInstanceParams localParams = new TPipelineInstanceParams(); - localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin); localParams.setFragmentInstanceId(instanceExecParam.instanceId); localParams.setPerNodeScanRanges(scanRanges); localParams.setPerNodeSharedScans(perNodeSharedScans); @@ -4002,8 +3998,6 @@ public class Coordinator implements CoordInterface { FragmentExecParams fragmentExecParams; - boolean buildHashTableForBroadcastJoin = false; - int recvrId = -1; List<TUniqueId> instancesSharingHashTable = Lists.newArrayList(); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 20e2dbe5e6e..586d40b8648 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -501,6 +501,7 @@ struct TExecPlanFragmentParams { // Otherwise, the fragment will start executing directly on the BE side. 20: optional bool need_wait_execution_trigger = false; + // deprecated 21: optional bool build_hash_table_for_broadcast_join = false; 22: optional list<Types.TUniqueId> instances_sharing_hash_table; @@ -705,6 +706,7 @@ struct TExportStatusResult { struct TPipelineInstanceParams { 1: required Types.TUniqueId fragment_instance_id + // deprecated 2: optional bool build_hash_table_for_broadcast_join = false; 3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges 4: optional i32 sender_id --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org