This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 89fbda1dacc [refactor](join) Refine broadcast controller (#49556) 89fbda1dacc is described below commit 89fbda1dacc1510327ab559e9460bda20b7c64dd Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Mar 31 11:15:49 2025 +0800 [refactor](join) Refine broadcast controller (#49556) Broadcast controller could be removed on pipelineX engine since we use operator and fragment to manage global state shared between instances. --- be/src/pipeline/dependency.cpp | 10 ++ be/src/pipeline/dependency.h | 51 ++++---- be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 128 +++++++++------------ be/src/pipeline/exec/hashjoin_build_sink.h | 22 ++-- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 44 ++++++- be/src/pipeline/exec/hashjoin_probe_operator.h | 5 +- be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 3 +- .../pipeline/exec/memory_scratch_sink_operator.cpp | 3 +- be/src/pipeline/exec/operator.cpp | 30 ++++- be/src/pipeline/exec/operator.h | 8 +- .../exec/partitioned_hash_join_sink_operator.cpp | 3 +- be/src/pipeline/exec/result_file_sink_operator.cpp | 6 +- be/src/pipeline/exec/result_sink_operator.cpp | 3 +- be/src/pipeline/pipeline_fragment_context.cpp | 65 ++++++----- be/src/pipeline/pipeline_fragment_context.h | 18 ++- be/src/pipeline/pipeline_task.cpp | 26 ++--- be/src/pipeline/pipeline_task.h | 11 +- be/src/runtime/fragment_mgr.cpp | 1 - be/src/runtime/query_context.cpp | 2 - be/src/runtime/query_context.h | 6 - be/src/runtime_filter/runtime_filter.h | 2 + be/src/runtime_filter/runtime_filter_producer.h | 10 -- .../runtime_filter_producer_helper.cpp | 12 +- .../runtime_filter_producer_helper.h | 7 +- .../vec/runtime/shared_hash_table_controller.cpp | 70 ----------- be/src/vec/runtime/shared_hash_table_controller.h | 96 ---------------- be/test/olap/wal/wal_manager_test.cpp | 9 +- be/test/pipeline/local_exchanger_test.cpp | 12 +- be/test/pipeline/operator/agg_operator_test.cpp | 4 +- ...istinct_streaming_aggregation_operator_test.cpp | 2 +- .../operator/exchange_sink_operator_test.cpp | 2 +- .../operator/exchange_source_operator_test.cpp | 2 +- .../local_merge_sort_source_operator_test.cpp | 2 +- .../partitioned_aggregation_sink_operator_test.cpp | 14 +-- ...artitioned_aggregation_source_operator_test.cpp | 18 +-- .../partitioned_aggregation_test_helper.cpp | 7 +- .../partitioned_hash_join_probe_operator_test.cpp | 5 +- .../partitioned_hash_join_sink_operator_test.cpp | 8 +- .../operator/partitioned_hash_join_test_helper.cpp | 7 +- be/test/pipeline/operator/repeat_operator_test.cpp | 2 +- be/test/pipeline/operator/set_operator_test.cpp | 6 +- be/test/pipeline/operator/sort_operator_test.cpp | 4 +- .../operator/spill_sort_sink_operator_test.cpp | 10 +- .../operator/spill_sort_source_operator_test.cpp | 10 +- .../pipeline/operator/spill_sort_test_helper.cpp | 7 +- be/test/pipeline/operator/union_operator_test.cpp | 6 +- be/test/pipeline/pipeline_test.cpp | 16 +-- .../runtime_filter_producer_helper_test.cpp | 19 ++- be/test/vec/exec/vfile_scanner_exception_test.cpp | 9 +- 50 files changed, 368 insertions(+), 457 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 76b950f63a2..276d4d0a0c1 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -33,6 +33,7 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" + Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id, const std::string& name) { source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY")); @@ -40,6 +41,15 @@ Dependency* BasicSharedState::create_source_dependency(int operator_id, int node return source_deps.back().get(); } +void BasicSharedState::create_source_dependencies(int num_sources, int operator_id, int node_id, + const std::string& name) { + source_deps.resize(num_sources, nullptr); + for (auto& source_dep : source_deps) { + source_dep = std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY"); + source_dep->set_shared_state(this); + } +} + Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, const std::string& name) { sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true)); diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index e31274a8791..0bf0e404253 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -84,8 +84,13 @@ struct BasicSharedState { virtual ~BasicSharedState() = default; Dependency* create_source_dependency(int operator_id, int node_id, const std::string& name); - + void create_source_dependencies(int num_sources, int operator_id, int node_id, + const std::string& name); Dependency* create_sink_dependency(int dest_id, int node_id, const std::string& name); + std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) { + DCHECK_LT(channel_id, source_deps.size()); + return {source_deps[channel_id]}; + } }; class Dependency : public std::enable_shared_from_this<Dependency> { @@ -110,11 +115,10 @@ public: [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task = nullptr); // Notify downstream pipeline tasks this dependency is ready. virtual void set_ready(); - void set_ready_to_read() { - DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string(); - _shared_state->source_deps.front()->set_ready(); + void set_ready_to_read(int channel_id = 0) { + DCHECK_LT(channel_id, _shared_state->source_deps.size()) << debug_string(); + _shared_state->source_deps[channel_id]->set_ready(); } - void set_ready_to_write() { DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string(); _shared_state->sink_deps.front()->set_ready(); @@ -593,19 +597,32 @@ struct JoinSharedState : public BasicSharedState { struct HashJoinSharedState : public JoinSharedState { ENABLE_FACTORY_CREATOR(HashJoinSharedState) - // mark the join column whether support null eq - std::vector<bool> is_null_safe_eq_join; - - // mark the build hash table whether it needs to store null value - std::vector<bool> serialize_null_into_key; + HashJoinSharedState() { + hash_table_variant_vector.push_back(std::make_shared<JoinDataVariants>()); + } + HashJoinSharedState(int num_instances) { + source_deps.resize(num_instances, nullptr); + hash_table_variant_vector.resize(num_instances, nullptr); + for (int i = 0; i < num_instances; i++) { + hash_table_variant_vector[i] = std::make_shared<JoinDataVariants>(); + } + } std::shared_ptr<vectorized::Arena> arena = std::make_shared<vectorized::Arena>(); - // maybe share hash table with other fragment instances - std::shared_ptr<JoinDataVariants> hash_table_variants = std::make_shared<JoinDataVariants>(); const std::vector<TupleDescriptor*> build_side_child_desc; size_t build_exprs_size = 0; std::shared_ptr<vectorized::Block> build_block; std::shared_ptr<std::vector<uint32_t>> build_indexes_null; + + // Used by shared hash table + // For probe operator, hash table in _hash_table_variants is read-only if visited flags is not + // used. (visited flags will be used only in right / full outer join). + // + // For broadcast join, although hash table is read-only, some states in `_hash_table_variants` + // are still could be written. For example, serialized keys will be written in a continuous + // memory in `_hash_table_variants`. So before execution, we should use a local _hash_table_variants + // which has a shared hash table in it. + std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector; }; struct PartitionedHashJoinSharedState @@ -750,13 +767,6 @@ public: std::atomic<size_t> _buffer_mem_limit = config::local_exchange_buffer_mem_limit; // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. std::mutex le_lock; - void create_dependencies(int local_exchange_id) { - for (auto& source_dep : source_deps) { - source_dep = std::make_shared<Dependency>(local_exchange_id, local_exchange_id, - "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY"); - source_dep->set_shared_state(this); - } - } void sub_running_sink_operators(); void sub_running_source_operators(); void _set_always_ready() { @@ -770,9 +780,6 @@ public: } } - std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) { - return {source_deps[channel_id]}; - } Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; } void set_ready_to_read(int channel_id) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 44f1b00f889..b105985e11a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -267,7 +267,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( RuntimeState* state, const RowDescriptor& row_desc, int operator_id, const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations, const std::vector<TUniqueId>& fragment_instance_ids) - : DataSinkOperatorX(operator_id, sink.dest_node_id, 0), + : DataSinkOperatorX(operator_id, sink.dest_node_id, std::numeric_limits<int>::max()), _texprs(sink.output_partition.partition_exprs), _row_desc(row_desc), _part_type(sink.output_partition.type), diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 15153d1df40..56a6a1a3784 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -43,11 +43,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); + _task_idx = info.task_idx; auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); _shared_state->join_op_variants = p._join_op_variants; - _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join; - _shared_state->serialize_null_into_key = p._serialize_null_into_key; _build_expr_ctxs.resize(p._build_expr_ctxs.size()); for (size_t i = 0; i < _build_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._build_expr_ctxs[i]->clone(state, _build_expr_ctxs[i])); @@ -56,24 +55,20 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _should_build_hash_table = true; profile()->add_info_string("BroadcastJoin", std::to_string(p._is_broadcast_join)); - if (p._is_broadcast_join) { - if (state->enable_share_hash_table_for_broadcast_join()) { - _should_build_hash_table = info.task_idx == 0; - if (_should_build_hash_table) { - p._shared_hashtable_controller->set_builder_and_consumers( - state->fragment_instance_id(), p.node_id()); - } - } + if (p._use_shared_hash_table) { + _should_build_hash_table = info.task_idx == 0; } profile()->add_info_string("BuildShareHashTable", std::to_string(_should_build_hash_table)); - profile()->add_info_string("ShareHashTableEnabled", - std::to_string(state->enable_share_hash_table_for_broadcast_join())); + profile()->add_info_string("ShareHashTableEnabled", std::to_string(p._use_shared_hash_table)); if (!_should_build_hash_table) { _dependency->block(); _finish_dependency->block(); - p._shared_hashtable_controller->append_dependency(p.node_id(), - _dependency->shared_from_this(), - _finish_dependency->shared_from_this()); + { + std::lock_guard<std::mutex> guard(p._mutex); + p._finish_dependencies.push_back(_finish_dependency); + } + } else { + _dependency->set_ready(); } _build_blocks_memory_usage = @@ -187,7 +182,7 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo raw_ptrs, block.rows(), true, true, bucket_size); }}, - _shared_state->hash_table_variants->method_variant); + _shared_state->hash_table_variant_vector.front()->method_variant); } } return size_to_reserve; @@ -197,7 +192,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu if (_closed) { return Status::OK(); } - auto p = _parent->cast<HashJoinBuildSinkOperatorX>(); + auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); Defer defer {[&]() { if (!_should_build_hash_table) { return; @@ -211,12 +206,19 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu if (_shared_state->build_block) { // release the memory of unused column in probe stage - _shared_state->build_block->clear_column_mem_not_keep( - p._should_keep_column_flags, bool(p._shared_hashtable_controller)); + _shared_state->build_block->clear_column_mem_not_keep(p._should_keep_column_flags, + p._use_shared_hash_table); } - if (p._shared_hashtable_controller) { - p._shared_hashtable_controller->signal_finish(p.node_id()); + if (p._use_shared_hash_table) { + std::unique_lock(p._mutex); + p._signaled = true; + for (auto& dep : _shared_state->sink_deps) { + dep->set_ready(); + } + for (auto& dep : p._finish_dependencies) { + dep->set_ready(); + } } }}; @@ -226,11 +228,11 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu try { RETURN_IF_ERROR(_runtime_filter_producer_helper->process( - state, _shared_state->build_block.get(), p._shared_hash_table_context)); + state, _shared_state->build_block.get(), p._use_shared_hash_table, + p._runtime_filters)); } catch (Exception& e) { - bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && - p._shared_hashtable_controller && - !p._shared_hash_table_context->signaled; + bool blocked_by_shared_hash_table_signal = + !_should_build_hash_table && p._use_shared_hash_table && !p._signaled; return Status::InternalError( "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: " @@ -309,15 +311,18 @@ std::vector<uint16_t> HashJoinBuildSinkLocalState::_convert_block_to_null( Status HashJoinBuildSinkLocalState::_extract_join_column( vectorized::Block& block, vectorized::ColumnUInt8::MutablePtr& null_map, vectorized::ColumnRawPtrs& raw_ptrs, const std::vector<int>& res_col_ids) { + DCHECK(_should_build_hash_table); auto& shared_state = *_shared_state; for (size_t i = 0; i < shared_state.build_exprs_size; ++i) { const auto* column = block.get_by_position(res_col_ids[i]).column.get(); - if (!column->is_nullable() && shared_state.serialize_null_into_key[i]) { + if (!column->is_nullable() && + _parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i]) { _key_columns_holder.emplace_back( vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column)); raw_ptrs[i] = _key_columns_holder.back().get(); } else if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column); - !shared_state.serialize_null_into_key[i] && nullable) { + !_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i] && + nullable) { // update nulllmap and split nested out of ColumnNullable when serialize_null_into_key is false and column is nullable const auto& col_nested = nullable->get_nested_column(); const auto& col_nullmap = nullable->get_null_map_data(); @@ -333,6 +338,7 @@ Status HashJoinBuildSinkLocalState::_extract_join_column( Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, vectorized::Block& block) { + DCHECK(_should_build_hash_table); auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); SCOPED_TIMER(_build_table_timer); size_t rows = block.rows(); @@ -399,7 +405,8 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, arg.serialized_keys_size(true))); return st; }}, - _shared_state->hash_table_variants->method_variant, _shared_state->join_op_variants, + _shared_state->hash_table_variant_vector.front()->method_variant, + _shared_state->join_op_variants, vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side), vectorized::make_bool_variant((p._have_other_join_conjunct))); return st; @@ -407,6 +414,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, void HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap( vectorized::Block& block, const std::vector<int>& res_col_ids) { + DCHECK(_should_build_hash_table); auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); if (p._short_circuit_for_null_in_build_side) { _build_side_has_external_nullmap = true; @@ -414,7 +422,7 @@ void HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap( } for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { const auto* column = block.get_by_position(res_col_ids[i]).column.get(); - if (column->is_nullable() && !_shared_state->serialize_null_into_key[i]) { + if (column->is_nullable() && !p._serialize_null_into_key[i]) { _build_side_has_external_nullmap = true; return; } @@ -438,8 +446,10 @@ Status HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { if (_build_expr_ctxs.size() == 1) { p._should_keep_hash_key_column = true; } - return init_hash_method<JoinDataVariants>(_shared_state->hash_table_variants.get(), data_types, - true); + return init_hash_method<JoinDataVariants>( + _shared_state->hash_table_variant_vector[p._use_shared_hash_table ? _task_idx : 0] + .get(), + data_types, true); } HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, @@ -514,13 +524,8 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state)); - if (_is_broadcast_join) { - if (state->enable_share_hash_table_for_broadcast_join()) { - _shared_hashtable_controller = - state->get_query_ctx()->get_shared_hash_table_controller(); - _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id()); - } - } + _use_shared_hash_table = + _is_broadcast_join && state->enable_share_hash_table_for_broadcast_join(); auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) { for (const auto& tuple_desc : tuple_descs) { for (const auto& slot_desc : tuple_desc->slots()) { @@ -593,39 +598,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR( local_state.process_build_block(state, (*local_state._shared_state->build_block))); - if (_shared_hashtable_controller) { - _shared_hash_table_context->status = Status::OK(); - // arena will be shared with other instances. - _shared_hash_table_context->arena = local_state._shared_state->arena; - _shared_hash_table_context->hash_table_variants = - local_state._shared_state->hash_table_variants; - _shared_hash_table_context->short_circuit_for_null_in_probe_side = - local_state._shared_state->_has_null_in_build_side; - _shared_hash_table_context->block = local_state._shared_state->build_block; - _shared_hash_table_context->build_indexes_null = - local_state._shared_state->build_indexes_null; - } + local_state.init_short_circuit_for_probe(); } else if (!local_state._should_build_hash_table) { - DCHECK(_shared_hashtable_controller != nullptr); - DCHECK(_shared_hash_table_context != nullptr); // the instance which is not build hash table, it's should wait the signal of hash table build finished. // but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit // return eof will make task marked as wake_up_early // todo: remove signaled after we can guarantee that wake up eraly is always set accurately - if (!_shared_hash_table_context->signaled || state->get_task()->wake_up_early()) { + if (!_signaled || state->get_task()->wake_up_early()) { return Status::Error<ErrorCode::END_OF_FILE>("source have closed"); } - if (!_shared_hash_table_context->status.ok()) { - return _shared_hash_table_context->status; - } - - local_state.profile()->add_info_string( - "SharedHashTableFrom", - print_id( - _shared_hashtable_controller->get_builder_fragment_instance_id(node_id()))); - local_state._shared_state->_has_null_in_build_side = - _shared_hash_table_context->short_circuit_for_null_in_probe_side; + DCHECK_LE(local_state._task_idx, + local_state._shared_state->hash_table_variant_vector.size()); std::visit( [](auto&& dst, auto&& src) { if constexpr (!std::is_same_v<std::monostate, std::decay_t<decltype(dst)>> && @@ -634,20 +618,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* dst.hash_table = src.hash_table; } }, - local_state._shared_state->hash_table_variants->method_variant, - std::static_pointer_cast<JoinDataVariants>( - _shared_hash_table_context->hash_table_variants) - ->method_variant); - - local_state._shared_state->build_block = _shared_hash_table_context->block; - local_state._shared_state->build_indexes_null = - _shared_hash_table_context->build_indexes_null; + local_state._shared_state->hash_table_variant_vector[local_state._task_idx] + ->method_variant, + local_state._shared_state->hash_table_variant_vector.front()->method_variant); } if (eos) { local_state._eos = true; - local_state.init_short_circuit_for_probe(); - local_state._dependency->set_ready_to_read(); + // If a shared hash table is used, states are shared by all tasks. + // Sink and source has n-n relationship If a shared hash table is used otherwise 1-1 relationship. + // So we should notify the `_task_idx` source task if a shared hash table is used. + local_state._dependency->set_ready_to_read(_use_shared_hash_table ? local_state._task_idx + : 0); } return Status::OK(); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 7ac62160bbd..ca743069726 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -54,8 +54,6 @@ public: Status close(RuntimeState* state, Status exec_status) override; - Status disable_runtime_filters(RuntimeState* state); - [[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* state, bool eos); protected: @@ -82,6 +80,7 @@ protected: size_t _evaluate_mem_usage = 0; size_t _build_side_rows = 0; + int _task_idx; vectorized::MutableBlock _build_side_mutable_block; std::shared_ptr<RuntimeFilterProducerHelper> _runtime_filter_producer_helper; @@ -154,6 +153,7 @@ public: return _join_distribution != TJoinDistributionType::BROADCAST && _join_distribution != TJoinDistributionType::NONE; } + std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; } private: friend class HashJoinBuildSinkLocalState; @@ -168,9 +168,6 @@ private: std::vector<bool> _is_null_safe_eq_join; bool _is_broadcast_join = false; - std::shared_ptr<vectorized::SharedHashTableController> _shared_hashtable_controller; - - vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr; const std::vector<TExpr> _partition_exprs; std::vector<SlotId> _hash_output_slot_ids; @@ -179,6 +176,12 @@ private: // if build side has variant column and need output variant column // need to finalize variant column to speed up the join op bool _need_finalize_variant_column = false; + + bool _use_shared_hash_table = false; + std::atomic<bool> _signaled = false; + std::mutex _mutex; + std::vector<std::shared_ptr<pipeline::Dependency>> _finish_dependencies; + std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters; }; template <class HashTableContext> @@ -226,8 +229,13 @@ struct ProcessHashTableBuild { with_other_conjuncts) { // null aware join with other conjuncts keep_null_key = true; - } else if (_parent->_shared_state->is_null_safe_eq_join.size() == 1 && - _parent->_shared_state->is_null_safe_eq_join[0]) { + } else if (_parent->parent() + ->cast<HashJoinBuildSinkOperatorX>() + .is_null_safe_eq_join() + .size() == 1 && + _parent->parent() + ->cast<HashJoinBuildSinkOperatorX>() + .is_null_safe_eq_join()[0]) { // single null safe eq keep_null_key = true; } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 9d8f3544ba2..ab52f01fa5b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -38,6 +38,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); + _task_idx = info.task_idx; auto& p = _parent->cast<HashJoinProbeOperatorX>(); _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { @@ -124,7 +125,8 @@ bool HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block, const std::vector<int>& res_col_ids) { for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { const auto* column = block.get_by_position(res_col_ids[i]).column.get(); - if (column->is_nullable() && !_shared_state->serialize_null_into_key[i]) { + if (column->is_nullable() && + !_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) { return true; } } @@ -239,7 +241,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc st = Status::InternalError("uninited hash table probe"); } }, - local_state._shared_state->hash_table_variants->method_variant, + local_state._shared_state->hash_table_variant_vector.size() == 1 + ? local_state._shared_state->hash_table_variant_vector[0]->method_variant + : local_state._shared_state + ->hash_table_variant_vector[local_state._task_idx] + ->method_variant, *local_state._process_hashtable_ctx_variants); } else if (local_state._probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { @@ -258,7 +264,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc st = Status::InternalError("uninited hash table probe"); } }, - local_state._shared_state->hash_table_variants->method_variant, + local_state._shared_state->hash_table_variant_vector.size() == 1 + ? local_state._shared_state->hash_table_variant_vector[0] + ->method_variant + : local_state._shared_state + ->hash_table_variant_vector[local_state._task_idx] + ->method_variant, *local_state._process_hashtable_ctx_variants); } else { *eos = true; @@ -311,12 +322,14 @@ Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block, auto& shared_state = *_shared_state; for (size_t i = 0; i < shared_state.build_exprs_size; ++i) { const auto* column = block.get_by_position(res_col_ids[i]).column.get(); - if (!column->is_nullable() && shared_state.serialize_null_into_key[i]) { + if (!column->is_nullable() && + _parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) { _key_columns_holder.emplace_back( vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column)); _probe_columns[i] = _key_columns_holder.back().get(); } else if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column); - nullable && !shared_state.serialize_null_into_key[i]) { + nullable && + !_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) { // update nulllmap and split nested out of ColumnNullable when serialize_null_into_key is false and column is nullable const auto& col_nested = nullable->get_nested_column(); const auto& col_nullmap = nullable->get_null_map_data(); @@ -429,6 +442,27 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) vectorized::VExprContextSPtr ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, ctx)); _probe_expr_ctxs.push_back(ctx); + + /// null safe equal means null = null is true, the operator in SQL should be: <=>. + const bool is_null_safe_equal = + eq_join_conjunct.__isset.opcode && + (eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL) && + // For a null safe equal join, FE may generate a plan that + // both sides of the conjuct are not nullable, we just treat it + // as a normal equal join conjunct. + (eq_join_conjunct.right.nodes[0].is_nullable || + eq_join_conjunct.left.nodes[0].is_nullable); + + if (eq_join_conjuncts.size() == 1) { + // single column key serialize method must use nullmap for represent null to instead serialize null into key + _serialize_null_into_key.emplace_back(false); + } else if (is_null_safe_equal) { + // use serialize null into key to represent multi column null value + _serialize_null_into_key.emplace_back(true); + } else { + // on normal conditions, because null!=null, it can be expressed directly with nullmap. + _serialize_null_into_key.emplace_back(false); + } } if (tnode.hash_join_node.__isset.other_join_conjuncts && diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index eaa0b9376ca..3914fc0d58d 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -106,7 +106,7 @@ private: std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>(); - ssize_t _estimated_mem_in_push = -1; + int _task_idx; RuntimeProfile::Counter* _probe_expr_call_timer = nullptr; RuntimeProfile::Counter* _probe_side_output_timer = nullptr; @@ -142,6 +142,7 @@ public: _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); } + bool is_broadcast_join() const { return _is_broadcast_join; } bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; @@ -172,6 +173,8 @@ private: vectorized::VExprContextSPtrs _other_join_conjuncts; vectorized::VExprContextSPtrs _mark_join_conjuncts; + // mark the build hash table whether it needs to store null value + std::vector<bool> _serialize_null_into_key; // probe expr vectorized::VExprContextSPtrs _probe_expr_ctxs; diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp index d0abe6aa0d2..c213616e25c 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -28,7 +28,8 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id, const std::vector<TExpr>& t_output_expr) - : DataSinkOperatorX(operator_id, 0, 0), + : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), + std::numeric_limits<int>::max()), _row_desc(row_desc), _t_output_expr(t_output_expr) {} diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index 86afd607432..fe67703bfe9 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -66,7 +66,8 @@ Status MemoryScratchSinkLocalState::close(RuntimeState* state, Status exec_statu MemoryScratchSinkOperatorX::MemoryScratchSinkOperatorX(const RowDescriptor& row_desc, int operator_id, const std::vector<TExpr>& t_output_expr) - : DataSinkOperatorX(operator_id, 0, 0), + : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), + std::numeric_limits<int>::max()), _row_desc(row_desc), _t_output_expr(t_output_expr) {} diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index a82207c92c1..1f0d40069ed 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -470,14 +470,18 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false, nullptr); constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>; if constexpr (!is_fake_shared) { - if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) { - DCHECK(info.le_state_map.find(_parent->operator_id()) != info.le_state_map.end()); - _shared_state = info.le_state_map.at(_parent->operator_id()).first.get(); + if (info.shared_state_map.find(_parent->operator_id()) != info.shared_state_map.end()) { + _shared_state = info.shared_state_map.at(_parent->operator_id()) + .first.get() + ->template cast<SharedStateArg>(); _dependency = _shared_state->get_dep_by_channel_id(info.task_idx).front().get(); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } else if (info.shared_state) { + if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) { + DCHECK(false); + } // For UnionSourceOperator without children, there is no shared state. _shared_state = info.shared_state->template cast<SharedStateArg>(); @@ -485,6 +489,10 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState _parent->operator_id(), _parent->node_id(), _parent->get_name()); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); + } else { + if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) { + DCHECK(false); + } } } @@ -543,11 +551,21 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>; if constexpr (!is_fake_shared) { - if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) { - DCHECK(info.le_state_map.find(_parent->dests_id().front()) != info.le_state_map.end()); - _dependency = info.le_state_map.at(_parent->dests_id().front()).second.get(); + if (info.shared_state_map.find(_parent->dests_id().front()) != + info.shared_state_map.end()) { + if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) { + DCHECK(info.shared_state_map.at(_parent->dests_id().front()).second.size() == 1); + } + _dependency = info.shared_state_map.at(_parent->dests_id().front()) + .second[std::is_same_v<LocalExchangeSharedState, SharedState> + ? 0 + : info.task_idx] + .get(); _shared_state = _dependency->shared_state()->template cast<SharedState>(); } else { + if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) { + DCHECK(false); + } _shared_state = info.shared_state->template cast<SharedState>(); _dependency = _shared_state->create_sink_dependency( _parent->dests_id().front(), _parent->node_id(), _parent->get_name()); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index fa8849640b6..ad982e509f2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -69,8 +69,8 @@ struct LocalStateInfo { RuntimeProfile* parent_profile = nullptr; const std::vector<TScanRangeParams>& scan_ranges; BasicSharedState* shared_state; - const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, - std::shared_ptr<Dependency>>>& le_state_map; + const std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>>& shared_state_map; const int task_idx; }; @@ -80,8 +80,8 @@ struct LocalSinkStateInfo { RuntimeProfile* parent_profile = nullptr; const int sender_id; BasicSharedState* shared_state; - const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, - std::shared_ptr<Dependency>>>& le_state_map; + const std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>>& shared_state_map; const TDataSink& tsink; }; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index c6a024d99a3..79b74d4c313 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -180,7 +180,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) { inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state); } - _shared_state->inner_shared_state->hash_table_variants.reset(); + DCHECK_EQ(_shared_state->inner_shared_state->hash_table_variant_vector.size(), 1); + _shared_state->inner_shared_state->hash_table_variant_vector.front().reset(); if (inner_sink_state) { COUNTER_UPDATE(_memory_used_counter, -(inner_sink_state->_hash_table_memory_usage->value() + diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index de2e2922b26..0e2667c752f 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -36,7 +36,8 @@ ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr) - : DataSinkOperatorX(operator_id, 0, 0), + : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), + std::numeric_limits<int>::max()), _row_desc(row_desc), _t_output_expr(t_output_expr) {} @@ -44,7 +45,8 @@ ResultFileSinkOperatorX::ResultFileSinkOperatorX( int operator_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector<TPlanFragmentDestination>& destinations, const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs) - : DataSinkOperatorX(operator_id, 0, 0), + : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), + std::numeric_limits<int>::max()), _row_desc(row_desc), _t_output_expr(t_output_expr), _dests(destinations), diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 9f3647b93fb..6ad81a2673f 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -101,7 +101,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) { ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr, const TResultSink& sink) - : DataSinkOperatorX(operator_id, 0, 0), + : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), + std::numeric_limits<int>::max()), _sink_type(!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL ? TResultSinkType::MYSQL_PROTOCAL : sink.type), diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e62addd95ad..b782474a3a8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -159,7 +159,7 @@ PipelineFragmentContext::~PipelineFragmentContext() { _runtime_state.reset(); _runtime_filter_states.clear(); _runtime_filter_mgr_map.clear(); - _op_id_to_le_state.clear(); + _op_id_to_shared_state.clear(); } bool PipelineFragmentContext::is_timeout(timespec now) const { @@ -381,23 +381,26 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag std::make_unique<RuntimeFilterMgr>(request.query_id, _runtime_filter_states[i], _query_ctx->query_mem_tracker(), false); std::map<PipelineId, PipelineTask*> pipeline_id_to_task; - auto get_local_exchange_state = [&](PipelinePtr pipeline) - -> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, - std::shared_ptr<Dependency>>> { - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, - std::shared_ptr<Dependency>>> - le_state_map; - auto source_id = pipeline->operators().front()->operator_id(); - if (auto iter = _op_id_to_le_state.find(source_id); iter != _op_id_to_le_state.end()) { - le_state_map.insert({source_id, iter->second}); + auto get_shared_state = [&](PipelinePtr pipeline) + -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> { + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + for (auto& op : pipeline->operators()) { + auto source_id = op->operator_id(); + if (auto iter = _op_id_to_shared_state.find(source_id); + iter != _op_id_to_shared_state.end()) { + shared_state_map.insert({source_id, iter->second}); + } } for (auto sink_to_source_id : pipeline->sink()->dests_id()) { - if (auto iter = _op_id_to_le_state.find(sink_to_source_id); - iter != _op_id_to_le_state.end()) { - le_state_map.insert({sink_to_source_id, iter->second}); + if (auto iter = _op_id_to_shared_state.find(sink_to_source_id); + iter != _op_id_to_shared_state.end()) { + shared_state_map.insert({sink_to_source_id, iter->second}); } } - return le_state_map; + return shared_state_map; }; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { @@ -449,10 +452,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag auto cur_task_id = _total_tasks++; task_runtime_state->set_task_id(cur_task_id); task_runtime_state->set_task_num(pipeline->num_tasks()); - auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id, - task_runtime_state.get(), this, - pipeline_id_to_profile[pip_idx].get(), - get_local_exchange_state(pipeline), i); + auto task = std::make_unique<PipelineTask>( + pipeline, cur_task_id, task_runtime_state.get(), this, + pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline), i); pipeline->incr_created_tasks(i, task.get()); task_runtime_state->set_task(task.get()); pipeline_id_to_task.insert({pipeline->id(), task.get()}); @@ -552,7 +554,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag } } _pipeline_parent_map.clear(); - _op_id_to_le_state.clear(); + _op_id_to_shared_state.clear(); return Status::OK(); } @@ -821,11 +823,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl( return Status::InternalError("Unsupported local exchange type : " + std::to_string((int)data_distribution.distribution_type)); } - auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id, - "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); - sink_dep->set_shared_state(shared_state.get()); - shared_state->sink_deps.push_back(sink_dep); - _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}}); + shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id, + "LOCAL_EXCHANGE_OPERATOR"); + shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK"); + _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}}); // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink]. @@ -848,8 +849,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } operators.insert(operators.begin(), source_op); - shared_state->create_dependencies(local_exchange_id); - // 5. Set children for two pipelines separately. std::vector<std::shared_ptr<Pipeline>> new_children; std::vector<PipelineId> edges_with_source; @@ -1431,6 +1430,20 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } + if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) { + std::shared_ptr<HashJoinSharedState> shared_state = + HashJoinSharedState::create_shared(_num_instances); + for (int i = 0; i < _num_instances; i++) { + auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(), + "HASH_JOIN_BUILD_DEPENDENCY"); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_deps.push_back(sink_dep); + } + shared_state->create_source_dependencies(_num_instances, op->operator_id(), + op->node_id(), "HASH_JOIN_PROBE"); + _op_id_to_shared_state.insert( + {op->operator_id(), {shared_state, shared_state->sink_deps}}); + } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); break; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 992f86bc76d..587a1989fa7 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -279,8 +279,22 @@ private: int _operator_id = 0; int _sink_operator_id = 0; - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - _op_id_to_le_state; + /** + * Some states are shared by tasks in different instances (e.g. local exchange , broadcast join). + * + * local exchange sink 0 -> -> local exchange source 0 + * LocalExchangeSharedState + * local exchange sink 1 -> -> local exchange source 1 + * + * hash join build sink 0 -> -> hash join build source 0 + * HashJoinSharedState + * hash join build sink 1 -> -> hash join build source 1 + * + * So we should keep states here. + */ + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + _op_id_to_shared_state; std::map<PipelineId, Pipeline*> _pip_id_to_pipeline; std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 0db8a26efca..bfcfa03070e 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -53,13 +53,13 @@ class RuntimeState; namespace doris::pipeline { -PipelineTask::PipelineTask( - PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, - PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - std::map<int, - std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map, - int task_idx) +PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, + PipelineFragmentContext* fragment_context, + RuntimeProfile* parent_profile, + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map, + int task_idx) : _index(task_id), _pipeline(pipeline), _opened(false), @@ -70,7 +70,7 @@ PipelineTask::PipelineTask( _source(_operators.front().get()), _root(_operators.back().get()), _sink(pipeline->sink_shared_pointer()), - _le_state_map(std::move(le_state_map)), + _shared_state_map(std::move(shared_state_map)), _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()), _memory_sufficient_dependency( @@ -96,9 +96,9 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co }); { // set sink local state - LocalSinkStateInfo info {_task_idx, _task_profile.get(), - sender_id, get_sink_shared_state().get(), - _le_state_map, tsink}; + LocalSinkStateInfo info {_task_idx, _task_profile.get(), + sender_id, get_sink_shared_state().get(), + _shared_state_map, tsink}; RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); } @@ -108,7 +108,7 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; LocalStateInfo info {parent_profile, _scan_ranges, get_op_shared_state(op->operator_id()), - _le_state_map, _task_idx}; + _shared_state_map, _task_idx}; RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); } @@ -604,7 +604,7 @@ Status PipelineTask::finalize() { RETURN_IF_ERROR(_state_transition(State::FINALIZED)); _sink_shared_state.reset(); _op_shared_states.clear(); - _le_state_map.clear(); + _shared_state_map.clear(); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 0c39f09c90b..a4fd3b4eac9 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -48,9 +48,9 @@ class PipelineTask { public: PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, - std::shared_ptr<Dependency>>> - le_state_map, + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map, int task_idx); Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id, @@ -284,8 +284,9 @@ private: std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states; std::shared_ptr<BasicSharedState> _sink_shared_state; std::vector<TScanRangeParams> _scan_ranges; - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - _le_state_map; + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + _shared_state_map; int _task_idx; bool _dry_run = false; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9b0cbb4665c..233175005d4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -90,7 +90,6 @@ #include "util/threadpool.h" #include "util/thrift_util.h" #include "util/uid_util.h" -#include "vec/runtime/shared_hash_table_controller.h" namespace doris { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index fb6eee2bfd6..76fd17e7d94 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -125,7 +125,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, _init_resource_context(); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker()); _query_watcher.start(); - _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", false); _memory_sufficient_dependency = @@ -264,7 +263,6 @@ QueryContext::~QueryContext() { #endif _runtime_filter_mgr.reset(); _execution_dependency.reset(); - _shared_hash_table_controller.reset(); _runtime_predicates.clear(); file_scan_range_params_map.clear(); obj_pool.clear(); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 84128875176..53f008438ec 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -41,7 +41,6 @@ #include "util/hash_util.hpp" #include "util/threadpool.h" #include "vec/exec/scan/scanner_scheduler.h" -#include "vec/runtime/shared_hash_table_controller.h" #include "workload_group/workload_group.h" namespace doris { @@ -185,10 +184,6 @@ public: void set_ready_to_execute_only(); - std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() { - return _shared_hash_table_controller; - } - bool has_runtime_predicate(int source_node_id) { return _runtime_predicates.contains(source_node_id); } @@ -414,7 +409,6 @@ private: void _init_resource_context(); void _init_query_mem_tracker(); - std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates; std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h index 0a66bc02ba0..06b12a69835 100644 --- a/be/src/runtime_filter/runtime_filter.h +++ b/be/src/runtime_filter/runtime_filter.h @@ -82,6 +82,8 @@ public: } virtual std::string debug_string() const = 0; + std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; } + void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper = wrapper; } protected: RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc) diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index bfb9239a0b4..7a56d97189d 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -22,7 +22,6 @@ #include "pipeline/dependency.h" #include "runtime/query_context.h" #include "runtime_filter/runtime_filter.h" -#include "vec/runtime/shared_hash_table_controller.h" namespace doris { #include "common/compile_check_begin.h" @@ -109,15 +108,6 @@ public: } } - void copy_to_shared_context(const vectorized::SharedHashTableContextPtr& context) { - DCHECK(!context->runtime_filters.contains(_wrapper->filter_id())); - context->runtime_filters[_wrapper->filter_id()] = _wrapper; - } - void copy_from_shared_context(const vectorized::SharedHashTableContextPtr& context) { - DCHECK(context->runtime_filters.contains(_wrapper->filter_id())); - _wrapper = context->runtime_filters[_wrapper->filter_id()]; - } - bool set_state(State state) { std::unique_lock<std::mutex> l(_mtx); if (_rf_state == State::PUBLISHED || diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/runtime_filter/runtime_filter_producer_helper.cpp index 24861b61b92..b9f3caca364 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp @@ -85,8 +85,8 @@ Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) { } Status RuntimeFilterProducerHelper::process( - RuntimeState* state, const vectorized::Block* block, - const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) { + RuntimeState* state, const vectorized::Block* block, bool use_shared_table, + std::map<int, std::shared_ptr<RuntimeFilterWrapper>>& runtime_filters) { if (_skip_runtime_filters_process) { return Status::OK(); } @@ -106,12 +106,14 @@ Status RuntimeFilterProducerHelper::process( } for (const auto& filter : _producers) { - if (shared_hash_table_ctx && !wake_up_early) { + if (use_shared_table && !wake_up_early) { DCHECK(_is_broadcast_join); if (_should_build_hash_table) { - filter->copy_to_shared_context(shared_hash_table_ctx); + DCHECK(!runtime_filters.contains(filter->wrapper()->filter_id())); + runtime_filters[filter->wrapper()->filter_id()] = filter->wrapper(); } else { - filter->copy_from_shared_context(shared_hash_table_ctx); + DCHECK(runtime_filters.contains(filter->wrapper()->filter_id())); + filter->set_wrapper(runtime_filters[filter->wrapper()->filter_id()]); } } filter->set_wrapper_state_and_ready_to_publish(wrapper_state); diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h b/be/src/runtime_filter/runtime_filter_producer_helper.h index 3a703c9f591..fc8944ea3cb 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper.h @@ -25,13 +25,12 @@ #include "runtime_filter/runtime_filter_producer.h" #include "vec/core/block.h" // IWYU pragma: keep #include "vec/exprs/vexpr_context.h" -#include "vec/runtime/shared_hash_table_controller.h" namespace doris { #include "common/compile_check_begin.h" // this class used in hash join node /** - * init -> (skip_runtime_filters ->) send_filter_size -> process + * init -> (skip_process ->) send_filter_size -> (share_filters ->) process */ class RuntimeFilterProducerHelper { public: @@ -64,8 +63,8 @@ public: MOCK_FUNCTION Status skip_process(RuntimeState* state); // build rf's predicate and publish rf - Status process(RuntimeState* state, const vectorized::Block* block, - const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx); + Status process(RuntimeState* state, const vectorized::Block* block, bool use_shared_table, + std::map<int, std::shared_ptr<RuntimeFilterWrapper>>& runtime_filters); protected: virtual void _init_expr(const vectorized::VExprContextSPtrs& build_expr_ctxs, diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp deleted file mode 100644 index 286f32bb38b..00000000000 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "shared_hash_table_controller.h" - -#include <glog/logging.h> -#include <runtime/runtime_state.h> -// IWYU pragma: no_include <bits/chrono.h> -#include <chrono> // IWYU pragma: keep -#include <utility> - -#include "pipeline/exec/hashjoin_build_sink.h" - -namespace doris::vectorized { -#include "common/compile_check_begin.h" - -void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) { - // Only need to set builder and consumers with pipeline engine enabled. - std::lock_guard<std::mutex> lock(_mutex); - DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend()); - _builder_fragment_ids.insert({node_id, builder}); -} - -SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) { - std::lock_guard<std::mutex> lock(_mutex); - if (!_shared_contexts.contains(my_node_id)) { - _shared_contexts.insert({my_node_id, std::make_shared<SharedHashTableContext>()}); - } - return _shared_contexts[my_node_id]; -} - -void SharedHashTableController::signal_finish(int my_node_id) { - std::lock_guard<std::mutex> lock(_mutex); - auto it = _shared_contexts.find(my_node_id); - if (it != _shared_contexts.cend()) { - it->second->signaled = true; - _shared_contexts.erase(it); - } - for (auto& dep : _dependencies[my_node_id]) { - dep->set_ready(); - } - for (auto& dep : _finish_dependencies[my_node_id]) { - dep->set_ready(); - } -} - -TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_node_id) { - std::lock_guard<std::mutex> lock(_mutex); - auto it = _builder_fragment_ids.find(my_node_id); - if (it == _builder_fragment_ids.cend()) { - return TUniqueId {}; - } - return it->second; -} - -} // namespace doris::vectorized diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h deleted file mode 100644 index 51f4cfda3b8..00000000000 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <gen_cpp/Types_types.h> - -#include <condition_variable> -#include <map> -#include <memory> -#include <mutex> -#include <vector> - -#include "common/status.h" -#include "runtime_filter/runtime_filter_definitions.h" -#include "runtime_filter/runtime_filter_wrapper.h" -#include "vec/core/block.h" - -namespace doris { -#include "common/compile_check_begin.h" - -class RuntimeState; -class MinMaxFuncBase; -class HybridSetBase; -class BloomFilterFuncBase; -class BitmapFilterFuncBase; - -namespace pipeline { -class Dependency; -} -namespace vectorized { - -class Arena; - -struct SharedHashTableContext { - SharedHashTableContext() - : hash_table_variants(nullptr), block(std::make_shared<vectorized::Block>()) {} - - Status status; - std::shared_ptr<Arena> arena; - std::shared_ptr<void> hash_table_variants; - std::shared_ptr<Block> block; - std::shared_ptr<std::vector<uint32_t>> build_indexes_null; - std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters; - std::atomic<bool> signaled = false; - bool short_circuit_for_null_in_probe_side = false; -}; - -using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>; - -class SharedHashTableController { -public: - /// set hash table builder's fragment instance id and consumers' fragment instance id - void set_builder_and_consumers(TUniqueId builder, int node_id); - TUniqueId get_builder_fragment_instance_id(int my_node_id); - SharedHashTableContextPtr get_context(int my_node_id); - void signal_finish(int my_node_id); - void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep, - std::shared_ptr<pipeline::Dependency> finish_dep) { - std::lock_guard<std::mutex> lock(_mutex); - if (!_dependencies.contains(node_id)) { - _dependencies.insert({node_id, {}}); - _finish_dependencies.insert({node_id, {}}); - } - _dependencies[node_id].push_back(dep); - _finish_dependencies[node_id].push_back(finish_dep); - } - -private: - std::mutex _mutex; - // For pipelineX, we update all dependencies once hash table is built; - std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies; - std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> - _finish_dependencies; - std::map<int /*node id*/, TUniqueId /*fragment instance id*/> _builder_fragment_ids; - std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts; -}; - -} // namespace vectorized -} // namespace doris - -#include "common/compile_check_end.h" diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index 459a61d4b76..2229ee73cf7 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -282,10 +282,11 @@ void WalManagerTest::init() { auto local_state = pipeline::FileScanLocalState::create_unique(&_runtime_state, _scan_node.get()); std::vector<TScanRangeParams> scan_ranges; - std::map<int, std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>, - std::shared_ptr<pipeline::Dependency>>> - le_state_map; - pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr, le_state_map, 0}; + pipeline::LocalStateInfo info {.parent_profile = &_global_profile, + .scan_ranges = scan_ranges, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init local_state"); _runtime_state.emplace_local_state(_scan_node->operator_id(), std::move(local_state)); diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp index 9a6c4acb9f3..d3a9b0e2d5d 100644 --- a/be/test/pipeline/local_exchanger_test.cpp +++ b/be/test/pipeline/local_exchanger_test.cpp @@ -95,7 +95,7 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); - shared_state->create_dependencies(0); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get(); for (size_t i = 0; i < num_sink; i++) { @@ -338,7 +338,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) { auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); - shared_state->create_dependencies(0); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); auto* exchanger = (PassthroughExchanger*)shared_state->exchanger.get(); for (size_t i = 0; i < num_sink; i++) { @@ -532,7 +532,7 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) { auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); - shared_state->create_dependencies(0); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); auto* exchanger = (PassToOneExchanger*)shared_state->exchanger.get(); for (size_t i = 0; i < num_sink; i++) { @@ -734,7 +734,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) { auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); - shared_state->create_dependencies(0); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); auto* exchanger = (BroadcastExchanger*)shared_state->exchanger.get(); for (size_t i = 0; i < num_sink; i++) { @@ -931,7 +931,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) { auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); - shared_state->create_dependencies(0); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); auto* exchanger = (AdaptivePassthroughExchanger*)shared_state->exchanger.get(); for (size_t i = 0; i < num_sink; i++) { @@ -1140,7 +1140,7 @@ TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) { auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); - shared_state->create_dependencies(0); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get(); auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF, diff --git a/be/test/pipeline/operator/agg_operator_test.cpp b/be/test/pipeline/operator/agg_operator_test.cpp index de9af15c1ba..1cb002c85d9 100644 --- a/be/test/pipeline/operator/agg_operator_test.cpp +++ b/be/test/pipeline/operator/agg_operator_test.cpp @@ -42,7 +42,7 @@ auto static init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op, .parent_profile = &ctx.profile, .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink {}}; EXPECT_TRUE(local_state->init(&ctx.state, info).ok()); ctx.state.emplace_sink_local_state(0, std::move(local_state)); @@ -54,7 +54,7 @@ auto static init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op, LocalStateInfo info {.parent_profile = &ctx.profile, .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(local_state->init(&ctx.state, info).ok()); diff --git a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp index c1e92272739..1377ee03328 100644 --- a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp +++ b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp @@ -57,7 +57,7 @@ struct DistinctStreamingAggOperatorTest : public ::testing::Test { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = nullptr, - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(local_state->init(state.get(), info)); state->resize_op_id_to_local_state(-100); diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp b/be/test/pipeline/operator/exchange_sink_operator_test.cpp index 945eab84ec5..aa3fc6f7877 100644 --- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp +++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp @@ -92,7 +92,7 @@ auto create_exchange_sink(std::vector<ChannelInfo> channel_info) { .parent_profile = &ctx->profile, .sender_id = 0, .shared_state = nullptr, - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink {}}; EXPECT_TRUE(local_state->init(&ctx->state, info).ok()); ctx->state.emplace_sink_local_state(0, std::move(local_state)); diff --git a/be/test/pipeline/operator/exchange_source_operator_test.cpp b/be/test/pipeline/operator/exchange_source_operator_test.cpp index db71bc6982a..709921ac48c 100644 --- a/be/test/pipeline/operator/exchange_source_operator_test.cpp +++ b/be/test/pipeline/operator/exchange_source_operator_test.cpp @@ -88,7 +88,7 @@ struct ExchangeSourceOperatorXTest : public ::testing::Test { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = nullptr, - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; auto st = local_state->init(state.get(), info); state->resize_op_id_to_local_state(-100); diff --git a/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp b/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp index 6264c1c52ab..ee653a950cf 100644 --- a/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp +++ b/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp @@ -66,7 +66,7 @@ struct LocalMergeSOrtSourceOperatorTest : public testing::Test { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = shared_states[i].get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = i}; EXPECT_TRUE(local_state->init(runtime_states[i].get(), info)); runtime_states[i]->resize_op_id_to_local_state(-100); diff --git a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp index ba24d77d5d7..c4b04260b4c 100644 --- a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp @@ -62,7 +62,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, Init) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -101,7 +101,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, Sink) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -152,7 +152,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithEmptyEOS) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -204,7 +204,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpill) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -274,7 +274,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillAndEmptyEOS) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -341,7 +341,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillLargeData) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -424,7 +424,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpilError) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); diff --git a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp index 8daff47ead5..67d49eb24c3 100644 --- a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp @@ -68,7 +68,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, Init) { .parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0, }; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -113,7 +113,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -128,7 +128,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) { .parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0, }; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -184,7 +184,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -218,7 +218,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) { .parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0, }; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -276,7 +276,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -323,7 +323,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) { .parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0, }; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -389,7 +389,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink()}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); @@ -436,7 +436,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) { .parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0, }; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp index 0d2a653c6c3..c8480cbb2ec 100644 --- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp +++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp @@ -181,10 +181,11 @@ PartitionedAggregationTestHelper::create_operators() { EXPECT_TRUE(sink_operator->set_child(child_operator)); // Setup task and state - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr, - nullptr, le_state_map, 0); + nullptr, shared_state_map, 0); runtime_state->set_task(pipeline_task.get()); return {std::move(source_operator), std::move(sink_operator)}; } diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp index 9f2c3b48790..b9b1a1329df 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp @@ -67,9 +67,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) { TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) { auto [probe_operator, sink_operator] = _helper.create_operators(); - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; - auto local_state = PartitionedHashJoinProbeLocalState::create_shared( _helper.runtime_state.get(), probe_operator.get()); @@ -77,7 +74,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) { LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = le_state_map, + .shared_state_map = {}, .task_idx = 0}; auto st = local_state->init(_helper.runtime_state.get(), info); ASSERT_TRUE(st) << "init failed: " << st.to_string(); diff --git a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp index 29cb355b0a0..a9dae776a2d 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp @@ -118,12 +118,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, InitLocalState) { ASSERT_TRUE(st.ok()) << "Prepare failed: " << st.to_string(); RuntimeProfile runtime_profile("test"); - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; TDataSink t_sink; LocalSinkStateInfo info {.parent_profile = &runtime_profile, .shared_state = shared_state.get(), - .le_state_map = le_state_map, + .shared_state_map = {}, .tsink = t_sink}; st = local_state->init(_helper.runtime_state.get(), info); ASSERT_TRUE(st) << "init failed: " << st.to_string(); @@ -222,12 +220,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, SinkEosAndSpill) { auto shared_state = std::make_shared<MockPartitionedHashJoinSharedState>(); - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; LocalSinkStateInfo sink_info {.task_idx = 0, .parent_profile = _helper.runtime_profile.get(), .shared_state = shared_state.get(), - .le_state_map = le_state_map, + .shared_state_map = {}, .tsink = TDataSink()}; auto st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); ASSERT_TRUE(st.ok()) << "Setup local state failed: " << st.to_string(); diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp index 001bcd8e224..7d15e80fa67 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp @@ -154,10 +154,11 @@ PartitionedHashJoinTestHelper::create_operators() { sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); // Setup task and state - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; pipeline_task = std::make_shared<PipelineTask>(probe_pipeline, 0, runtime_state.get(), nullptr, - nullptr, le_state_map, 0); + nullptr, shared_state_map, 0); runtime_state->set_task(pipeline_task.get()); return {probe_operator, sink_operator}; } diff --git a/be/test/pipeline/operator/repeat_operator_test.cpp b/be/test/pipeline/operator/repeat_operator_test.cpp index a1c120e95f3..e49c7c452df 100644 --- a/be/test/pipeline/operator/repeat_operator_test.cpp +++ b/be/test/pipeline/operator/repeat_operator_test.cpp @@ -50,7 +50,7 @@ struct RepeatOperatorTest : public ::testing::Test { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = nullptr, - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(local_state->init(state.get(), info)); state->resize_op_id_to_local_state(-100); diff --git a/be/test/pipeline/operator/set_operator_test.cpp b/be/test/pipeline/operator/set_operator_test.cpp index 78bc2c02a36..62675144ffa 100644 --- a/be/test/pipeline/operator/set_operator_test.cpp +++ b/be/test/pipeline/operator/set_operator_test.cpp @@ -78,7 +78,7 @@ struct SetOperatorTest : public ::testing::Test { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = shared_state_sptr.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(source_local_state->init(state.get(), info)); state->resize_op_id_to_local_state(-100); @@ -91,7 +91,7 @@ struct SetOperatorTest : public ::testing::Test { .parent_profile = &profile, .sender_id = 0, .shared_state = shared_state_sptr.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink {}}; EXPECT_TRUE(sink_local_state->init(state.get(), info)); state->emplace_sink_local_state(sink_op->operator_id(), @@ -112,7 +112,7 @@ struct SetOperatorTest : public ::testing::Test { .parent_profile = &profile, .sender_id = 0, .shared_state = shared_state_sptr.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink {}}; EXPECT_TRUE(probe_sink_local_state[i]->init(states[i].get(), info)); states[i]->emplace_sink_local_state(probe_sink_ops[i]->operator_id(), diff --git a/be/test/pipeline/operator/sort_operator_test.cpp b/be/test/pipeline/operator/sort_operator_test.cpp index 1867f31dbf1..cd1a4c35d85 100644 --- a/be/test/pipeline/operator/sort_operator_test.cpp +++ b/be/test/pipeline/operator/sort_operator_test.cpp @@ -90,7 +90,7 @@ struct SortOperatorTest : public ::testing::Test { .parent_profile = &profile, .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink {}}; EXPECT_TRUE(sink_local_state_uptr->init(state.get(), info).ok()); state->emplace_sink_local_state(0, std::move(sink_local_state_uptr)); @@ -102,7 +102,7 @@ struct SortOperatorTest : public ::testing::Test { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok()); diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp index 75101ae4edb..ac5da984389 100644 --- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp +++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp @@ -60,7 +60,7 @@ TEST_F(SpillSortSinkOperatorTest, Basic) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = {}}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -112,7 +112,7 @@ TEST_F(SpillSortSinkOperatorTest, Sink) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = {}}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -209,7 +209,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = {}}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -287,7 +287,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill2) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = {}}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -349,7 +349,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpillError) { .parent_profile = _helper.runtime_profile.get(), .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = {}}; st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); diff --git a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp index aa9340a7d9b..87e5c9d1bf5 100644 --- a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp +++ b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp @@ -62,7 +62,7 @@ TEST_F(SpillSortSourceOperatorTest, Basic) { LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -109,7 +109,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlock) { LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -186,7 +186,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) { LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -332,7 +332,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) { LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); @@ -482,7 +482,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) { LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; st = source_operator->setup_local_state(_helper.runtime_state.get(), info); diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp b/be/test/pipeline/operator/spill_sort_test_helper.cpp index ca93291d273..0e291343737 100644 --- a/be/test/pipeline/operator/spill_sort_test_helper.cpp +++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp @@ -159,10 +159,11 @@ SpillSortTestHelper::create_operators() { EXPECT_TRUE(sink_operator->set_child(child_operator)); // Setup task and state - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr, - nullptr, le_state_map, 0); + nullptr, shared_state_map, 0); runtime_state->set_task(pipeline_task.get()); return {std::move(source_operator), std::move(sink_operator)}; } diff --git a/be/test/pipeline/operator/union_operator_test.cpp b/be/test/pipeline/operator/union_operator_test.cpp index d655b7bf4ed..2c57d10b89e 100644 --- a/be/test/pipeline/operator/union_operator_test.cpp +++ b/be/test/pipeline/operator/union_operator_test.cpp @@ -96,7 +96,7 @@ TEST_F(UnionOperatorTest, test_all_const_expr) { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = nullptr, - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(source_local_state->init(state.get(), info)); state->resize_op_id_to_local_state(-100); @@ -190,7 +190,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) { LocalStateInfo info {.parent_profile = &profile, .scan_ranges = {}, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .task_idx = 0}; EXPECT_TRUE(source_local_state->init(state.get(), info)); state->resize_op_id_to_local_state(-100); @@ -209,7 +209,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) { .parent_profile = &profile, .sender_id = 0, .shared_state = shared_state.get(), - .le_state_map = {}, + .shared_state_map = {}, .tsink = TDataSink {}}; EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info)); sink_state[i]->resize_op_id_to_local_state(-100); diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp index 337b214b62f..1f9d345087a 100644 --- a/be/test/pipeline/pipeline_test.cpp +++ b/be/test/pipeline/pipeline_test.cpp @@ -272,12 +272,12 @@ TEST_F(PipelineTest, HAPPY_PATH) { _pipeline_profiles[cur_pipe->id()] = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(cur_pipe->id())); - std::map<int, - std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; auto task = std::make_unique<PipelineTask>( cur_pipe, task_id, local_runtime_state.get(), _context.back().get(), - _pipeline_profiles[cur_pipe->id()].get(), le_state_map, task_id); + _pipeline_profiles[cur_pipe->id()].get(), shared_state_map, task_id); cur_pipe->incr_created_tasks(task_id, task.get()); local_runtime_state->set_task(task.get()); task->set_task_queue(_task_queue.get()); @@ -936,12 +936,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { std::static_pointer_cast<TaskExecutionContext>(_context.back())); local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get()); _runtime_filter_mgrs[j]->_state->set_state(local_runtime_state.get()); - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, - std::shared_ptr<Dependency>>> - le_state_map; + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; auto task = std::make_unique<PipelineTask>( _pipelines[i], task_id, local_runtime_state.get(), _context.back().get(), - _pipeline_profiles[_pipelines[i]->id()].get(), le_state_map, j); + _pipeline_profiles[_pipelines[i]->id()].get(), shared_state_map, j); _pipelines[i]->incr_created_tasks(j, task.get()); local_runtime_state->set_task(task.get()); task->set_task_queue(_task_queue.get()); diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp index af48faffcf4..63f10a69838 100644 --- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp @@ -76,9 +76,9 @@ TEST_F(RuntimeFilterProducerHelperTest, basic) { column->insert(2); block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); - vectorized::SharedHashTableContextPtr shared_hash_table_ctx; + std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); + helper.process(_runtime_states[0].get(), &block, false, runtime_filters)); } TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) { @@ -101,10 +101,10 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) { column->insert(2); block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); - vectorized::SharedHashTableContextPtr shared_hash_table_ctx; _tasks[0]->set_wake_up_early(); + std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); + helper.process(_runtime_states[0].get(), &block, false, runtime_filters)); } TEST_F(RuntimeFilterProducerHelperTest, skip_process) { @@ -132,9 +132,9 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) { column->insert(2); block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); - vectorized::SharedHashTableContextPtr shared_hash_table_ctx; + std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); + helper.process(_runtime_states[0].get(), &block, false, runtime_filters)); } TEST_F(RuntimeFilterProducerHelperTest, broadcast) { @@ -156,16 +156,15 @@ TEST_F(RuntimeFilterProducerHelperTest, broadcast) { column->insert(2); block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); - vectorized::SharedHashTableContextPtr shared_hash_table_ctx = - std::make_shared<vectorized::SharedHashTableContext>(); + std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); + helper.process(_runtime_states[0].get(), &block, true, runtime_filters)); auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( helper2.init(_runtime_states[1].get(), build_expr_ctxs, runtime_filter_descs)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - helper2.process(_runtime_states[1].get(), &block, shared_hash_table_ctx)); + helper2.process(_runtime_states[1].get(), &block, true, runtime_filters)); } } // namespace doris diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp b/be/test/vec/exec/vfile_scanner_exception_test.cpp index 0a0dee944da..8fa37c26278 100644 --- a/be/test/vec/exec/vfile_scanner_exception_test.cpp +++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp @@ -252,10 +252,11 @@ void VfileScannerExceptionTest::init() { auto local_state = pipeline::FileScanLocalState::create_unique(&_runtime_state, _scan_node.get()); std::vector<TScanRangeParams> scan_ranges; - std::map<int, std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>, - std::shared_ptr<pipeline::Dependency>>> - le_state_map; - pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr, le_state_map, 0}; + pipeline::LocalStateInfo info {.parent_profile = &_global_profile, + .scan_ranges = scan_ranges, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init local_state"); _runtime_state.emplace_local_state(_scan_node->operator_id(), std::move(local_state)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org