This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dae604b7964 [pipelineX](improvement) Adjust local exchange strategy (#29915)
dae604b7964 is described below

commit dae604b7964d038ae78336b89ce97b743e81e6e5
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sat Jan 13 03:46:41 2024 +0800

    [pipelineX](improvement) Adjust local exchange strategy (#29915)
---
 .../exec/distinct_streaming_aggregation_sink_operator.h   |  4 ++++
 be/src/pipeline/exec/hashjoin_build_sink.h                |  4 ++++
 be/src/pipeline/exec/hashjoin_probe_operator.h            |  4 ++++
 .../local_exchange/local_exchange_sink_operator.h         | 15 +++++++++++++--
 .../pipeline_x/local_exchange/local_exchanger.cpp         |  3 +++
 be/src/pipeline/pipeline_x/operator.h                     | 12 +++++++++---
 .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp   |  5 ++++-
 7 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index d62178460ea..6607516d6cb 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -110,6 +110,10 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
+
+    DataDistribution required_data_distribution() const override {
+        return 
DataSinkOperatorX<DistinctStreamingAggSinkLocalState>::required_data_distribution();
+    }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 3c1b772b30a..8420719330c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -165,6 +165,10 @@ public:
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
 
+    bool is_shuffled_hash_join() const override {
+        return _join_distribution == TJoinDistributionType::PARTITIONED;
+    }
+
 private:
     friend class HashJoinBuildSinkLocalState;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 093884b6d0f..ac7954af13b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -172,6 +172,10 @@ public:
                                   : 
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
     }
 
+    bool is_shuffled_hash_join() const override {
+        return _join_distribution == TJoinDistributionType::PARTITIONED;
+    }
+
 private:
     Status _do_evaluate(vectorized::Block& block, 
vectorized::VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer,
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index daf75c966af..7275e545205 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -102,10 +102,21 @@ public:
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init(ExchangeType type, int num_buckets) override {
+    Status init(ExchangeType type, const int num_buckets,
+                const bool is_shuffled_hash_join) override {
         _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + 
get_exchange_type_name(type) + ")";
         _type = type;
         if (_type == ExchangeType::HASH_SHUFFLE) {
+            // For shuffle join, if data distribution has been broken by 
previous operator, we
+            // should use a HASH_SHUFFLE local exchanger to shuffle data 
again. To be mentioned,
+            // we should use map shuffle idx to instance idx because all 
instances will be
+            // distributed to all BEs. Otherwise, we should use shuffle idx 
directly.
+            if (!is_shuffled_hash_join) {
+                _shuffle_idx_to_instance_idx.clear();
+                for (int i = 0; i < _num_partitions; i++) {
+                    _shuffle_idx_to_instance_idx.insert({i, i});
+                }
+            }
             _partitioner.reset(
                     new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
             RETURN_IF_ERROR(_partitioner->init(_texprs));
@@ -145,7 +156,7 @@ private:
     const std::vector<TExpr>& _texprs;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
-    const std::map<int, int> _shuffle_idx_to_instance_idx;
+    std::map<int, int> _shuffle_idx_to_instance_idx;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 602020c4882..900e31e6631 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -121,6 +121,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
         auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
                            ._shuffle_idx_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
+            DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " << 
_num_partitions
+                                    << " map.size(): " << map.size();
+            DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " " 
<< _num_partitions;
             size_t start = local_state._partition_rows_histogram[i];
             size_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 5304d0074f6..6792ce35f36 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -206,6 +206,8 @@ public:
 
     [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { 
return false; }
 
+    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
+
     bool can_read() override {
         LOG(FATAL) << "should not reach here!";
         return false;
@@ -467,14 +469,16 @@ public:
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
     Status init(const TDataSink& tsink) override;
-    virtual Status init(ExchangeType type, int num_buckets) {
+    [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
+                                      const bool is_shuffled_hash_join) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
 
     Status prepare(RuntimeState* state) override { return Status::OK(); }
     Status open(RuntimeState* state) override { return Status::OK(); }
 
-    virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& 
info) = 0;
+    [[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
+                                                   LocalSinkStateInfo& info) = 
0;
 
     template <class TARGET>
     TARGET& cast() {
@@ -492,12 +496,14 @@ public:
     }
 
     virtual void get_dependency(std::vector<DependencySPtr>& dependency, 
QueryContext* ctx) = 0;
-    virtual DataDistribution required_data_distribution() const {
+    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
                        : DataDistribution(ExchangeType::NOOP);
     }
 
+    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
+
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
     }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ffaccebe898..b479e1d9334 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -732,7 +732,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
             sink_id, local_exchange_id, _total_instances, 
data_distribution.partition_exprs,
             bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     RETURN_IF_ERROR(new_pip->set_sink(sink));
-    
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, 
num_buckets));
+    
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, 
num_buckets,
+                                            operator_xs.size() > idx
+                                                    ? 
operator_xs[idx]->is_shuffled_hash_join()
+                                                    : 
cur_pipe->sink_x()->is_shuffled_hash_join()));
 
     // 2. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to