This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dae604b7964 [pipelineX](improvement) Adjust local exchange strategy (#29915) dae604b7964 is described below commit dae604b7964d038ae78336b89ce97b743e81e6e5 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Sat Jan 13 03:46:41 2024 +0800 [pipelineX](improvement) Adjust local exchange strategy (#29915) --- .../exec/distinct_streaming_aggregation_sink_operator.h | 4 ++++ be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++++ be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++++ .../local_exchange/local_exchange_sink_operator.h | 15 +++++++++++++-- .../pipeline_x/local_exchange/local_exchanger.cpp | 3 +++ be/src/pipeline/pipeline_x/operator.h | 12 +++++++++--- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 5 ++++- 7 files changed, 41 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index d62178460ea..6607516d6cb 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -110,6 +110,10 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + + DataDistribution required_data_distribution() const override { + return DataSinkOperatorX<DistinctStreamingAggSinkLocalState>::required_data_distribution(); + } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 3c1b772b30a..8420719330c 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -165,6 +165,10 @@ public: : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } + bool is_shuffled_hash_join() const override { + return _join_distribution == 
TJoinDistributionType::PARTITIONED; + } + private: friend class HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 093884b6d0f..ac7954af13b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -172,6 +172,10 @@ public: : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); } + bool is_shuffled_hash_join() const override { + return _join_distribution == TJoinDistributionType::PARTITIONED; + } + private: Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs, RuntimeProfile::Counter& expr_call_timer, diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index daf75c966af..7275e545205 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -102,10 +102,21 @@ public: return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(ExchangeType type, int num_buckets) override { + Status init(ExchangeType type, const int num_buckets, + const bool is_shuffled_hash_join) override { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; if (_type == ExchangeType::HASH_SHUFFLE) { + // For shuffle join, if data distribution has been broken by previous operator, we + // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, + // we should use map shuffle idx to instance idx because all instances will be + // distributed to all BEs. Otherwise, we should use shuffle idx directly. 
+ if (!is_shuffled_hash_join) { + _shuffle_idx_to_instance_idx.clear(); + for (int i = 0; i < _num_partitions; i++) { + _shuffle_idx_to_instance_idx.insert({i, i}); + } + } _partitioner.reset( new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions)); RETURN_IF_ERROR(_partitioner->init(_texprs)); @@ -145,7 +156,7 @@ private: const std::vector<TExpr>& _texprs; std::unique_ptr<vectorized::PartitionerBase> _partitioner; const std::map<int, int> _bucket_seq_to_instance_idx; - const std::map<int, int> _shuffle_idx_to_instance_idx; + std::map<int, int> _shuffle_idx_to_instance_idx; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 602020c4882..900e31e6631 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -121,6 +121,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>() ._shuffle_idx_to_instance_idx; for (size_t i = 0; i < _num_partitions; i++) { + DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " << _num_partitions + << " map.size(): " << map.size(); + DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " " << _num_partitions; size_t start = local_state._partition_rows_histogram[i]; size_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 5304d0074f6..6792ce35f36 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -206,6 +206,8 @@ public: [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } + [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } + bool can_read() override { LOG(FATAL) << 
"should not reach here!"; return false; @@ -467,14 +469,16 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override; - virtual Status init(ExchangeType type, int num_buckets) { + [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, + const bool is_shuffled_hash_join) { return Status::InternalError("init() is only implemented in local exchange!"); } Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } - virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; + [[nodiscard]] virtual Status setup_local_state(RuntimeState* state, + LocalSinkStateInfo& info) = 0; template <class TARGET> TARGET& cast() { @@ -492,12 +496,14 @@ public: } virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0; - virtual DataDistribution required_data_distribution() const { + [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() ? 
DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); } + [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } + Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ffaccebe898..b479e1d9334 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -732,7 +732,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); RETURN_IF_ERROR(new_pip->set_sink(sink)); - RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets)); + RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, + operator_xs.size() > idx + ? operator_xs[idx]->is_shuffled_hash_join() + : cur_pipe->sink_x()->is_shuffled_hash_join())); // 2. Create and initialize LocalExchangeSharedState. auto shared_state = LocalExchangeSharedState::create_shared(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org