This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bf737b12530 [Improvement](local shuffle) Improve local shuffle strategy (#41789)
bf737b12530 is described below

commit bf737b12530301cd3e2dd6d50918cc3051e76a7c
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Oct 25 10:20:23 2024 +0800

    [Improvement](local shuffle) Improve local shuffle strategy (#41789)
    
    Add local shuffle to unpartitioned fragments to increase parallelism and
    improve performance. For example:
    ```sql
    SELECT h1.UserID, h2.URL, COUNT(*) AS visit_count
    FROM (
      SELECT *
      FROM hits_10m
      LIMIT 5000
    ) AS h1
    CROSS JOIN (
      SELECT *
      FROM hits_10m
      LIMIT 5000
    ) AS h2
    GROUP BY h1.UserID, h2.URL
    ORDER BY visit_count DESC
    LIMIT 1000
    ```
    
    Add a rule to apply a local exchanger:
    
    ```
    ┌───────────────────────┐               ┌───────────────────────┐
    │                       │               │                       │
    │Exchange(UNPARTITIONED)│               │Exchange(UNPARTITIONED)│
    │                       │               │                       │
    └───────────────────────┴──────┬────────┴───────────────────────┘
                                   │
                                   │
                                   │
                                   │
                                   │
                                   │
                            ┌──────▼──────┐
                            │             │
                            │ CROSS JOIN  │
                            │             │
                            └──────┬──────┘
                                   │
                                   │
                                   │
                ┌──────────────────▼─────────────────────┐
                │                                        │
                │ LOCAL EXCHANGE (HASH PARTITION) 1 -> n │
                │                                        │
                └──────────────────┬─────────────────────┘
                                   │
                                   │
                                   │
                                   │
                                 ┌──▼────┐
                                │       │
                                │  AGG  │
                                │       │
                                └───────┘
    ```
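
    A simplified sketch of the serial-operator check behind this rule. Types
    and names follow the BE changes below; this is illustrative only and omits
    the distribution-type checks in `Pipeline::need_to_local_exchange`:

    ```cpp
    // Sketch: mirrors the serial-operator part of
    // Pipeline::need_to_local_exchange (be/src/pipeline/pipeline.cpp).
    bool need_to_re_parallelize(const std::vector<OperatorPtr>& ops,
                                const DataSinkOperatorPtr& sink, int idx) {
        auto serial = [](const OperatorPtr& op) {
            return op->is_serial_operator();
        };
        // A serial operator at or after the insertion point forbids raising
        // parallelism: it has to run in a single task anyway.
        if (std::any_of(ops.begin() + idx, ops.end(), serial)) {
            return false;
        }
        // All operators serial but a parallel sink: a local exchange
        // (e.g. HASH_SHUFFLE 1 -> n) restores parallelism on the sink side.
        if (std::all_of(ops.begin(), ops.end(), serial)) {
            return !sink->is_serial_operator();
        }
        // A mix of serial and parallel operators also needs a local exchange.
        return std::any_of(ops.begin(), ops.end(), serial);
    }
    ```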
    
    
    Benchmark for the query above:
    before: 1 min 17.79 sec
    after: 16.73 sec
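
    Implementation note: the planner marks serial plan nodes (e.g. a finalized
    aggregation without grouping keys, see `AggregationNode.isSerialOperator()`
    below) through a new `is_serial_operator` Thrift field, and each BE
    operator picks it up in its constructor:

    ```cpp
    // Pattern repeated across the BE operator constructors in this patch
    // (excerpt): the Thrift field is optional, hence the __isset check.
    _is_serial_operator =
            tnode.__isset.is_serial_operator && tnode.is_serial_operator;
    ```
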
---
 be/src/pipeline/dependency.h                       |   8 +-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |   5 +-
 be/src/pipeline/exec/aggregation_sink_operator.h   |   7 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |   4 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |   4 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |   1 +
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |   1 +
 .../distinct_streaming_aggregation_operator.cpp    |   4 +-
 .../exec/distinct_streaming_aggregation_operator.h |   4 +
 be/src/pipeline/exec/exchange_sink_operator.h      |   1 +
 be/src/pipeline/exec/join_build_sink_operator.cpp  |   2 +
 be/src/pipeline/exec/join_probe_operator.cpp       |   1 +
 .../exec/nested_loop_join_probe_operator.h         |   4 +-
 be/src/pipeline/exec/operator.cpp                  |   9 +-
 be/src/pipeline/exec/operator.h                    |   6 +-
 .../partitioned_aggregation_source_operator.cpp    |   4 +
 .../exec/partitioned_aggregation_source_operator.h |   2 +
 be/src/pipeline/exec/sort_sink_operator.cpp        |   4 +-
 be/src/pipeline/exec/sort_sink_operator.h          |   3 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |   4 +-
 be/src/pipeline/exec/union_source_operator.h       |   4 +-
 .../local_exchange_sink_operator.cpp               |   6 +-
 .../local_exchange/local_exchange_sink_operator.h  |   2 +-
 be/src/pipeline/pipeline.cpp                       |  43 +++++++-
 be/src/pipeline/pipeline.h                         |  44 ++++----
 be/src/pipeline/pipeline_fragment_context.cpp      | 120 +++++++++++++--------
 be/src/pipeline/pipeline_fragment_context.h        |   6 +-
 .../org/apache/doris/planner/AggregationNode.java  |   5 +
 .../org/apache/doris/planner/AnalyticEvalNode.java |   5 +
 .../apache/doris/planner/AssertNumRowsNode.java    |   5 +
 .../org/apache/doris/planner/DataPartition.java    |   4 +
 .../org/apache/doris/planner/EmptySetNode.java     |   4 +
 .../org/apache/doris/planner/ExchangeNode.java     |  10 ++
 .../org/apache/doris/planner/JoinNodeBase.java     |   1 -
 .../apache/doris/planner/NestedLoopJoinNode.java   |  17 +++
 .../org/apache/doris/planner/PlanFragment.java     |  35 ++++++
 .../java/org/apache/doris/planner/PlanNode.java    |  10 ++
 .../java/org/apache/doris/planner/RepeatNode.java  |   5 +
 .../java/org/apache/doris/planner/ScanNode.java    |   5 +
 .../java/org/apache/doris/planner/SelectNode.java  |   5 +
 .../java/org/apache/doris/planner/SortNode.java    |   5 +
 .../java/org/apache/doris/planner/UnionNode.java   |   5 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  22 ++++
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 gensrc/thrift/Planner.thrift                       |   4 +
 .../insert_into_table/complex_insert.groovy        |   6 +-
 .../distribute/local_shuffle.groovy                |  12 +--
 47 files changed, 361 insertions(+), 108 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 0885dbf380f..8060ee8362d 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -111,19 +111,19 @@ public:
     // Notify downstream pipeline tasks this dependency is ready.
     void set_ready();
     void set_ready_to_read() {
-        DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
         _shared_state->source_deps.front()->set_ready();
     }
     void set_block_to_read() {
-        DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
         _shared_state->source_deps.front()->block();
     }
     void set_ready_to_write() {
-        DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
         _shared_state->sink_deps.front()->set_ready();
     }
     void set_block_to_write() {
-        DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
         _shared_state->sink_deps.front()->block();
     }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 4007f50f58a..5fb14c02585 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -717,7 +717,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int 
operator_id, const TPla
                                    : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
           _require_bucket_distribution(require_bucket_distribution),
-          _agg_fn_output_row_descriptor(descs, tnode.row_tuples, 
tnode.nullable_tuples) {}
+          _agg_fn_output_row_descriptor(descs, tnode.row_tuples, 
tnode.nullable_tuples),
+          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 1f846ec88ff..8271f1451b4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,9 +143,8 @@ public:
 
     DataDistribution required_data_distribution() const override {
         if (_probe_expr_ctxs.empty()) {
-            return _needs_finalize || 
DataSinkOperatorX<AggSinkLocalState>::_child
-                                              ->ignore_data_distribution()
-                           ? DataDistribution(ExchangeType::PASSTHROUGH)
+            return _needs_finalize
+                           ? DataDistribution(ExchangeType::NOOP)
                            : 
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
         }
         return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
@@ -204,8 +203,8 @@ protected:
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
     const bool _require_bucket_distribution;
-
     RowDescriptor _agg_fn_output_row_descriptor;
+    const bool _without_key;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index c68601fcdca..6d4cd291079 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -441,7 +441,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
                                        const DescriptorTbl& descs)
         : Base(pool, tnode, operator_id, descs),
           _needs_finalize(tnode.agg_node.need_finalize),
-          _without_key(tnode.agg_node.grouping_exprs.empty()) {}
+          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 1da5c1f7c35..afe9aeab8fd 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* 
pool, int operator_id,
           _require_bucket_distribution(require_bucket_distribution),
           _partition_exprs(tnode.__isset.distribute_expr_lists && 
require_bucket_distribution
                                    ? tnode.distribute_expr_lists[0]
-                                   : tnode.analytic_node.partition_exprs) {}
+                                   : tnode.analytic_node.partition_exprs) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 134a0ad82d7..019f95042c2 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -475,6 +475,7 @@ 
AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo
           _has_range_window(tnode.analytic_node.window.type == 
TAnalyticWindowType::RANGE),
           _has_window_start(tnode.analytic_node.window.__isset.window_start),
           _has_window_end(tnode.analytic_node.window.__isset.window_end) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
     _fn_scope = AnalyticFnScope::PARTITION;
     if (tnode.analytic_node.__isset.window &&
         tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index c1a02b6f838..345e42b7d96 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* 
pool, const TPlanNode
         : StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode, 
operator_id, descs),
           _desired_num_rows(tnode.assert_num_rows_node.desired_num_rows),
           _subquery_string(tnode.assert_num_rows_node.subquery_string) {
+    _is_serial_operator = true;
     if (tnode.assert_num_rows_node.__isset.assertion) {
         _assertion = tnode.assert_num_rows_node.assertion;
     } else {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index ddec533a9ff..a59af8ce7b4 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -326,7 +326,9 @@ 
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
                                    ? tnode.distribute_expr_lists[0]
                                    : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution) {
+          _require_bucket_distribution(require_bucket_distribution),
+          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 71d289402ec..1f7a21190ad 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -104,6 +104,9 @@ public:
     bool need_more_input_data(RuntimeState* state) const override;
 
     DataDistribution required_data_distribution() const override {
+        if (_needs_finalize && _probe_expr_ctxs.empty()) {
+            return {ExchangeType::NOOP};
+        }
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && 
!_is_streaming_preagg)) {
             return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
@@ -136,6 +139,7 @@ private:
     /// The total size of the row from the aggregate functions.
     size_t _total_size_of_aggregate_states = 0;
     bool _is_streaming_preagg = false;
+    const bool _without_key;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 8af944728a2..689172dfc6b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -224,6 +224,7 @@ public:
     Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* 
src, PBlock* dest,
                            int num_receivers = 1);
     DataDistribution required_data_distribution() const override;
+    bool is_serial_operator() const override { return true; }
 
 private:
     friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp 
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index a1f3262d6ed..fc0d3b87460 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -82,6 +82,8 @@ 
JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
           _short_circuit_for_null_in_build_side(_join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                                                 !_is_mark_join),
           _runtime_filter_descs(tnode.runtime_filters) {
+    DataSinkOperatorX<LocalStateType>::_is_serial_operator =
+            tnode.__isset.is_serial_operator && tnode.is_serial_operator;
     _init_join_op();
     if (_is_mark_join) {
         DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp 
b/be/src/pipeline/exec/join_probe_operator.cpp
index 8e5010d7513..76dc75a90d8 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -220,6 +220,7 @@ 
JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
                                      : true)
 
           ) {
+    Base::_is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
     if (tnode.__isset.hash_join_node) {
         _intermediate_row_desc.reset(new RowDescriptor(
                 descs, tnode.hash_join_node.vintermediate_tuple_id_list,
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 4121de64210..5b0fec159e2 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -203,7 +203,9 @@ public:
     }
 
     DataDistribution required_data_distribution() const override {
-        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+            _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::RIGHT_ANTI_JOIN ||
+            _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN) {
             return {ExchangeType::NOOP};
         }
         return {ExchangeType::ADAPTIVE_PASSTHROUGH};
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 5a13fdcbd84..6e3099db748 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -141,8 +141,9 @@ std::string 
PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio
 
 std::string OperatorXBase::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}",
-                   std::string(indentation_level * 2, ' '), _op_name, 
node_id(), _parallel_tasks);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}, 
_is_serial_operator={}",
+                   std::string(indentation_level * 2, ' '), _op_name, 
node_id(), _parallel_tasks,
+                   _is_serial_operator);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -363,8 +364,8 @@ void 
PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
 std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
 
-    fmt::format_to(debug_string_buffer, "{}{}: id={}", 
std::string(indentation_level * 2, ' '),
-                   _name, node_id());
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}",
+                   std::string(indentation_level * 2, ' '), _name, node_id(), 
_is_serial_operator);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b5bd0fe4713..5df0a19498f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -101,6 +101,9 @@ public:
         return Status::OK();
     }
 
+    // Operators need to be executed serially. (e.g. finalized agg without key)
+    [[nodiscard]] virtual bool is_serial_operator() const { return 
_is_serial_operator; }
+
     [[nodiscard]] bool is_closed() const { return _is_closed; }
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
@@ -122,6 +125,7 @@ protected:
 
     bool _is_closed;
     bool _followed_by_shuffled_operator = false;
+    bool _is_serial_operator = false;
 };
 
 class PipelineXLocalStateBase {
@@ -444,7 +448,7 @@ public:
 
     Status init(const TDataSink& tsink) override;
     [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
-                                      const bool is_shuffled_hash_join,
+                                      const bool use_global_hash_shuffle,
                                       const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 48df5587198..655a6e19725 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* 
state) {
     return _agg_source_operator->close(state);
 }
 
+bool PartitionedAggSourceOperatorX::is_serial_operator() const {
+    return _agg_source_operator->is_serial_operator();
+}
+
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index edae99c716a..7e73241745e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -91,6 +91,8 @@ public:
 
     bool is_source() const override { return true; }
 
+    bool is_serial_operator() const override;
+
 private:
     friend class PartitionedAggLocalState;
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index ee8689a8084..6d6684437b8 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int 
operator_id, const TP
                                                                : 
std::vector<TExpr> {}),
           _algorithm(tnode.sort_node.__isset.algorithm ? 
tnode.sort_node.algorithm
                                                        : 
TSortAlgorithm::FULL_SORT),
-          _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
+          _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..0829c38b40f 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -69,8 +69,9 @@ public:
         } else if (_merge_by_exchange) {
             // The current sort node is used for the ORDER BY
             return {ExchangeType::PASSTHROUGH};
+        } else {
+            return {ExchangeType::NOOP};
         }
-        return 
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
     }
     bool require_shuffled_data_distribution() const override { return 
_is_analytic_sort; }
     bool require_data_distribution() const override { return _is_colocate; }
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 02a99e183c8..7f801b79c0b 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnod
                                          const DescriptorTbl& descs)
         : OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) 
{}
+          _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) 
{
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(Base::init(tnode, state));
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 2d112ebf2df..200e7de8597 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -63,7 +63,9 @@ public:
     using Base = OperatorX<UnionSourceLocalState>;
     UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                          const DescriptorTbl& descs)
-            : Base(pool, tnode, operator_id, descs), 
_child_size(tnode.num_children) {};
+            : Base(pool, tnode, operator_id, descs), 
_child_size(tnode.num_children) {
+        _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+    }
     ~UnionSourceOperatorX() override = default;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index d87113ca80a..ff243186c47 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -36,17 +36,17 @@ std::vector<Dependency*> 
LocalExchangeSinkLocalState::dependencies() const {
 }
 
 Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int 
num_buckets,
-                                        const bool 
should_disable_bucket_shuffle,
+                                        const bool use_global_hash_shuffle,
                                         const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + 
")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
-        _use_global_shuffle = should_disable_bucket_shuffle;
+        _use_global_shuffle = use_global_hash_shuffle;
         // For shuffle join, if data distribution has been broken by previous 
operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To 
be mentioned,
         // we should use map shuffle idx to instance idx because all instances 
will be
         // distributed to all BEs. Otherwise, we should use shuffle idx 
directly.
-        if (should_disable_bucket_shuffle) {
+        if (use_global_hash_shuffle) {
             std::for_each(shuffle_idx_to_instance_idx.begin(), 
shuffle_idx_to_instance_idx.end(),
                           [&](const auto& item) {
                               DCHECK(item.first != -1);
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 1cd9736d429..09b1f2cc310 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -100,7 +100,7 @@ public:
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init(ExchangeType type, const int num_buckets, const bool 
should_disable_bucket_shuffle,
+    Status init(ExchangeType type, const int num_buckets, const bool 
use_global_hash_shuffle,
                 const std::map<int, int>& shuffle_idx_to_instance_idx) 
override;
 
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6e83c7805e4..5b93fbdf1f8 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_task.h"
 
 namespace doris::pipeline {
@@ -31,7 +32,47 @@ void Pipeline::_init_profile() {
     _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
 }
 
-Status Pipeline::add_operator(OperatorPtr& op) {
+bool Pipeline::need_to_local_exchange(const DataDistribution 
target_data_distribution,
+                                      const int idx) const {
+    // If serial operator exists after `idx`-th operator, we should not 
improve parallelism.
+    if (std::any_of(_operators.begin() + idx, _operators.end(),
+                    [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+        return false;
+    }
+    if (std::all_of(_operators.begin(), _operators.end(),
+                    [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+        if (!_sink->is_serial_operator()) {
+            return true;
+        }
+    } else if (std::any_of(_operators.begin(), _operators.end(),
+                           [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+        return true;
+    }
+
+    if (target_data_distribution.distribution_type != 
ExchangeType::BUCKET_HASH_SHUFFLE &&
+        target_data_distribution.distribution_type != 
ExchangeType::HASH_SHUFFLE) {
+        return true;
+    } else if (_operators.front()->ignore_data_hash_distribution()) {
+        if (_data_distribution.distribution_type == 
target_data_distribution.distribution_type &&
+            (_data_distribution.partition_exprs.empty() ||
+             target_data_distribution.partition_exprs.empty())) {
+            return true;
+        }
+        return _data_distribution.distribution_type != 
target_data_distribution.distribution_type &&
+               !(is_hash_exchange(_data_distribution.distribution_type) &&
+                 is_hash_exchange(target_data_distribution.distribution_type));
+    } else {
+        return _data_distribution.distribution_type != 
target_data_distribution.distribution_type &&
+               !(is_hash_exchange(_data_distribution.distribution_type) &&
+                 is_hash_exchange(target_data_distribution.distribution_type));
+    }
+}
+
+Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
+    if (parallelism > 0 && op->is_serial_operator()) {
+        set_num_tasks(parallelism);
+        op->set_ignore_data_distribution();
+    }
     op->set_parallel_tasks(num_tasks());
     _operators.emplace_back(op);
     if (op->is_source()) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 8a20ccb631c..ef0ae9e9a75 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -44,14 +44,16 @@ class Pipeline : public 
std::enable_shared_from_this<Pipeline> {
 
 public:
     explicit Pipeline(PipelineId pipeline_id, int num_tasks,
-                      std::weak_ptr<PipelineFragmentContext> context)
-            : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
+                      std::weak_ptr<PipelineFragmentContext> context, int 
num_tasks_of_parent)
+            : _pipeline_id(pipeline_id),
+              _num_tasks(num_tasks),
+              _num_tasks_of_parent(num_tasks_of_parent) {
         _init_profile();
         _tasks.resize(_num_tasks, nullptr);
     }
 
     // Add operators for pipelineX
-    Status add_operator(OperatorPtr& op);
+    Status add_operator(OperatorPtr& op, const int parallelism);
     // prepare operators for pipelineX
     Status prepare(RuntimeState* state);
 
@@ -71,28 +73,8 @@ public:
         return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE;
     }
 
-    bool need_to_local_exchange(const DataDistribution 
target_data_distribution) const {
-        if (target_data_distribution.distribution_type != 
ExchangeType::BUCKET_HASH_SHUFFLE &&
-            target_data_distribution.distribution_type != 
ExchangeType::HASH_SHUFFLE) {
-            return true;
-        } else if (_operators.front()->ignore_data_hash_distribution()) {
-            if (_data_distribution.distribution_type ==
-                        target_data_distribution.distribution_type &&
-                (_data_distribution.partition_exprs.empty() ||
-                 target_data_distribution.partition_exprs.empty())) {
-                return true;
-            }
-            return _data_distribution.distribution_type !=
-                           target_data_distribution.distribution_type &&
-                   !(is_hash_exchange(_data_distribution.distribution_type) &&
-                     
is_hash_exchange(target_data_distribution.distribution_type));
-        } else {
-            return _data_distribution.distribution_type !=
-                           target_data_distribution.distribution_type &&
-                   !(is_hash_exchange(_data_distribution.distribution_type) &&
-                     
is_hash_exchange(target_data_distribution.distribution_type));
-        }
-    }
+    bool need_to_local_exchange(const DataDistribution 
target_data_distribution,
+                                const int idx) const;
     void init_data_distribution() {
         
set_data_distribution(_operators.front()->required_data_distribution());
     }
@@ -120,6 +102,14 @@ public:
         for (auto& op : _operators) {
             op->set_parallel_tasks(_num_tasks);
         }
+
+#ifndef NDEBUG
+        if (num_tasks > 1 &&
+            std::any_of(_operators.begin(), _operators.end(),
+                        [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+            DCHECK(false) << debug_string();
+        }
+#endif
     }
     int num_tasks() const { return _num_tasks; }
     bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
@@ -136,6 +126,8 @@ public:
         return fmt::to_string(debug_string_buffer);
     }
 
+    int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+
 private:
     void _init_profile();
 
@@ -173,6 +165,8 @@ private:
     std::atomic<int> _num_tasks_running = 0;
     // Tasks in this pipeline.
     std::vector<PipelineTask*> _tasks;
+    // Parallelism of parent pipeline.
+    const int _num_tasks_of_parent;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 28cfefbf6c1..fd3baefa76f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -214,8 +214,9 @@ void PipelineFragmentContext::cancel(const Status reason) {
 PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) 
{
     PipelineId id = _next_pipeline_id++;
     auto pipeline = std::make_shared<Pipeline>(
-            id, _num_instances,
-            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
+            id, parent ? std::min(parent->num_tasks(), _num_instances) : 
_num_instances,
+            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
+            parent ? parent->num_tasks() : _num_instances);
     if (idx >= 0) {
         _pipelines.insert(_pipelines.begin() + idx, pipeline);
     } else {
@@ -235,6 +236,8 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     if (request.__isset.query_options && 
request.query_options.__isset.execution_timeout) {
         _timeout = request.query_options.execution_timeout;
     }
+    _use_serial_source =
+            request.fragment.__isset.use_serial_source && 
request.fragment.use_serial_source;
 
     _fragment_level_profile = 
std::make_unique<RuntimeProfile>("PipelineContext");
     _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
@@ -749,13 +752,12 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     const bool followed_by_shuffled_operator =
             operators.size() > idx ? 
operators[idx]->followed_by_shuffled_operator()
                                    : 
cur_pipe->sink()->followed_by_shuffled_operator();
-    const bool should_disable_bucket_shuffle =
+    const bool use_global_hash_shuffle =
             bucket_seq_to_instance_idx.empty() &&
             shuffle_idx_to_instance_idx.find(-1) == 
shuffle_idx_to_instance_idx.end() &&
-            followed_by_shuffled_operator;
+            followed_by_shuffled_operator && !_use_serial_source;
     sink.reset(new LocalExchangeSinkOperatorX(
-            sink_id, local_exchange_id,
-            should_disable_bucket_shuffle ? _total_instances : _num_instances,
+            sink_id, local_exchange_id, use_global_hash_shuffle ? 
_total_instances : _num_instances,
             data_distribution.partition_exprs, bucket_seq_to_instance_idx));
     if (bucket_seq_to_instance_idx.empty() &&
         data_distribution.distribution_type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
@@ -763,8 +765,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     }
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type, 
num_buckets,
-                                          should_disable_bucket_shuffle,
-                                          shuffle_idx_to_instance_idx));
+                                          use_global_hash_shuffle, 
shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
     std::shared_ptr<LocalExchangeSharedState> shared_state =
@@ -775,7 +776,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances),
-                should_disable_bucket_shuffle ? _total_instances : 
_num_instances,
+                use_global_hash_shuffle ? _total_instances : _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
                         : 0);
@@ -915,11 +916,11 @@ Status PipelineFragmentContext::_add_local_exchange(
         const std::map<int, int>& bucket_seq_to_instance_idx,
         const std::map<int, int>& shuffle_idx_to_instance_idx,
         const bool ignore_data_distribution) {
-    if (_num_instances <= 1) {
+    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
         return Status::OK();
     }
 
-    if (!cur_pipe->need_to_local_exchange(data_distribution)) {
+    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
         return Status::OK();
     }
     *do_local_exchange = true;
@@ -1154,7 +1155,8 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
             // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             source_op.reset(new MultiCastDataStreamerSourceOperatorX(
                     i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], 
row_desc, source_id));
-            RETURN_IF_ERROR(new_pipeline->add_operator(source_op));
+            RETURN_IF_ERROR(new_pipeline->add_operator(
+                    source_op, params.__isset.parallel_instances ? 
params.parallel_instances : 0));
             // 2. create and set sink operator of data stream sender for new 
pipeline
 
             DataSinkOperatorPtr sink_op;
@@ -1203,7 +1205,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new OlapScanOperatorX(
                 pool, tnode, next_operator_id(), descs, _num_instances,
                 enable_query_cache ? request.fragment.query_cache_param : 
TQueryCacheParam {}));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         if (request.__isset.parallel_instances) {
             cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
@@ -1216,7 +1219,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         _query_ctx->query_mem_tracker->is_group_commit_load = true;
 #endif
         op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), 
descs, _num_instances));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         if (request.__isset.parallel_instances) {
             cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
@@ -1226,7 +1230,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
         if (config::enable_java_support) {
             op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), 
descs, _num_instances));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         } else {
             return Status::InternalError(
                     "Jdbc scan node is disabled, you can change be config 
enable_java_support "
@@ -1240,7 +1245,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case doris::TPlanNodeType::FILE_SCAN_NODE: {
         op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         if (request.__isset.parallel_instances) {
             cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
@@ -1250,7 +1256,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case TPlanNodeType::ES_SCAN_NODE:
     case TPlanNodeType::ES_HTTP_SCAN_NODE: {
         op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         if (request.__isset.parallel_instances) {
             cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
@@ -1261,7 +1268,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         int num_senders = find_with_default(request.per_exch_num_senders, 
tnode.node_id, 0);
         DCHECK_GT(num_senders, 0);
         op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), 
descs, num_senders));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         if (request.__isset.parallel_instances) {
             op->set_ignore_data_distribution();
             cur_pipe->set_num_tasks(request.parallel_instances);
@@ -1280,7 +1288,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             auto cache_source_id = next_operator_id();
             op.reset(new CacheSourceOperatorX(pool, cache_node_id, 
cache_source_id,
                                               
request.fragment.query_cache_param));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1315,7 +1324,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                            
_require_bucket_distribution));
                 op->set_followed_by_shuffled_operator(false);
                 _require_bucket_distribution = true;
-                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 cur_pipe = new_pipe;
             } else {
@@ -1324,7 +1334,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
                 _require_bucket_distribution =
                         _require_bucket_distribution || 
op->require_data_distribution();
-                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
             }
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
@@ -1335,11 +1346,13 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 
                 op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
-                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
                 cur_pipe = new_pipe;
             } else {
                 op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
-                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
             }
         } else {
             // create new pipeline to add query cache operator
@@ -1355,10 +1368,12 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             }
             if (enable_query_cache) {
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
-                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
                 cur_pipe = new_pipe;
             } else {
-                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
             }
 
             const auto downstream_pipeline_id = cur_pipe->id();
@@ -1406,7 +1421,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                     pool, tnode_, next_operator_id(), descs, partition_count);
             probe_operator->set_inner_operators(inner_sink_operator, 
inner_probe_operator);
             op = std::move(probe_operator);
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1430,7 +1446,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
         } else {
             op.reset(new HashJoinProbeOperatorX(pool, tnode, 
next_operator_id(), descs));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1457,7 +1474,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
         op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, 
next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1480,7 +1498,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         int child_count = tnode.num_children;
         op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
         op->set_followed_by_shuffled_operator(_require_bucket_distribution);
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1508,7 +1527,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         } else {
             op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
         }
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1535,7 +1555,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case doris::TPlanNodeType::PARTITION_SORT_NODE: {
         op.reset(new PartitionSortSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1553,7 +1574,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::ANALYTIC_EVAL_NODE: {
         op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1575,39 +1597,44 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, 
request));
         op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, 
request));
         op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
         op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::TABLE_FUNCTION_NODE: {
         op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
         op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::EMPTY_SET_NODE: {
         op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::DATA_GEN_SCAN_NODE: {
         op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         if (request.__isset.parallel_instances) {
             cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
@@ -1616,17 +1643,20 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
         op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::META_SCAN_NODE: {
         op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::SELECT_NODE: {
         op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     default:
@@ -1642,9 +1672,11 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+        PipelinePtr& cur_pipe, int parent_idx, int child_idx,
+        const doris::TPipelineFragmentParams& request) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, 
next_operator_id(), descs));
-    RETURN_IF_ERROR(cur_pipe->add_operator(op));
+    RETURN_IF_ERROR(cur_pipe->add_operator(
+            op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
     const auto downstream_pipeline_id = cur_pipe->id();
     if (_dag.find(downstream_pipeline_id) == _dag.end()) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 0749729789e..6caa0e5c106 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,7 +121,7 @@ public:
                 _tasks[j][i]->stop_if_finished();
             }
         }
-    };
+    }
 
 private:
     Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
@@ -140,7 +140,8 @@ private:
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
                                                    const DescriptorTbl& descs, OperatorPtr& op,
                                                    PipelinePtr& cur_pipe, int parent_idx,
-                                                   int child_idx);
+                                                   int child_idx,
+                                                   const doris::TPipelineFragmentParams& request);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                              const std::vector<TExpr>& output_exprs,
@@ -224,6 +225,7 @@ private:
     int _num_instances = 1;
 
     int _timeout = -1;
+    bool _use_serial_source = false;
 
     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 4dca9384d65..55d1b4b50c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -488,6 +488,11 @@ public class AggregationNode extends PlanNode {
         }
     }
 
+    @Override
+    public boolean isSerialOperator() {
+        return aggInfo.getGroupingExprs().isEmpty() && needsFinalize;
+    }
+
     public void setColocate(boolean colocate) {
         isColocate = colocate;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index cdbf827aed9..dce6c3d1b04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -296,4 +296,9 @@ public class AnalyticEvalNode extends PlanNode {
 
         return output.toString();
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return partitionExprs.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
index 57d9ce8742f..a4c4aa42c65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
@@ -116,4 +116,9 @@ public class AssertNumRowsNode extends PlanNode {
     public int getNumInstances() {
         return 1;
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 9c6ba83408a..ce57a57c377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -90,6 +90,10 @@ public class DataPartition {
         return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
     }
 
+    public boolean isTabletSinkShufflePartition() {
+        return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED;
+    }
+
     public TPartitionType getType() {
         return type;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index 867c220d9fe..f6ddf23429e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -81,4 +81,8 @@ public class EmptySetNode extends PlanNode {
         return 1;
     }
 
+    @Override
+    public boolean isSerialOperator() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 4ada9a82f7c..7af09287191 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -195,6 +195,11 @@ public class ExchangeNode extends PlanNode {
         return prefix + "offset: " + offset + "\n";
     }
 
+    @Override
+    public boolean isMerging() {
+        return mergeInfo != null;
+    }
+
     public boolean isRightChildOfBroadcastHashJoin() {
         return isRightChildOfBroadcastHashJoin;
     }
@@ -202,4 +207,9 @@ public class ExchangeNode extends PlanNode {
     public void setRightChildOfBroadcastHashJoin(boolean value) {
         isRightChildOfBroadcastHashJoin = value;
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index 91a3c26e770..5dc81e29d85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -597,7 +597,6 @@ public abstract class JoinNodeBase extends PlanNode {
         this.useSpecificProjections = useSpecificProjections;
     }
 
-
     public boolean isUseSpecificProjections() {
         return useSpecificProjections;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 30c0a2d0394..c7b3525e4cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -281,4 +281,21 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         }
         return output.toString();
     }
+
+    /**
+     * If joinOp is one of the types below:
+     * 1. NULL_AWARE_LEFT_ANTI_JOIN
+     * 2. RIGHT_OUTER_JOIN
+     * 3. RIGHT_ANTI_JOIN
+     * 4. RIGHT_SEMI_JOIN
+     * 5. FULL_OUTER_JOIN
+     *
+     * we will execute this nested loop join serially.
+     * @return true if this operator must run in a single instance
+     */
+    @Override
+    public boolean isSerialOperator() {
+        return joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_OUTER_JOIN
+                || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_SEMI_JOIN
+                || joinOp == JoinOperator.FULL_OUTER_JOIN;
+    }
 }
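
For readers auditing the check above, the same membership test can also be phrased declaratively with an `EnumSet`. A minimal, hedged sketch of that alternative; the `JoinOperator` enum here is a local stand-in, not the real `org.apache.doris.analysis.JoinOperator`:

```java
import java.util.EnumSet;

public class SerialJoinCheck {
    // Hypothetical stand-in for Doris' JoinOperator enum; not the real class.
    enum JoinOperator {
        INNER_JOIN, CROSS_JOIN, LEFT_OUTER_JOIN, NULL_AWARE_LEFT_ANTI_JOIN,
        RIGHT_OUTER_JOIN, RIGHT_ANTI_JOIN, RIGHT_SEMI_JOIN, FULL_OUTER_JOIN
    }

    // The five join types the patch treats as serial for nested loop joins.
    private static final EnumSet<JoinOperator> SERIAL_JOIN_OPS = EnumSet.of(
            JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN,
            JoinOperator.RIGHT_OUTER_JOIN,
            JoinOperator.RIGHT_ANTI_JOIN,
            JoinOperator.RIGHT_SEMI_JOIN,
            JoinOperator.FULL_OUTER_JOIN);

    static boolean isSerialOperator(JoinOperator joinOp) {
        return SERIAL_JOIN_OPS.contains(joinOp);
    }

    public static void main(String[] args) {
        System.out.println(isSerialOperator(JoinOperator.CROSS_JOIN));      // false
        System.out.println(isSerialOperator(JoinOperator.FULL_OUTER_JOIN)); // true
    }
}
```

Whether the plain disjunction or the set lookup reads better is a style call; both are constant-time.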
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index ae1d34308a3..3e3c49bf675 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -341,6 +341,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         // TODO chenhao , calculated by cost
         result.setMinReservationBytes(0);
         result.setInitialReservationTotalClaims(0);
+        result.setUseSerialSource(useSerialSource(ConnectContext.get()));
         return result;
     }
 
@@ -502,4 +503,38 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     public boolean hasNullAwareLeftAntiJoin() {
         return planRoot.isNullAwareLeftAntiJoin();
     }
+
+    private boolean isMergingFragment() {
+        return planRoot.isMerging();
+    }
+
+    public boolean useSerialSource(ConnectContext context) {
+        return context != null
+                && context.getSessionVariable().isIgnoreStorageDataDistribution()
+                && !hasNullAwareLeftAntiJoin()
+                // If the input data partition is UNPARTITIONED, the sink is a DataStreamSink and the root
+                // node is not a serial operator, we use local exchange to improve parallelism.
+                && getDataPartition() == DataPartition.UNPARTITIONED && !children.isEmpty()
+                && sink instanceof DataStreamSink && !planRoot.isSerialOperator()
+                /**
+                 * Suppose table `t1` has a unique key `k1` and a value column `v1`,
+                 * and the plan below is used to load data into `t1`:
+                 * ```
+                 * FRAGMENT 0:
+                 *  Merging Exchange (id = 1)
+                 *   NL Join (id = 2)
+                 *  DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED)
+                 *
+                 * FRAGMENT 1:
+                 *  Exchange (id = 3)
+                 *  OlapTableSink (id = 4)
+                 * ```
+                 *
+                 * In this plan, `Merging Exchange (id = 1)` needs to merge-sort on columns `k1` and `v1`,
+                 * so the parallelism of FRAGMENT 0 must be 1, and data is then shuffled to FRAGMENT 1,
+                 * which also has only 1 instance because this loading job relies on the global ordering
+                 * of columns `k1` and `v1`.
+                 *
+                 * So FRAGMENT 0 should not use a serial source.
+                 */
+                && !isMergingFragment();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1e9d5646939..d1ba493682b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -279,6 +279,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin);
     }
 
+    public boolean isMerging() {
+        return children.stream().anyMatch(PlanNode::isMerging);
+    }
+
     public PlanFragment getFragment() {
         return fragment;
     }
@@ -639,6 +643,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         TPlanNode msg = new TPlanNode();
         msg.node_id = id.asInt();
         msg.setNereidsId(nereidsId);
+        msg.setIsSerialOperator(isSerialOperator());
         msg.num_children = children.size();
         msg.limit = limit;
         for (TupleId tid : tupleIds) {
@@ -1374,4 +1379,9 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
             return true;
         });
     }
+
+    // Whether this operator must be executed serially (e.g. a finalized aggregation without grouping keys).
+    public boolean isSerialOperator() {
+        return false;
+    }
 }
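
Taken together with the per-node overrides earlier in this patch, the default above forms a simple pattern: leaf operators decide for themselves, and pass-through operators delegate to their child. A self-contained toy model of that pattern (the class names deliberately do not reuse the real planner types):

```java
import java.util.List;

public class SerialFlagDemo {
    abstract static class Node {
        final List<Node> children;
        Node(Node... children) { this.children = List.of(children); }
        // Default: operators are parallel-friendly unless a subclass says otherwise.
        boolean isSerialOperator() { return false; }
    }

    // A scan is bound to its storage distribution, so it reports serial, as ScanNode does in the patch.
    static class Scan extends Node {
        @Override boolean isSerialOperator() { return true; }
    }

    // A filter neither merges nor repartitions data, so it inherits its child's property (cf. SelectNode).
    static class Filter extends Node {
        Filter(Node child) { super(child); }
        @Override boolean isSerialOperator() { return children.get(0).isSerialOperator(); }
    }

    // A grand-total aggregation (no grouping keys) must see all rows in one instance (cf. AggregationNode).
    static class FinalAgg extends Node {
        final boolean hasGroupingKeys;
        FinalAgg(Node child, boolean hasGroupingKeys) {
            super(child);
            this.hasGroupingKeys = hasGroupingKeys;
        }
        @Override boolean isSerialOperator() { return !hasGroupingKeys; }
    }

    public static void main(String[] args) {
        Node grouped = new FinalAgg(new Filter(new Scan()), /*hasGroupingKeys=*/ true);
        System.out.println(grouped.isSerialOperator()); // false: hash-grouped agg can run in parallel
        Node total = new FinalAgg(new Scan(), /*hasGroupingKeys=*/ false);
        System.out.println(total.isSerialOperator());   // true: COUNT(*)-style agg needs one instance
    }
}
```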
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
index 3c6a88cea08..407d8a6444c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
@@ -200,4 +200,9 @@ public class RepeatNode extends PlanNode {
         }
         return output.toString();
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return children.get(0).isSerialOperator();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a92cac7b510..1681699d651 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -848,4 +848,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     public long getSelectedSplitNum() {
         return selectedSplitNum;
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
index 6c6b665b00a..b3b088837a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
@@ -109,4 +109,9 @@ public class SelectNode extends PlanNode {
         }
         return output.toString();
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return children.get(0).isSerialOperator();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index e3c405bcbab..fc1c50c0bba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -389,6 +389,11 @@ public class SortNode extends PlanNode {
         return new HashSet<>(result);
     }
 
+    @Override
+    public boolean isSerialOperator() {
+        return !isAnalyticSort && !mergeByexchange;
+    }
+
     public void setColocate(boolean colocate) {
         isColocate = colocate;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 40982d07e77..bf48a770f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -42,4 +42,9 @@ public class UnionNode extends SetOperationNode {
     protected void toThrift(TPlanNode msg) {
         toThrift(msg, TPlanNodeType.UNION_NODE);
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return children.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8e580c549df..4eda6775b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1779,6 +1779,20 @@ public class Coordinator implements CoordInterface {
                FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport,
                         0, params);
                 params.instanceExecParams.add(instanceParam);
+
+                // Using a serial source means a serial source operator will be used in this fragment
+                // (e.g. data will be shuffled to only 1 exchange operator) and then split by the
+                // following local exchanger.
+                int expectedInstanceNum = fragment.getParallelExecNum();
+                boolean useSerialSource = fragment.useSerialSource(context) && useNereids
+                        && fragment.queryCacheParam == null;
+                if (useSerialSource) {
+                    for (int j = 1; j < expectedInstanceNum; j++) {
+                        params.instanceExecParams.add(new FInstanceExecParam(
+                                null, execHostport, 0, params));
+                    }
+                    params.ignoreDataDistribution = true;
+                    params.parallelTasksNum = 1;
+                }
                 continue;
             }
 
@@ -1808,6 +1822,10 @@ public class Coordinator implements CoordInterface {
                 if (leftMostNode.getNumInstances() == 1) {
                     exchangeInstances = 1;
                 }
+                // Using a serial source means a serial source operator will be used in this fragment
+                // (e.g. data will be shuffled to only 1 exchange operator) and then split by the
+                // following local exchanger.
+                boolean useSerialSource = fragment.useSerialSource(context) && useNereids
+                        && fragment.queryCacheParam == null;
                if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentId)
                         .instanceExecParams.size() > exchangeInstances) {
                     // random select some instance
@@ -1825,12 +1843,16 @@ public class Coordinator implements CoordInterface {
                                 hosts.get(index % hosts.size()), 0, params);
                         params.instanceExecParams.add(instanceParam);
                     }
+                    params.ignoreDataDistribution = useSerialSource;
+                    params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
                } else {
                    for (FInstanceExecParam execParams
                            : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
                        FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params);
                        params.instanceExecParams.add(instanceParam);
                    }
+                    params.ignoreDataDistribution = useSerialSource;
+                    params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
                 }
 
                // When group by cardinality is smaller than number of backend, only some backends always
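
Both hunks above apply the same recipe: when a fragment uses a serial source, the coordinator still creates the expected number of instance slots on the chosen host, but pins `parallelTasksNum` to 1 and sets `ignoreDataDistribution`, leaving the fan-out to the BE-side local exchanger. A hedged sketch of that bookkeeping, with simplified stand-ins for `FInstanceExecParam` and its params holder:

```java
import java.util.ArrayList;
import java.util.List;

public class SerialSourceParams {
    // Simplified stand-in for one fragment instance pinned to a host (Java 16+ record).
    record InstanceSlot(String host) {}

    static class FragmentParams {
        final List<InstanceSlot> instanceExecParams = new ArrayList<>();
        boolean ignoreDataDistribution = false;
        int parallelTasksNum = 0;
    }

    static FragmentParams planInstances(String host, int expectedInstanceNum, boolean useSerialSource) {
        FragmentParams params = new FragmentParams();
        params.instanceExecParams.add(new InstanceSlot(host)); // the one real network receiver
        if (useSerialSource) {
            // Extra slots share the host; they are fed by the local exchanger, not the network.
            for (int j = 1; j < expectedInstanceNum; j++) {
                params.instanceExecParams.add(new InstanceSlot(host));
            }
            params.ignoreDataDistribution = true;
            params.parallelTasksNum = 1;
        } else {
            params.parallelTasksNum = params.instanceExecParams.size();
        }
        return params;
    }

    public static void main(String[] args) {
        FragmentParams p = planInstances("be-1", 8, true);
        // 8 instance slots, but only 1 parallel task: the exchange itself stays serial.
        System.out.println(p.instanceExecParams.size() + " slots, parallelTasksNum=" + p.parallelTasksNum);
    }
}
```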
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5c0273da791..eb5266942c0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1366,6 +1366,7 @@ struct TPlanNode {
   49: optional i64 push_down_count
 
   50: optional list<list<Exprs.TExpr>> distribute_expr_lists
+  51: optional bool is_serial_operator
  // projections is final projections, which means projecting into results and materializing them into the output block.
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift
index 866d8d45320..ffcc33638db 100644
--- a/gensrc/thrift/Planner.thrift
+++ b/gensrc/thrift/Planner.thrift
@@ -64,6 +64,10 @@ struct TPlanFragment {
   8: optional i64 initial_reservation_total_claims
 
   9: optional QueryCache.TQueryCacheParam query_cache_param
+
+  // Using a serial source means a serial source operator will be used in this fragment (e.g. data will
+  // be shuffled to only 1 exchange operator) and then split by the following local exchanger.
+  10: optional bool use_serial_source
 }
 
 // location information for a single scan range
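
The `use_serial_source` flag is the wire-level contract for the behavior the comment describes: a single instance receives all rows over the network, and a local hash exchanger fans them out to the parallel tasks afterwards. Below is a toy, single-process illustration of that fan-out; it is an assumption-laden model for intuition, not the BE's actual exchanger:

```java
import java.util.ArrayList;
import java.util.List;

public class LocalShuffleToy {
    /** Hash-partition rows from one serial source into n local task buckets. */
    static List<List<Integer>> localHashExchange(List<Integer> serialSource, int n) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            tasks.add(new ArrayList<>());
        }
        for (int row : serialSource) {
            // The same key always lands in the same task bucket.
            tasks.get(Math.floorMod(Integer.hashCode(row), n)).add(row);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // One exchange instance received all ten rows; split them across 4 local tasks.
        List<Integer> received = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        localHashExchange(received, 4).forEach(System.out::println);
    }
}
```

Because rows with equal hash keys land in the same task, downstream per-task aggregation stays correct while gaining parallelism.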
diff --git a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
index 2493a7df5de..049cbe0b4d7 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
@@ -177,15 +177,15 @@ suite('complex_insert') {
 
     sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1'
     sql 'sync'
-    qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+    qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
 
     sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2'
     sql 'sync'
-    qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+    qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
 
     sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 from t1 order by id, c1 limit 10) t1, t3'
     sql 'sync'
-    qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+    qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
 
     sql 'drop table if exists agg_have_dup_base'
 
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 997230b1a06..950b6171c7c 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -52,7 +52,7 @@ suite("local_shuffle") {
         set force_to_local_shuffle=true;
         """
 
-    order_qt_read_single_olap_table "select * from test_local_shuffle1"
+    order_qt_read_single_olap_table "select * from test_local_shuffle1 order by id, id2"
 
     order_qt_broadcast_join """
         select *
@@ -96,7 +96,7 @@ suite("local_shuffle") {
         ) a
         right outer join [shuffle]
         test_local_shuffle2
-        on a.id=test_local_shuffle2.id2
+        on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2
         """
 
     order_qt_bucket_shuffle_with_prune_tablets2 """
@@ -109,7 +109,7 @@ suite("local_shuffle") {
             from test_local_shuffle1
             where id=1
         ) a
-        on a.id=test_local_shuffle2.id2
+        on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2
         """
 
     order_qt_bucket_shuffle_with_prune_tablets3 """
@@ -150,11 +150,11 @@ suite("local_shuffle") {
         """
 
     order_qt_fillup_bucket """
-            SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
+            SELECT cast(a.c0 as int), cast(b.c0 as int) res FROM
             (select * from test_local_shuffle3 where c0 =1)a
             RIGHT OUTER JOIN
             (select * from test_local_shuffle4)b
-            ON a.c0 = b.c0
+            ON a.c0 = b.c0 order by res
             """
 
     multi_sql """
@@ -182,6 +182,6 @@ suite("local_shuffle") {
             ) a
             inner join [shuffle]
             test_shuffle_left_with_local_shuffle b
-            on a.id2=b.id;
+            on a.id2=b.id order by a.id2;
         """
 }

