This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de8442bbef2 [refactor](pipeline) Refactor local exchange planning (#42482)
de8442bbef2 is described below

commit de8442bbef2e4ee91d2815f8bf2bca8886bf3235
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Oct 29 14:04:20 2024 +0800

    [refactor](pipeline) Refactor local exchange planning (#42482)
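
    The subject line is terse, so below is a minimal, self-contained sketch of the
    idea behind this refactor (an editor's illustration, not text from the commit):
    the per-operator ignore_data_distribution()/require_shuffled_data_distribution()
    overrides are removed, and the shuffle requirement is derived from the
    operator's declared required_data_distribution() plus a single
    is_serial_operator flag. The types below are simplified stand-ins, not the real
    Doris classes.

        // Sketch with simplified stand-in types; it mirrors the derivation added in
        // be/src/pipeline/exec/operator.cpp, not the actual Doris headers.
        #include <iostream>

        enum class ExchangeType { NOOP, PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

        struct DataDistribution {
            ExchangeType distribution_type;
        };

        static bool is_hash_exchange(ExchangeType type) {
            return type == ExchangeType::HASH_SHUFFLE || type == ExchangeType::BUCKET_HASH_SHUFFLE;
        }

        class OperatorBase {
        public:
            virtual ~OperatorBase() = default;
            // A serial operator (e.g. a single exchange source) runs in one task and
            // ignores the upstream data distribution.
            bool is_serial_operator() const { return _is_serial_operator; }
            virtual DataDistribution required_data_distribution() const {
                return {_is_serial_operator ? ExchangeType::NOOP : ExchangeType::HASH_SHUFFLE};
            }
            // Derived in one place instead of being overridden by every operator.
            bool require_shuffled_data_distribution() const {
                return is_hash_exchange(required_data_distribution().distribution_type);
            }

        protected:
            bool _is_serial_operator = false;
        };

        int main() {
            OperatorBase op;
            std::cout << std::boolalpha << op.require_shuffled_data_distribution() << '\n'; // true
        }
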
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |  1 -
 be/src/pipeline/exec/analytic_sink_operator.h      |  3 --
 be/src/pipeline/exec/datagen_operator.cpp          |  8 +--
 .../exec/distinct_streaming_aggregation_operator.h |  3 --
 be/src/pipeline/exec/exchange_source_operator.cpp  |  4 +-
 be/src/pipeline/exec/exchange_source_operator.h    |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  7 +--
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  3 --
 .../exec/nested_loop_join_build_operator.h         |  4 +-
 be/src/pipeline/exec/operator.cpp                  | 10 +++-
 be/src/pipeline/exec/operator.h                    | 21 +-------
 .../exec/partitioned_aggregation_sink_operator.h   |  3 --
 .../exec/partitioned_hash_join_probe_operator.h    |  3 --
 .../exec/partitioned_hash_join_sink_operator.h     |  3 --
 be/src/pipeline/exec/scan_operator.cpp             |  6 ++-
 be/src/pipeline/exec/scan_operator.h               |  4 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 -
 be/src/pipeline/exec/set_sink_operator.h           |  1 -
 be/src/pipeline/exec/sort_sink_operator.h          |  1 -
 .../local_exchange_source_operator.h               |  3 --
 be/src/pipeline/local_exchange/local_exchanger.cpp |  2 +-
 be/src/pipeline/local_exchange/local_exchanger.h   | 11 ++--
 be/src/pipeline/pipeline.cpp                       | 21 ++++----
 be/src/pipeline/pipeline.h                         |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 62 +++++-----------------
 be/src/pipeline/pipeline_fragment_context.h        |  9 ++--
 .../org/apache/doris/planner/PlanFragment.java     |  1 -
 .../java/org/apache/doris/qe/SessionVariable.java  |  4 +-
 gensrc/thrift/Planner.thrift                       |  4 --
 .../distribute/local_shuffle.groovy                |  2 +-
 30 files changed, 64 insertions(+), 146 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 8271f1451b4..9ff3de99b22 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -152,7 +152,6 @@ public:
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
     bool require_data_distribution() const override { return _is_colocate; }
-    bool require_shuffled_data_distribution() const override { return !_probe_expr_ctxs.empty(); }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     AggregatedDataVariants* get_agg_data(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 1a0a671cf9f..b35354107f6 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,9 +88,6 @@ public:
     }
 
     bool require_data_distribution() const override { return true; }
-    bool require_shuffled_data_distribution() const override {
-        return !_partition_by_eq_expr_ctxs.empty();
-    }
 
 private:
     Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index faa6359e874..965092b7eef 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -36,7 +36,9 @@ DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode
         : OperatorX<DataGenLocalState>(pool, tnode, operator_id, descs),
           _tuple_id(tnode.data_gen_scan_node.tuple_id),
           _tuple_desc(nullptr),
-          _runtime_filter_descs(tnode.runtime_filters) {}
+          _runtime_filter_descs(tnode.runtime_filters) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
+}
 
 Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
@@ -87,8 +89,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         std::shared_ptr<IRuntimeFilter> runtime_filter;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
-                filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, p.is_serial_operator(),
+                                                                p.node_id(), &runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 1f7a21190ad..4c5fcd5efa7 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -116,9 +116,6 @@ public:
     }
 
     bool require_data_distribution() const override { return _is_colocate; }
-    bool require_shuffled_data_distribution() const override {
-        return _needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg);
-    }
 
 private:
     friend class DistinctStreamingAggLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 844e6decd64..c9eebc5d2e4 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -105,7 +105,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
                           std::vector<bool>(tnode.nullable_tuples.begin(),
                                             tnode.nullable_tuples.begin() +
                                                     tnode.exchange_node.input_row_tuples.size())),
-          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {}
+          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
+}
 
 Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 0fe3dcbb590..c8ef674d269 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -81,7 +81,7 @@ public:
     [[nodiscard]] bool is_merging() const { return _is_merging; }
 
     DataDistribution required_data_distribution() const override {
-        if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
+        if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
             return {ExchangeType::NOOP};
         }
         return _partition_type == TPartitionType::HASH_PARTITIONED
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 69aa6843b84..83755d7f730 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -130,8 +130,8 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         } else if (_is_broadcast_join) {
-            return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASS_TO_ONE)
-                                                      : DataDistribution(ExchangeType::NOOP);
+            return _child->is_serial_operator() ? DataDistribution(ExchangeType::PASS_TO_ONE)
+                                                : DataDistribution(ExchangeType::NOOP);
         }
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                               _join_distribution == TJoinDistributionType::COLOCATE
@@ -139,9 +139,6 @@ public:
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 917c2692b44..7da7a3b238d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -152,9 +152,6 @@ public:
                                   : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index f2ca259754b..d6e72799f97 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -76,8 +76,8 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         }
-        return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
-                                                  : DataDistribution(ExchangeType::NOOP);
+        return _child->is_serial_operator() ? DataDistribution(ExchangeType::BROADCAST)
+                                            : DataDistribution(ExchangeType::NOOP);
     }
 
 private:
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 6e3099db748..fb2dd828c39 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -74,6 +74,7 @@
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "util/string_util.h"
@@ -116,11 +117,16 @@ std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
     }() + ")";
 }
 
-DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
-    return _child && _child->ignore_data_distribution()
+DataDistribution OperatorBase::required_data_distribution() const {
+    return _child && _child->is_serial_operator() && !is_source()
                    ? DataDistribution(ExchangeType::PASSTHROUGH)
                    : DataDistribution(ExchangeType::NOOP);
 }
+
+bool OperatorBase::require_shuffled_data_distribution() const {
+    return Pipeline::is_hash_exchange(required_data_distribution().distribution_type);
+}
+
 const RowDescriptor& OperatorBase::row_desc() const {
     return _child->row_desc();
 }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 5df0a19498f..2a2b3fdd3b9 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -118,7 +118,8 @@ public:
         _followed_by_shuffled_operator = followed_by_shuffled_operator;
     }
     [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
-    [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
+    [[nodiscard]] virtual DataDistribution required_data_distribution() const;
+    [[nodiscard]] virtual bool require_shuffled_data_distribution() const;
 
 protected:
     OperatorPtr _child = nullptr;
@@ -483,7 +484,6 @@ public:
     }
 
     [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
-    [[nodiscard]] virtual DataDistribution required_data_distribution() const;
 
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
@@ -496,8 +496,6 @@ public:
 
     [[nodiscard]] bool is_sink() const override { return true; }
 
-    [[nodiscard]] bool is_source() const override { return false; }
-
     static Status close(RuntimeState* state, Status exec_status) {
         auto result = state->get_sink_local_state_result();
         if (!result) {
@@ -652,19 +650,7 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
-        return _child && _child->ignore_data_distribution() && !is_source()
-                       ? DataDistribution(ExchangeType::PASSTHROUGH)
-                       : DataDistribution(ExchangeType::NOOP);
-    }
-    [[nodiscard]] virtual bool ignore_data_distribution() const {
-        return _child ? _child->ignore_data_distribution() : _ignore_data_distribution;
-    }
-    [[nodiscard]] bool ignore_data_hash_distribution() const {
-        return _child ? _child->ignore_data_hash_distribution() : _ignore_data_distribution;
-    }
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
-    void set_ignore_data_distribution() { _ignore_data_distribution = true; }
 
     Status open(RuntimeState* state) override;
 
@@ -735,8 +721,6 @@ public:
 
     bool has_output_row_desc() const { return _output_row_descriptor != nullptr; }
 
-    [[nodiscard]] bool is_source() const override { return false; }
-
     [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
                                                           vectorized::Block* block, bool* eos);
 
@@ -779,7 +763,6 @@ protected:
     uint32_t _debug_point_count = 0;
 
     std::string _op_name;
-    bool _ignore_data_distribution = false;
     int _parallel_tasks = 0;
 
     //_keep_origin is used to avoid copying during projection,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6b3a74c83df..15f6b22387a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -309,9 +309,6 @@ public:
     bool require_data_distribution() const override {
         return _agg_sink_operator->require_data_distribution();
     }
-    bool require_shuffled_data_distribution() const override {
-        return _agg_sink_operator->require_shuffled_data_distribution();
-    }
 
     Status set_child(OperatorPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3aab11f62d8..f8fc0780b6f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -165,9 +165,6 @@ public:
                                            _distribution_partition_exprs));
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index c768d7518b9..8e89763b50a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -115,9 +115,6 @@ public:
                                           _distribution_partition_exprs);
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 4f3c97bab71..be940e8c89c 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -73,7 +73,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
-    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.ignore_data_distribution()));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.is_serial_operator()));
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
     init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
@@ -990,7 +990,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            _scan_dependency, p.ignore_data_distribution());
+            _scan_dependency, p.is_serial_operator());
     return Status::OK();
 }
 
@@ -1145,6 +1145,8 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
         : OperatorX<LocalStateType>(pool, tnode, operator_id, descs),
           _runtime_filter_descs(tnode.runtime_filters),
           _parallel_tasks(parallel_tasks) {
+    OperatorX<LocalStateType>::_is_serial_operator =
+            tnode.__isset.is_serial_operator && tnode.is_serial_operator;
     if (tnode.__isset.push_down_count) {
         _push_down_count = tnode.push_down_count;
     }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index bf650cb8495..e4f8a828c6e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -383,8 +383,8 @@ public:
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
 
     DataDistribution required_data_distribution() const override {
-        if (OperatorX<LocalStateType>::ignore_data_distribution()) {
-            // `ignore_data_distribution()` returns true means we ignore the distribution.
+        if (OperatorX<LocalStateType>::is_serial_operator()) {
+            // `is_serial_operator()` returns true means we ignore the distribution.
             return {ExchangeType::NOOP};
         }
         return {ExchangeType::BUCKET_HASH_SHUFFLE};
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index ab53f5358c2..f320c8e89cd 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -96,8 +96,6 @@ public:
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
 
-    bool require_shuffled_data_distribution() const override { return true; }
-
     std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; }
 
 private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 65c33795e5d..8e3c264f267 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -94,7 +94,6 @@ public:
         return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
-    bool require_shuffled_data_distribution() const override { return true; }
 
 private:
     template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 0829c38b40f..a5a24e37163 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -73,7 +73,6 @@ public:
             return {ExchangeType::NOOP};
         }
     }
-    bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
     bool require_data_distribution() const override { return _is_colocate; }
 
     size_t get_revocable_mem_size(RuntimeState* state) const;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index c0da5c8120c..3c706d50182 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -81,9 +81,6 @@ public:
 
     bool is_source() const override { return true; }
 
-    // If input data distribution is ignored by this fragment, this first local exchange source in this fragment will re-assign all data.
-    bool ignore_data_distribution() const override { return false; }
-
 private:
     friend class LocalExchangeSourceLocalState;
 
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index da27a39772d..c5f99ca5d6a 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,7 +226,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state, local_state._channel_id);
             }
         }
-    } else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
+    } else if (_num_senders != _num_sources) {
         // In this branch, data just should be distributed equally into all instances.
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index b3731638cb3..bf052ac3b92 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -218,24 +218,21 @@ public:
 
 protected:
     ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                     bool ignore_source_data_distribution, int free_block_limit)
+                     int free_block_limit)
             : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
-                                          free_block_limit),
-              _ignore_source_data_distribution(ignore_source_data_distribution) {
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, LocalExchangeSinkLocalState& local_state);
-
-    const bool _ignore_source_data_distribution = false;
 };
 
 class BucketShuffleExchanger final : public ShuffleExchanger {
     ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                           bool ignore_source_data_distribution, int free_block_limit)
+                           int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
-                               ignore_source_data_distribution, free_block_limit) {}
+                               free_block_limit) {}
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 5b93fbdf1f8..96da754daa5 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -39,6 +39,7 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
                     [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
         return false;
     }
+    // If all operators are serial and sink is not serial, we should improve parallelism for sink.
     if (std::all_of(_operators.begin(), _operators.end(),
                     [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
         if (!_sink->is_serial_operator()) {
@@ -46,21 +47,22 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
         }
     } else if (std::any_of(_operators.begin(), _operators.end(),
                            [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
+        // If non-serial operators exist, we should improve parallelism for those.
         return true;
     }
 
     if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
         target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
+        // Always do local exchange if non-hash-partition exchanger is required.
+        // For example, `PASSTHROUGH` exchanger is always required to distribute data evenly.
         return true;
-    } else if (_operators.front()->ignore_data_hash_distribution()) {
-        if (_data_distribution.distribution_type == target_data_distribution.distribution_type &&
-            (_data_distribution.partition_exprs.empty() ||
-             target_data_distribution.partition_exprs.empty())) {
-            return true;
-        }
-        return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
-               !(is_hash_exchange(_data_distribution.distribution_type) &&
-                 is_hash_exchange(target_data_distribution.distribution_type));
+    } else if (_operators.front()->is_serial_operator()) {
+        DCHECK(std::all_of(_operators.begin(), _operators.end(),
+                           [&](OperatorPtr op) -> bool { return op->is_serial_operator(); }) &&
+               _sink->is_serial_operator())
+                << debug_string();
+        // All operators and sink are serial in this path.
+        return false;
     } else {
         return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
                !(is_hash_exchange(_data_distribution.distribution_type) &&
@@ -71,7 +73,6 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
 Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
     if (parallelism > 0 && op->is_serial_operator()) {
         set_num_tasks(parallelism);
-        op->set_ignore_data_distribution();
     }
     op->set_parallel_tasks(num_tasks());
     _operators.emplace_back(op);
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 9554537ca16..98e52ec5271 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -115,7 +115,7 @@ public:
     int num_tasks() const { return _num_tasks; }
     bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
 
-    std::string debug_string() {
+    std::string debug_string() const {
         fmt::memory_buffer debug_string_buffer;
         fmt::format_to(debug_string_buffer,
                        "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: 
{}]", _pipeline_id,
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ef856da5135..bd45016adf5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -236,8 +236,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     if (request.__isset.query_options && request.query_options.__isset.execution_timeout) {
         _timeout = request.query_options.execution_timeout;
     }
-    _use_serial_source =
-            request.fragment.__isset.use_serial_source && request.fragment.use_serial_source;
 
     _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
     _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
@@ -704,6 +702,9 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
             (followed_by_shuffled_operator || op->is_shuffled_operator()) &&
             require_shuffled_data_distribution;
 
+    if (num_children == 0) {
+        _use_serial_source = op->is_serial_operator();
+    }
     // rely on that tnodes is preorder of the plan
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
@@ -736,8 +737,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
         int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
         DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_hash_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     auto& operators = cur_pipe->operators();
     const auto downstream_pipeline_id = cur_pipe->id();
     auto local_exchange_id = next_operator_id();
@@ -785,7 +785,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
-                ignore_data_hash_distribution,
                 _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? cast_set<int>(
                                   _runtime_state->query_options().local_exchange_free_blocks_limit)
@@ -922,8 +921,7 @@ Status PipelineFragmentContext::_add_local_exchange(
         int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
         DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
         return Status::OK();
     }
@@ -938,7 +936,7 @@ Status PipelineFragmentContext::_add_local_exchange(
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
     RETURN_IF_ERROR(_add_local_exchange_impl(
             idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
-            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
 
     CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
             << "total_op_num: " << total_op_num
@@ -952,7 +950,7 @@ Status PipelineFragmentContext::_add_local_exchange(
                 cast_set<int>(new_pip->operators().size()), pool, new_pip,
                 add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                 do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
-                shuffle_idx_to_instance_idx, ignore_data_distribution));
+                shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -978,13 +976,8 @@ Status PipelineFragmentContext::_plan_local_exchange(
         // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
         // still keep colocate plan after local shuffle
         RETURN_IF_ERROR(_plan_local_exchange(
-                _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution() ||
-                                num_buckets == 0
-                        ? _num_instances
-                        : num_buckets,
-                pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
-                shuffle_idx_to_instance_idx,
-                _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution()));
+                _use_serial_source || num_buckets == 0 ? _num_instances : num_buckets, pip_idx,
+                _pipelines[pip_idx], bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -992,8 +985,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
 Status PipelineFragmentContext::_plan_local_exchange(
         int num_buckets, int pip_idx, PipelinePtr pip,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_hash_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     int idx = 1;
     bool do_local_exchange = false;
     do {
@@ -1005,8 +997,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
                 RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
-                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
-                        ignore_data_hash_distribution));
+                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
             }
             if (do_local_exchange) {
                 // If local exchange is needed for current operator, we will split this pipeline to
@@ -1023,8 +1014,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange(
                 pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                 pip->sink()->required_data_distribution(), &do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
-                ignore_data_hash_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -1215,10 +1205,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
@@ -1229,10 +1215,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
@@ -1245,20 +1227,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
                     "to true and restart be.");
         }
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case doris::TPlanNodeType::FILE_SCAN_NODE: {
         op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::ES_SCAN_NODE:
@@ -1266,10 +1240,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::EXCHANGE_NODE: {
@@ -1278,10 +1248,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            op->set_ignore_data_distribution();
-            cur_pipe->set_num_tasks(request.parallel_instances);
-        }
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
@@ -1643,10 +1609,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6caa0e5c106..289f5c82365 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -153,22 +153,19 @@ private:
                                 const std::map<int, int>& shuffle_idx_to_instance_idx);
     Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
                                 const std::map<int, int>& bucket_seq_to_instance_idx,
-                                const std::map<int, int>& shuffle_idx_to_instance_idx,
-                                const bool ignore_data_distribution);
+                                const std::map<int, int>& shuffle_idx_to_instance_idx);
     void _inherit_pipeline_properties(const DataDistribution& data_distribution,
                                       PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
     Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
                                PipelinePtr cur_pipe, DataDistribution data_distribution,
                                bool* do_local_exchange, int num_buckets,
                                const std::map<int, int>& bucket_seq_to_instance_idx,
-                               const std::map<int, int>& shuffle_idx_to_instance_idx,
-                               const bool ignore_data_distribution);
+                               const std::map<int, int>& shuffle_idx_to_instance_idx);
     Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr cur_pipe,
                                     PipelinePtr new_pip, DataDistribution data_distribution,
                                     bool* do_local_exchange, int num_buckets,
                                     const std::map<int, int>& bucket_seq_to_instance_idx,
-                                    const std::map<int, int>& shuffle_idx_to_instance_idx,
-                                    const bool ignore_data_hash_distribution);
+                                    const std::map<int, int>& shuffle_idx_to_instance_idx);
 
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
                                  ThreadPool* thread_pool);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index c5a6ec55f63..0ebd023ed41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -342,7 +342,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         // TODO chenhao , calculated by cost
         result.setMinReservationBytes(0);
         result.setInitialReservationTotalClaims(0);
-        result.setUseSerialSource(useSerialSource(ConnectContext.get()));
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0c755b9aae9..52ea334a142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4351,7 +4351,7 @@ public class SessionVariable implements Serializable, Writable {
     }
 
     public boolean isIgnoreStorageDataDistribution() {
-        return ignoreStorageDataDistribution && enableLocalShuffle;
+        return ignoreStorageDataDistribution && enableLocalShuffle && enableNereidsPlanner;
     }
 
     public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
@@ -4389,7 +4389,7 @@ public class SessionVariable implements Serializable, Writable {
     }
 
     public boolean isForceToLocalShuffle() {
-        return enableLocalShuffle && forceToLocalShuffle;
+        return enableLocalShuffle && forceToLocalShuffle && enableNereidsPlanner;
     }
 
     public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift
index ffcc33638db..866d8d45320 100644
--- a/gensrc/thrift/Planner.thrift
+++ b/gensrc/thrift/Planner.thrift
@@ -64,10 +64,6 @@ struct TPlanFragment {
   8: optional i64 initial_reservation_total_claims
 
   9: optional QueryCache.TQueryCacheParam query_cache_param
-
-  // Using serial source means a serial source operator will be used in this fragment (e.g. data will be shuffled to
-  // only 1 exchange operator) and then splitted by followed local exchanger
-  10: optional bool use_serial_source
 }
 
 // location information for a single scan range
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 950b6171c7c..d701ad890d6 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -45,7 +45,7 @@ suite("local_shuffle") {
         insert into test_local_shuffle1 values (1, 1), (2, 2);
         insert into test_local_shuffle2 values (2, 2), (3, 3);
         
-        set enable_nereids_distribute_planner=true;
+        // set enable_nereids_distribute_planner=true;
         set enable_pipeline_x_engine=true;
         set disable_join_reorder=true;
         set enable_local_shuffle=true;

