This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d43fad88394622b2e0f45789176d18caf9ab3fcc
Author: Gabriel <[email protected]>
AuthorDate: Fri Feb 27 17:38:31 2026 +0800

    [refactor](local shuffle) Plan local exchanger in FE (BE part)
---
 be/src/pipeline/dependency.h                       | 50 +++++++--------
 be/src/pipeline/exec/aggregation_sink_operator.h   |  8 ++-
 be/src/pipeline/exec/analytic_sink_operator.h      |  8 ++-
 be/src/pipeline/exec/assert_num_rows_operator.h    |  2 +-
 .../exec/distinct_streaming_aggregation_operator.h | 10 +--
 be/src/pipeline/exec/exchange_source_operator.h    |  8 +--
 be/src/pipeline/exec/hashjoin_build_sink.h         | 12 ++--
 be/src/pipeline/exec/hashjoin_probe_operator.h     | 14 ++--
 .../exec/nested_loop_join_build_operator.h         |  6 +-
 .../exec/nested_loop_join_probe_operator.h         |  4 +-
 be/src/pipeline/exec/operator.cpp                  |  4 +-
 be/src/pipeline/exec/operator.h                    |  6 +-
 .../pipeline/exec/partition_sort_sink_operator.h   |  4 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  2 -
 .../exec/partitioned_hash_join_probe_operator.h    | 14 +---
 .../exec/partitioned_hash_join_sink_operator.h     |  6 +-
 .../pipeline/exec/rec_cte_anchor_sink_operator.h   |  2 +-
 be/src/pipeline/exec/rec_cte_sink_operator.h       |  2 +-
 be/src/pipeline/exec/rec_cte_source_operator.h     |  2 +-
 be/src/pipeline/exec/scan_operator.h               |  4 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  6 +-
 be/src/pipeline/exec/set_sink_operator.h           |  4 +-
 be/src/pipeline/exec/set_source_operator.h         |  4 +-
 be/src/pipeline/exec/sort_sink_operator.h          | 10 +--
 .../pipeline/exec/streaming_aggregation_operator.h |  7 +-
 be/src/pipeline/exec/table_function_operator.h     |  2 +-
 be/src/pipeline/exec/union_sink_operator.h         |  4 +-
 be/src/pipeline/exec/union_source_operator.h       |  4 +-
 .../local_exchange_sink_operator.cpp               | 48 +++++++-------
 .../local_exchange/local_exchange_sink_operator.h  | 22 ++++---
 .../local_exchange_source_operator.cpp             |  5 +-
 .../local_exchange_source_operator.h               | 35 ++++++++--
 be/src/pipeline/local_exchange/local_exchanger.h   | 42 ++++++------
 be/src/pipeline/pipeline.cpp                       |  3 +-
 be/src/pipeline/pipeline.h                         | 11 ++--
 be/src/pipeline/pipeline_fragment_context.cpp      | 74 +++++++++++++++++-----
 be/src/runtime/runtime_state.h                     |  7 +-
 be/test/pipeline/local_exchanger_test.cpp          | 10 +--
 be/test/pipeline/pipeline_test.cpp                 | 21 +++---
 gensrc/thrift/PaloInternalService.thrift           |  3 +
 gensrc/thrift/Partitions.thrift                    | 34 ++++++++++
 gensrc/thrift/PlanNodes.thrift                     | 22 ++++++-
 42 files changed, 340 insertions(+), 206 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 4c9b8d3eb2a..e4bb92df7dc 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -715,50 +715,44 @@ public:
     Status hash_table_init();
 };
 
-enum class ExchangeType : uint8_t {
-    NOOP = 0,
-    // Shuffle data by Crc32CHashPartitioner
-    HASH_SHUFFLE = 1,
-    // Round-robin passthrough data blocks.
-    PASSTHROUGH = 2,
-    // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine).
-    BUCKET_HASH_SHUFFLE = 3,
-    // Passthrough data blocks to all channels.
-    BROADCAST = 4,
-    // Passthrough data to channels evenly in an adaptive way.
-    ADAPTIVE_PASSTHROUGH = 5,
-    // Send all data to the first channel.
-    PASS_TO_ONE = 6,
-};
+inline bool is_shuffled_exchange(TLocalPartitionType::type idx) {
+    return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
+           idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
+           idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE;
+}
 
-inline std::string get_exchange_type_name(ExchangeType idx) {
+inline std::string get_exchange_type_name(TLocalPartitionType::type idx) {
     switch (idx) {
-    case ExchangeType::NOOP:
+    case TLocalPartitionType::NOOP:
         return "NOOP";
-    case ExchangeType::HASH_SHUFFLE:
-        return "HASH_SHUFFLE";
-    case ExchangeType::PASSTHROUGH:
+    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
+        return "GLOBAL_HASH_SHUFFLE";
+    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+        return "LOCAL_HASH_SHUFFLE";
+    case TLocalPartitionType::PASSTHROUGH:
         return "PASSTHROUGH";
-    case ExchangeType::BUCKET_HASH_SHUFFLE:
+    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
         return "BUCKET_HASH_SHUFFLE";
-    case ExchangeType::BROADCAST:
+    case TLocalPartitionType::BROADCAST:
         return "BROADCAST";
-    case ExchangeType::ADAPTIVE_PASSTHROUGH:
+    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
         return "ADAPTIVE_PASSTHROUGH";
-    case ExchangeType::PASS_TO_ONE:
+    case TLocalPartitionType::PASS_TO_ONE:
         return "PASS_TO_ONE";
+    case TLocalPartitionType::LOCAL_MERGE_SORT:
+        return "LOCAL_MERGE_SORT";
     }
     throw Exception(Status::FatalError("__builtin_unreachable"));
 }
 
 struct DataDistribution {
-    DataDistribution(ExchangeType type) : distribution_type(type) {}
-    DataDistribution(ExchangeType type, const std::vector<TExpr>& 
partition_exprs_)
+    DataDistribution(TLocalPartitionType::type type) : distribution_type(type) 
{}
+    DataDistribution(TLocalPartitionType::type type, const std::vector<TExpr>& 
partition_exprs_)
             : distribution_type(type), partition_exprs(partition_exprs_) {}
     DataDistribution(const DataDistribution& other) = default;
-    bool need_local_exchange() const { return distribution_type != 
ExchangeType::NOOP; }
+    bool need_local_exchange() const { return distribution_type != 
TLocalPartitionType::NOOP; }
     DataDistribution& operator=(const DataDistribution& other) = default;
-    ExchangeType distribution_type;
+    TLocalPartitionType::type distribution_type;
     std::vector<TExpr> partition_exprs;
 };
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 996daf90149..882aada9047 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -155,13 +155,15 @@ public:
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_partition_exprs.empty()) {
             return _needs_finalize
-                           ? DataDistribution(ExchangeType::NOOP)
+                           ? DataDistribution(TLocalPartitionType::NOOP)
                            : 
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(
                                      state);
         }
         return _is_colocate && _require_bucket_distribution
-                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_partition_exprs)
-                       : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+                       ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                          _partition_exprs)
+                       : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                          _partition_exprs);
     }
     bool is_colocated_operator() const override { return _is_colocate; }
     bool is_shuffled_operator() const override {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index c4168a33c4a..526857a5427 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -215,11 +215,13 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_partition_by_eq_expr_ctxs.empty()) {
-            return {ExchangeType::PASSTHROUGH};
+            return {TLocalPartitionType::PASSTHROUGH};
         } else {
             return _is_colocate && _require_bucket_distribution
-                           ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                           : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+                           ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                              _partition_exprs)
+                           : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                              _partition_exprs);
         }
     }
 
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h 
b/be/src/pipeline/exec/assert_num_rows_operator.h
index c9a56c58004..62f8e4da105 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -48,7 +48,7 @@ public:
     [[nodiscard]] bool is_source() const override { return false; }
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return {ExchangeType::PASSTHROUGH};
+        return {TLocalPartitionType::PASSTHROUGH};
     }
 
 private:
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d4f7a08136b..d6e3af66aa2 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -118,15 +118,17 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_needs_finalize && _probe_expr_ctxs.empty()) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && 
!_is_streaming_preagg)) {
             return _is_colocate && _require_bucket_distribution
-                           ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                           : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+                           ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                              _partition_exprs)
+                           : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                              _partition_exprs);
         }
         if (state->enable_distinct_streaming_agg_force_passthrough()) {
-            return {ExchangeType::PASSTHROUGH};
+            return {TLocalPartitionType::PASSTHROUGH};
         } else {
             return 
StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(
                     state);
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index b11e5bef6a0..42ebccbdc47 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -114,13 +114,13 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
         return _partition_type == TPartitionType::HASH_PARTITIONED
-                       ? DataDistribution(ExchangeType::HASH_SHUFFLE)
+                       ? 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE)
                : _partition_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
-                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
-                       : DataDistribution(ExchangeType::NOOP);
+                       ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE)
+                       : DataDistribution(TLocalPartitionType::NOOP);
     }
 
 private:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index f0b6f3c80dc..3505833a7be 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -128,15 +128,17 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         } else if (_is_broadcast_join) {
-            return _child->is_serial_operator() ? 
DataDistribution(ExchangeType::PASS_TO_ONE)
-                                                : 
DataDistribution(ExchangeType::NOOP);
+            return _child->is_serial_operator() ? 
DataDistribution(TLocalPartitionType::PASS_TO_ONE)
+                                                : 
DataDistribution(TLocalPartitionType::NOOP);
         }
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                                _join_distribution == 
TJoinDistributionType::COLOCATE
-                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_partition_exprs)
-                       : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+                       ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                          _partition_exprs)
+                       : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                          _partition_exprs);
     }
 
     bool is_shuffled_operator() const override {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index ceabc6cb8b6..e25afa1d52e 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -132,21 +132,23 @@ public:
     bool need_more_input_data(RuntimeState* state) const override;
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         } else if (_is_broadcast_join) {
             if (state->enable_broadcast_join_force_passthrough()) {
-                return DataDistribution(ExchangeType::PASSTHROUGH);
+                return DataDistribution(TLocalPartitionType::PASSTHROUGH);
             } else {
                 return _child && _child->is_serial_operator()
-                               ? DataDistribution(ExchangeType::PASSTHROUGH)
-                               : DataDistribution(ExchangeType::NOOP);
+                               ? 
DataDistribution(TLocalPartitionType::PASSTHROUGH)
+                               : DataDistribution(TLocalPartitionType::NOOP);
             }
         }
 
         return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                                 _join_distribution == 
TJoinDistributionType::COLOCATE
-                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_partition_exprs)
-                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs));
+                        ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                           _partition_exprs)
+                        : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                           _partition_exprs));
     }
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 003bd749e9e..5148356d59f 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -69,10 +69,10 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
-        return _child->is_serial_operator() ? 
DataDistribution(ExchangeType::BROADCAST)
-                                            : 
DataDistribution(ExchangeType::NOOP);
+        return _child->is_serial_operator() ? 
DataDistribution(TLocalPartitionType::BROADCAST)
+                                            : 
DataDistribution(TLocalPartitionType::NOOP);
     }
 
 private:
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index f67d65e236d..209c7635b8d 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -243,9 +243,9 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
             _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::RIGHT_ANTI_JOIN ||
             _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
-        return {ExchangeType::ADAPTIVE_PASSTHROUGH};
+        return {TLocalPartitionType::ADAPTIVE_PASSTHROUGH};
     }
 
     const RowDescriptor& row_desc() const override {
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 60193253071..4f60a6e6ffb 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -144,8 +144,8 @@ Status 
PipelineXSinkLocalState<SharedStateArg>::terminate(RuntimeState* state) {
 
 DataDistribution OperatorBase::required_data_distribution(RuntimeState* 
/*state*/) const {
     return _child && _child->is_serial_operator() && !is_source()
-                   ? DataDistribution(ExchangeType::PASSTHROUGH)
-                   : DataDistribution(ExchangeType::NOOP);
+                   ? DataDistribution(TLocalPartitionType::PASSTHROUGH)
+                   : DataDistribution(TLocalPartitionType::NOOP);
 }
 
 const RowDescriptor& OperatorBase::row_desc() const {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 20d363ce5f9..cb730e0e084 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -653,8 +653,8 @@ public:
     virtual bool reset_to_rerun(RuntimeState* state, OperatorXBase* root) 
const { return false; }
 
     Status init(const TDataSink& tsink) override;
-    [[nodiscard]] virtual Status init(RuntimeState* state, ExchangeType type, 
const int num_buckets,
-                                      const bool use_global_hash_shuffle,
+    [[nodiscard]] virtual Status init(RuntimeState* state, 
TLocalPartitionType::type type,
+                                      const int num_buckets,
                                       const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
@@ -913,7 +913,7 @@ public:
     Status init(const TDataSink& tsink) override {
         throw Exception(Status::FatalError("should not reach here!"));
     }
-    virtual Status init(ExchangeType type) {
+    virtual Status init(TLocalPartitionType::type type) {
         throw Exception(Status::FatalError("should not reach here!"));
     }
     [[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs() {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index da5cf9db69e..032dcda9a97 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -95,9 +95,9 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE, 
_distribute_exprs);
+            return 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, 
_distribute_exprs);
         }
-        return {ExchangeType::PASSTHROUGH};
+        return {TLocalPartitionType::PASSTHROUGH};
     }
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 4faa327a93e..998ab974c90 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -490,8 +490,6 @@ 
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
                                                                      const 
DescriptorTbl& descs,
                                                                      uint32_t 
partition_count)
         : JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>(pool, tnode, 
operator_id, descs),
-          _join_distribution(tnode.hash_join_node.__isset.dist_type ? 
tnode.hash_join_node.dist_type
-                                                                    : 
TJoinDistributionType::NONE),
           _distribution_partition_exprs(tnode.__isset.distribute_expr_lists
                                                 ? 
tnode.distribute_expr_lists[0]
                                                 : std::vector<TExpr> {}),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 167dc1bd00c..2df65279d99 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -140,16 +140,8 @@ public:
                 bool* eos) const override;
 
     bool need_more_input_data(RuntimeState* state) const override;
-    DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
-        }
-        return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
-                                _join_distribution == 
TJoinDistributionType::COLOCATE
-                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
-                                           _distribution_partition_exprs)
-                        : DataDistribution(ExchangeType::HASH_SHUFFLE,
-                                           _distribution_partition_exprs));
+    DataDistribution required_data_distribution(RuntimeState* state) const 
override {
+        return _inner_probe_operator->required_data_distribution(state);
     }
 
     size_t revocable_mem_size(RuntimeState* state) const override;
@@ -189,8 +181,6 @@ private:
 
     bool _should_revoke_memory(RuntimeState* state) const;
 
-    const TJoinDistributionType::type _join_distribution;
-
     std::shared_ptr<HashJoinBuildSinkOperatorX> _inner_sink_operator;
     std::shared_ptr<HashJoinProbeOperatorX> _inner_probe_operator;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 59eed7aac66..8d9fad28cef 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -127,14 +127,14 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
 
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                                _join_distribution == 
TJoinDistributionType::COLOCATE
-                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
+                       ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
                                           _distribution_partition_exprs)
-                       : DataDistribution(ExchangeType::HASH_SHUFFLE,
+                       : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
                                           _distribution_partition_exprs);
     }
 
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h 
b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
index e4e4926f53b..ca6910e7b71 100644
--- a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
@@ -68,7 +68,7 @@ public:
     bool is_serial_operator() const override { return true; }
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return {ExchangeType::NOOP};
+        return {TLocalPartitionType::NOOP};
     }
 
     Status terminate(RuntimeState* state) override {
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h 
b/be/src/pipeline/exec/rec_cte_sink_operator.h
index e4d6022758c..4ddaead2d0a 100644
--- a/be/src/pipeline/exec/rec_cte_sink_operator.h
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.h
@@ -79,7 +79,7 @@ public:
     bool is_serial_operator() const override { return true; }
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return {ExchangeType::NOOP};
+        return {TLocalPartitionType::NOOP};
     }
 
     Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) 
override {
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h 
b/be/src/pipeline/exec/rec_cte_source_operator.h
index 0bd58106146..985b2b684f5 100644
--- a/be/src/pipeline/exec/rec_cte_source_operator.h
+++ b/be/src/pipeline/exec/rec_cte_source_operator.h
@@ -97,7 +97,7 @@ public:
     bool is_serial_operator() const override { return true; }
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return {ExchangeType::NOOP};
+        return {TLocalPartitionType::NOOP};
     }
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override {
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 8e6fcf98a3a..b6bcd281c99 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -370,9 +370,9 @@ public:
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (OperatorX<LocalStateType>::is_serial_operator()) {
             // `is_serial_operator()` returns true means we ignore the distribution.
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
-        return {ExchangeType::BUCKET_HASH_SHUFFLE};
+        return {TLocalPartitionType::BUCKET_HASH_SHUFFLE};
     }
 
     void set_low_memory_mode(RuntimeState* state) override {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 141459fac5d..af439b7c7ec 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -108,8 +108,10 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+        return _is_colocate ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                               _partition_exprs)
+                            : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                               _partition_exprs);
     }
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override { 
return nullptr; }
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 6eb18db6bfc..140312f760f 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -114,8 +114,8 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+        return _is_colocate ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                            : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, 
_partition_exprs);
     }
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
diff --git a/be/src/pipeline/exec/set_source_operator.h 
b/be/src/pipeline/exec/set_source_operator.h
index db56da26a24..88bee875165 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -82,8 +82,8 @@ public:
     bool is_shuffled_operator() const override { return true; }
     bool is_colocated_operator() const override { return _is_colocate; }
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE);
+        return _is_colocate ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE)
+                            : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
     }
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index b7aaa99b0fb..8af1c60a05a 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -80,13 +80,15 @@ public:
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_is_analytic_sort) {
             return _is_colocate && _require_bucket_distribution
-                           ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                           : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+                           ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE,
+                                              _partition_exprs)
+                           : 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                              _partition_exprs);
         } else if (_merge_by_exchange) {
             // The current sort node is used for the ORDER BY
-            return {ExchangeType::PASSTHROUGH};
+            return {TLocalPartitionType::PASSTHROUGH};
         } else {
-            return {ExchangeType::NOOP};
+            return {TLocalPartitionType::NOOP};
         }
     }
     bool is_colocated_operator() const override { return _is_colocate; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index c90d1ea8a5b..c051840d822 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -223,7 +223,7 @@ public:
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_child && _child->is_hash_join_probe() &&
             state->enable_streaming_agg_hash_join_force_passthrough()) {
-            return DataDistribution(ExchangeType::PASSTHROUGH);
+            return DataDistribution(TLocalPartitionType::PASSTHROUGH);
         }
         if (!state->get_query_ctx()->should_be_shuffled_agg(
                     StatefulOperatorX<StreamingAggLocalState>::node_id())) {
@@ -231,11 +231,12 @@ public:
         }
         if (_partition_exprs.empty()) {
             return _needs_finalize
-                           ? DataDistribution(ExchangeType::NOOP)
+                           ? DataDistribution(TLocalPartitionType::NOOP)
                            : 
StatefulOperatorX<StreamingAggLocalState>::required_data_distribution(
                                      state);
         }
-        return DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
+        return 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
+                                _partition_exprs);
     }
 
 private:
diff --git a/be/src/pipeline/exec/table_function_operator.h 
b/be/src/pipeline/exec/table_function_operator.h
index 56f9f116ec8..28aaa7d7e20 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -105,7 +105,7 @@ public:
     }
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        return {ExchangeType::PASSTHROUGH};
+        return {TLocalPartitionType::PASSTHROUGH};
     }
 
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override {
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 2b32268ffca..16b8a8b4fa2 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -119,10 +119,10 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_require_bucket_distribution) {
-            return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_distribute_exprs);
+            return DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, 
_distribute_exprs);
         }
         if (_followed_by_shuffled_operator) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE, 
_distribute_exprs);
+            return 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, 
_distribute_exprs);
         }
         return Base::required_data_distribution(state);
     }
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 81040b9d417..aa93abfac7c 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -105,10 +105,10 @@ public:
 
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_require_bucket_distribution) {
-            return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE);
+            return DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE);
         }
         if (_followed_by_shuffled_operator) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE);
+            return 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
         }
         return Base::required_data_distribution(state);
     }
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 1937b111975..d28b4c8aa31 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -37,24 +37,31 @@ std::vector<Dependency*> 
LocalExchangeSinkLocalState::dependencies() const {
     return deps;
 }
 
-Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
-                                        const int num_buckets, const bool 
use_global_hash_shuffle,
+Status LocalExchangeSinkOperatorX::init(RuntimeState* state, 
TLocalPartitionType::type type,
+                                        const int num_buckets,
                                         const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
+    DCHECK(!_planned_by_fe);
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + 
")";
     _type = type;
-    if (_type == ExchangeType::HASH_SHUFFLE) {
-        _shuffle_idx_to_instance_idx.clear();
-        _use_global_shuffle = use_global_hash_shuffle;
+    if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
         // For shuffle join, if data distribution has been broken by previous 
operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To 
be mentioned,
         // we should use map shuffle idx to instance idx because all instances 
will be
         // distributed to all BEs. Otherwise, we should use shuffle idx 
directly.
-        if (use_global_hash_shuffle) {
-            _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+        _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+        if (state->query_options().__isset.enable_new_shuffle_hash_method &&
+            state->query_options().enable_new_shuffle_hash_method) {
+            _partitioner = 
std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions);
         } else {
-            for (int i = 0; i < _num_partitions; i++) {
-                _shuffle_idx_to_instance_idx[i] = i;
-            }
+            _partitioner = std::make_unique<
+                    
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                    _num_partitions);
+        }
+        RETURN_IF_ERROR(_partitioner->init(_texprs));
+    } else if (_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) {
+        _shuffle_idx_to_instance_idx.clear();
+        for (int i = 0; i < _num_partitions; i++) {
+            _shuffle_idx_to_instance_idx[i] = i;
         }
         if (state->query_options().__isset.enable_new_shuffle_hash_method &&
             state->query_options().enable_new_shuffle_hash_method) {
@@ -65,7 +72,7 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, 
ExchangeType type,
                     _num_partitions);
         }
         RETURN_IF_ERROR(_partitioner->init(_texprs));
-    } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
+    } else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
         DCHECK_GT(num_buckets, 0);
         _partitioner =
                 
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
@@ -77,7 +84,9 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, 
ExchangeType type,
 
 Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) {
     
RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state));
-    if (_type == ExchangeType::HASH_SHUFFLE || _type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
+    if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
+        _type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
+        _type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
         RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
         RETURN_IF_ERROR(_partitioner->open(state));
     }
@@ -91,11 +100,6 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     SCOPED_TIMER(_init_timer);
     _compute_hash_value_timer = ADD_TIMER(custom_profile(), 
"ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(custom_profile(), "DistributeDataTime");
-    if (_parent->cast<LocalExchangeSinkOperatorX>()._type == 
ExchangeType::HASH_SHUFFLE) {
-        custom_profile()->add_info_string(
-                "UseGlobalShuffle",
-                
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
-    }
     custom_profile()->add_info_string(
             "PartitionExprsSize",
             
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num));
@@ -110,8 +114,7 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* 
state) {
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
 
-    if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
-        _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
+    if (is_shuffled_exchange(_exchanger->get_type())) {
         auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
         RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
     }
@@ -134,12 +137,11 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* 
state, Status exec_statu
 std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) 
const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
-                   "{}, _use_global_shuffle: {}, _channel_id: {}, 
_num_partitions: {}, "
+                   "{},  _channel_id: {}, _num_partitions: {}, "
                    "_num_senders: {}, _num_sources: {}, "
                    "_running_sink_operators: {}, _running_source_operators: 
{}",
-                   Base::debug_string(indentation_level),
-                   
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
-                   _exchanger->_num_partitions, _exchanger->_num_senders, 
_exchanger->_num_sources,
+                   Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
+                   _exchanger->_num_senders, _exchanger->_num_sources,
                    _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index c4723a9f512..83d330a606f 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -79,6 +79,17 @@ public:
               _texprs(texprs),
               _partitioned_exprs_num(texprs.size()),
               _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
+
+    LocalExchangeSinkOperatorX(int operator_id, int dest_id, const TPlanNode& 
tnode,
+                               int num_partitions,
+                               const std::map<int, int>& 
shuffle_id_to_instance_idx)
+            : Base(operator_id, tnode, dest_id),
+              _type(tnode.local_exchange_node.partition_type),
+              _num_partitions(num_partitions),
+              _texprs(tnode.local_exchange_node.distribute_expr_lists),
+              
_partitioned_exprs_num(tnode.local_exchange_node.distribute_expr_lists.size()),
+              _shuffle_idx_to_instance_idx(shuffle_id_to_instance_idx),
+              _planned_by_fe(true) {}
 #ifdef BE_TEST
     LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs,
                                const std::map<int, int>& 
bucket_seq_to_instance_idx)
@@ -89,16 +100,11 @@ public:
               _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
 #endif
 
-    Status init(const TPlanNode& tnode, RuntimeState* state) override {
-        return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
-    }
-
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init(RuntimeState* state, ExchangeType type, const int num_buckets,
-                const bool use_global_hash_shuffle,
+    Status init(RuntimeState* state, TLocalPartitionType::type type, const int 
num_buckets,
                 const std::map<int, int>& shuffle_idx_to_instance_idx) 
override;
 
     Status prepare(RuntimeState* state) override;
@@ -115,13 +121,13 @@ public:
 private:
     friend class LocalExchangeSinkLocalState;
     friend class ShuffleExchanger;
-    ExchangeType _type;
+    TLocalPartitionType::type _type;
     const int _num_partitions;
     const std::vector<TExpr>& _texprs;
     const size_t _partitioned_exprs_num;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     std::map<int, int> _shuffle_idx_to_instance_idx;
-    bool _use_global_shuffle = false;
+    const bool _planned_by_fe = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 81f34d30c9b..de4062862af 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -31,8 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     DCHECK(_exchanger != nullptr);
     _get_block_failed_counter =
             ADD_COUNTER_WITH_LEVEL(custom_profile(), "GetBlockFailedTime", 
TUnit::UNIT, 1);
-    if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
-        _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
+    if (is_shuffled_exchange(_exchanger->get_type())) {
         _copy_data_timer = ADD_TIMER(custom_profile(), "CopyDataTime");
     }
 
@@ -60,7 +59,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
 }
 
 std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
-    if ((_exchanger->get_type() == ExchangeType::PASS_TO_ONE) && _channel_id 
!= 0) {
+    if ((_exchanger->get_type() == TLocalPartitionType::PASS_TO_ONE) && 
_channel_id != 0) {
         // If this is a PASS_TO_ONE exchange and is not the first task, source 
operators always
         // return empty result so no dependencies here.
         return {};
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index c9541e69ab5..35a8e165cf6 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -63,20 +63,44 @@ class LocalExchangeSourceOperatorX final : public 
OperatorX<LocalExchangeSourceL
 public:
     using Base = OperatorX<LocalExchangeSourceLocalState>;
     LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, 
id) {}
+    LocalExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
+                                 const DescriptorTbl& descs)
+            : Base(pool, tnode, operator_id, descs),
+              _exchange_type(tnode.local_exchange_node.partition_type),
+              _planned_by_fe(true) {}
 #ifdef BE_TEST
     LocalExchangeSourceOperatorX() = default;
 #endif
-    Status init(ExchangeType type) override {
+    Status init(TLocalPartitionType::type type) override {
+        DCHECK(!_planned_by_fe);
         _op_name = "LOCAL_EXCHANGE_OPERATOR(" + get_exchange_type_name(type) + 
")";
         _exchange_type = type;
         return Status::OK();
     }
-    Status prepare(RuntimeState* state) override { return Status::OK(); }
+    Status prepare(RuntimeState* state) override {
+        if (_planned_by_fe) {
+            return Base::prepare(state);
+        }
+        return Status::OK();
+    }
     const RowDescriptor& intermediate_row_desc() const override {
+        if (_planned_by_fe) {
+            return Base::intermediate_row_desc();
+        }
         return _child->intermediate_row_desc();
     }
-    RowDescriptor& row_descriptor() override { return 
_child->row_descriptor(); }
-    const RowDescriptor& row_desc() const override { return 
_child->row_desc(); }
+    RowDescriptor& row_descriptor() override {
+        if (_planned_by_fe) {
+            return Base::row_descriptor();
+        }
+        return _child->row_descriptor();
+    }
+    const RowDescriptor& row_desc() const override {
+        if (_planned_by_fe) {
+            return Base::row_desc();
+        }
+        return _child->row_desc();
+    }
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
@@ -85,7 +109,8 @@ public:
 private:
     friend class LocalExchangeSourceLocalState;
 
-    ExchangeType _exchange_type;
+    TLocalPartitionType::type _exchange_type;
+    const bool _planned_by_fe = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 08fff542f3b..e0effb70fa5 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -146,7 +146,7 @@ public:
                              Profile&& profile, SourceInfo&& source_info) = 0;
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool 
eos,
                         Profile&& profile, SinkInfo& sink_info) = 0;
-    virtual ExchangeType get_type() const = 0;
+    virtual TLocalPartitionType::type get_type() const = 0;
     // Called if a local exchanger source operator are closed. Free the unused 
data block in data_queue.
     virtual void close(SourceInfo&& source_info) = 0;
     // Called if all local exchanger source operators are closed. We free the 
memory in
@@ -231,16 +231,19 @@ using BlockWrapperSPtr = 
std::shared_ptr<ExchangerBase::BlockWrapper>;
 template <typename BlockType>
 class Exchanger : public ExchangerBase {
 public:
-    Exchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : ExchangerBase(running_sink_operators, num_partitions, 
free_block_limit) {
+    Exchanger(int running_sink_operators, int num_partitions, int 
free_block_limit,
+              TLocalPartitionType::type type)
+            : ExchangerBase(running_sink_operators, num_partitions, 
free_block_limit), _type(type) {
         _data_queue.resize(num_partitions);
         _m.resize(num_partitions);
         for (size_t i = 0; i < num_partitions; i++) {
             _m[i] = std::make_unique<std::mutex>();
         }
     }
-    Exchanger(int running_sink_operators, int num_sources, int num_partitions, 
int free_block_limit)
-            : ExchangerBase(running_sink_operators, num_sources, 
num_partitions, free_block_limit) {
+    Exchanger(int running_sink_operators, int num_sources, int num_partitions, 
int free_block_limit,
+              TLocalPartitionType::type type)
+            : ExchangerBase(running_sink_operators, num_sources, 
num_partitions, free_block_limit),
+              _type(type) {
         _data_queue.resize(num_sources);
         _m.resize(num_sources);
         for (size_t i = 0; i < num_sources; i++) {
@@ -248,6 +251,7 @@ public:
         }
     }
     ~Exchanger() override = default;
+    TLocalPartitionType::type get_type() const override { return _type; }
     std::string data_queue_debug_string(int i) override {
         return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
                            _data_queue[i].data_queue.size_approx(), 
_data_queue[i].eos);
@@ -264,6 +268,7 @@ protected:
     bool _dequeue_data(BlockType& block, bool* eos, vectorized::Block* 
data_block, int channel_id);
     std::vector<BlockQueue<BlockType>> _data_queue;
     std::vector<std::unique_ptr<std::mutex>> _m;
+    const TLocalPartitionType::type _type;
 };
 
 class LocalExchangeSourceLocalState;
@@ -273,9 +278,9 @@ class ShuffleExchanger : public Exchanger<PartitionedBlock> 
{
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
     ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
-                     int free_block_limit)
+                     int free_block_limit, TLocalPartitionType::type type)
             : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
-                                          free_block_limit) {
+                                          free_block_limit, type) {
         DCHECK_GT(num_partitions, 0);
         DCHECK_GT(num_sources, 0);
         _partition_rows_histogram.resize(running_sink_operators);
@@ -287,7 +292,6 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, 
Profile&& profile,
                      SourceInfo&& source_info) override;
     void close(SourceInfo&& source_info) override;
-    ExchangeType get_type() const override { return 
ExchangeType::HASH_SHUFFLE; }
 
 protected:
     Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& 
channel_ids,
@@ -304,24 +308,22 @@ class BucketShuffleExchanger final : public 
ShuffleExchanger {
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
                            int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions,
-                               free_block_limit) {}
+                               free_block_limit, 
TLocalPartitionType::BUCKET_HASH_SHUFFLE) {}
     ~BucketShuffleExchanger() override = default;
-    ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
 
 class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
     PassthroughExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {}
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions, free_block_limit,
+                                          TLocalPartitionType::PASSTHROUGH) {}
     ~PassthroughExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, 
Profile&& profile,
                 SinkInfo& sink_info) override;
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, 
Profile&& profile,
                      SourceInfo&& source_info) override;
-    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; 
}
     void close(SourceInfo&& source_info) override;
 };
 
@@ -329,29 +331,28 @@ class PassToOneExchanger final : public 
Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(PassToOneExchanger);
     PassToOneExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {}
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions, free_block_limit,
+                                          TLocalPartitionType::PASS_TO_ONE) {}
     ~PassToOneExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, 
Profile&& profile,
                 SinkInfo& sink_info) override;
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, 
Profile&& profile,
                      SourceInfo&& source_info) override;
-    ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; 
}
     void close(SourceInfo&& source_info) override;
 };
 class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
     BroadcastExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<BroadcastBlock>(running_sink_operators, 
num_partitions, free_block_limit) {}
+            : Exchanger<BroadcastBlock>(running_sink_operators, 
num_partitions, free_block_limit,
+                                        TLocalPartitionType::BROADCAST) {}
     ~BroadcastExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, 
Profile&& profile,
                 SinkInfo& sink_info) override;
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, 
Profile&& profile,
                      SourceInfo&& source_info) override;
-    ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
     void close(SourceInfo&& source_info) override;
 };
 
@@ -362,8 +363,8 @@ public:
     ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
     AdaptivePassthroughExchanger(int running_sink_operators, int 
num_partitions,
                                  int free_block_limit)
-            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions, free_block_limit,
+                                          
TLocalPartitionType::ADAPTIVE_PASSTHROUGH) {
         _partition_rows_histogram.resize(running_sink_operators);
     }
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, 
Profile&& profile,
@@ -371,7 +372,6 @@ public:
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, 
Profile&& profile,
                      SourceInfo&& source_info) override;
-    ExchangeType get_type() const override { return 
ExchangeType::ADAPTIVE_PASSTHROUGH; }
 
     void close(SourceInfo&& source_info) override;
 
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 774561bbe37..33051148f34 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -54,8 +54,7 @@ bool Pipeline::need_to_local_exchange(const DataDistribution 
target_data_distrib
         return true;
     }
 
-    if (target_data_distribution.distribution_type != 
ExchangeType::BUCKET_HASH_SHUFFLE &&
-        target_data_distribution.distribution_type != 
ExchangeType::HASH_SHUFFLE) {
+    if (!is_shuffled_exchange(target_data_distribution.distribution_type)) {
         // Always do local exchange if non-hash-partition exchanger is 
required.
         // For example, `PASSTHROUGH` exchanger is always required to 
distribute data evenly.
         return true;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 2a20a5cd73d..60d3f15afd9 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -69,16 +69,15 @@ public:
 
     [[nodiscard]] PipelineId id() const { return _pipeline_id; }
 
-    static bool is_hash_exchange(ExchangeType idx) {
-        return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE;
+    static bool is_hash_exchange(TLocalPartitionType::type idx) {
+        return is_shuffled_exchange(idx);
     }
 
     // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
     // data is processed and shuffled on the sink.
     // Compared to PASSTHROUGH, this is a relatively heavy operation.
-    static bool heavy_operations_on_the_sink(ExchangeType idx) {
-        return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE ||
-               idx == ExchangeType::ADAPTIVE_PASSTHROUGH;
+    static bool heavy_operations_on_the_sink(TLocalPartitionType::type idx) {
+        return is_shuffled_exchange(idx) || idx == 
TLocalPartitionType::ADAPTIVE_PASSTHROUGH;
     }
 
     bool need_to_local_exchange(const DataDistribution 
target_data_distribution,
@@ -166,7 +165,7 @@ private:
 
     // Input data distribution of this pipeline. We do local exchange when 
input data distribution
     // does not match the target data distribution.
-    DataDistribution _data_distribution {ExchangeType::NOOP};
+    DataDistribution _data_distribution {TLocalPartitionType::NOOP};
 
     // How many tasks should be created ?
     int _num_tasks = 1;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 79ef2dbd9b1..70364fc3ae4 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -276,7 +276,7 @@ Status 
PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr
         }
     }
     // 4. Build local exchanger
-    if (_runtime_state->enable_local_shuffle()) {
+    if (_runtime_state->plan_local_shuffle()) {
         SCOPED_TIMER(_plan_local_exchanger_timer);
         RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
                                              
_params.bucket_seq_to_instance_idx,
@@ -774,28 +774,35 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
             sink_id, local_exchange_id, use_global_hash_shuffle ? 
_total_instances : _num_instances,
             data_distribution.partition_exprs, bucket_seq_to_instance_idx);
     if (bucket_seq_to_instance_idx.empty() &&
-        data_distribution.distribution_type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
-        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
+        data_distribution.distribution_type == 
TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
+        data_distribution.distribution_type =
+                use_global_hash_shuffle ? 
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
+                                        : 
TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
+    }
+    if (!use_global_hash_shuffle &&
+        data_distribution.distribution_type == 
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
+        data_distribution.distribution_type = 
TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
     }
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), 
data_distribution.distribution_type,
-                                          num_buckets, use_global_hash_shuffle,
-                                          shuffle_idx_to_instance_idx));
+                                          num_buckets, 
shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
     std::shared_ptr<LocalExchangeSharedState> shared_state =
             LocalExchangeSharedState::create_shared(_num_instances);
     switch (data_distribution.distribution_type) {
-    case ExchangeType::HASH_SHUFFLE:
+    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances,
                 use_global_hash_shuffle ? _total_instances : _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? cast_set<int>(
                                   
_runtime_state->query_options().local_exchange_free_blocks_limit)
-                        : 0);
+                        : 0,
+                data_distribution.distribution_type);
         break;
-    case ExchangeType::BUCKET_HASH_SHUFFLE:
+    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances, num_buckets,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
@@ -803,7 +810,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                                   
_runtime_state->query_options().local_exchange_free_blocks_limit)
                         : 0);
         break;
-    case ExchangeType::PASSTHROUGH:
+    case TLocalPartitionType::PASSTHROUGH:
         shared_state->exchanger = PassthroughExchanger::create_unique(
                 cur_pipe->num_tasks(), _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
@@ -811,7 +818,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                                   
_runtime_state->query_options().local_exchange_free_blocks_limit)
                         : 0);
         break;
-    case ExchangeType::BROADCAST:
+    case TLocalPartitionType::BROADCAST:
         shared_state->exchanger = BroadcastExchanger::create_unique(
                 cur_pipe->num_tasks(), _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
@@ -819,7 +826,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                                   
_runtime_state->query_options().local_exchange_free_blocks_limit)
                         : 0);
         break;
-    case ExchangeType::PASS_TO_ONE:
+    case TLocalPartitionType::PASS_TO_ONE:
         if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
             // If shared hash table is enabled for BJ, hash table will be 
built by only one task
             shared_state->exchanger = PassToOneExchanger::create_unique(
@@ -837,7 +844,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                             : 0);
         }
         break;
-    case ExchangeType::ADAPTIVE_PASSTHROUGH:
+    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
         shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
@@ -948,9 +955,9 @@ Status PipelineFragmentContext::_add_local_exchange(
         
Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
         RETURN_IF_ERROR(_add_local_exchange_impl(
                 cast_set<int>(new_pip->operators().size()), pool, new_pip,
-                add_pipeline(new_pip, pip_idx + 2), 
DataDistribution(ExchangeType::PASSTHROUGH),
-                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
-                shuffle_idx_to_instance_idx));
+                add_pipeline(new_pip, pip_idx + 2),
+                DataDistribution(TLocalPartitionType::PASSTHROUGH), 
do_local_exchange, num_buckets,
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -1698,6 +1705,43 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         break;
     }
+    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
+        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (!_dag.contains(downstream_pipeline_id)) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+        cur_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+        int num_partitions = 0;
+        std::map<int, int> shuffle_id_to_instance_idx;
+        switch (tnode.local_exchange_node.partition_type) {
+        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
+            num_partitions = _params.num_buckets;
+            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
+            break;
+        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+            for (int i = 0; i < _num_instances; i++) {
+                shuffle_id_to_instance_idx[i] = i;
+            }
+            num_partitions = _num_instances;
+            break;
+        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
+            num_partitions = _total_instances;
+            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
+            break;
+        default:
+            break;
+        }
+        sink_ops.push_back(std::make_shared<LocalExchangeSinkOperatorX>(
+                next_sink_operator_id(), op->operator_id(), tnode, 
num_partitions,
+                shuffle_id_to_instance_idx));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
+        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(tnode.node_type));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 126c8f4f617..236bf0ccaaa 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -395,8 +395,11 @@ public:
                
BeExecVersionManager::check_be_exec_version(_query_options.be_exec_version));
         return _query_options.be_exec_version;
     }
-    bool enable_local_shuffle() const {
-        return _query_options.__isset.enable_local_shuffle && 
_query_options.enable_local_shuffle;
+    bool plan_local_shuffle() const {
+        // If local shuffle is enabled and not planned by local shuffle 
planner, we should plan local shuffle in BE.
+        return _query_options.__isset.enable_local_shuffle && 
_query_options.enable_local_shuffle &&
+               (!_query_options.__isset.enable_local_shuffle_planner ||
+               !_query_options.enable_local_shuffle_planner);
     }
 
     MOCK_FUNCTION bool enable_local_exchange() const {
diff --git a/be/test/pipeline/local_exchanger_test.cpp 
b/be/test/pipeline/local_exchanger_test.cpp
index c87712caf5e..d0cada47356 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -89,8 +89,9 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) {
     _local_states.resize(num_sources);
     auto profile = std::make_shared<RuntimeProfile>("");
     auto shared_state = 
LocalExchangeSharedState::create_shared(num_partitions);
-    shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, 
num_sources, num_partitions,
-                                                              
free_block_limit);
+    shared_state->exchanger =
+            ShuffleExchanger::create_unique(num_sink, num_sources, 
num_partitions, free_block_limit,
+                                            
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
@@ -1164,8 +1165,9 @@ TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) {
     _local_states.resize(num_sources);
     auto profile = std::make_shared<RuntimeProfile>("");
     auto shared_state = 
LocalExchangeSharedState::create_shared(num_partitions);
-    shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, 
num_sources, num_partitions,
-                                                              
free_block_limit);
+    shared_state->exchanger =
+            ShuffleExchanger::create_unique(num_sink, num_sources, 
num_partitions, free_block_limit,
+                                            
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
     auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
diff --git a/be/test/pipeline/pipeline_test.cpp 
b/be/test/pipeline/pipeline_test.cpp
index d349c2dad4f..d7521808ce9 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -481,7 +481,7 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
     DescriptorTbl* desc;
     OperatorPtr op;
     _build_fragment_context();
-    EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true);
+    EXPECT_EQ(_runtime_state.front()->plan_local_shuffle(), true);
     auto cur_pipe = _build_pipeline(parallelism);
     {
         auto tnode = TPlanNodeBuilder(_next_node_id(), 
TPlanNodeType::EXCHANGE_NODE)
@@ -556,11 +556,12 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
     }
     {
         cur_pipe->init_data_distribution(_runtime_state.back().get());
-        EXPECT_EQ(cur_pipe->data_distribution().distribution_type, 
ExchangeType::HASH_SHUFFLE);
+        EXPECT_EQ(cur_pipe->data_distribution().distribution_type,
+                  TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
         EXPECT_EQ(cur_pipe->sink()
                           
->required_data_distribution(_runtime_state.back().get())
                           .distribution_type,
-                  ExchangeType::NOOP);
+                  TLocalPartitionType::NOOP);
         EXPECT_EQ(cur_pipe->need_to_local_exchange(
                           
cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()),
                           1),
@@ -569,11 +570,11 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
     {
         cur_pipe->operators().front()->set_serial_operator();
         cur_pipe->init_data_distribution(_runtime_state.back().get());
-        EXPECT_EQ(cur_pipe->data_distribution().distribution_type, 
ExchangeType::NOOP);
+        EXPECT_EQ(cur_pipe->data_distribution().distribution_type, 
TLocalPartitionType::NOOP);
         EXPECT_EQ(cur_pipe->sink()
                           
->required_data_distribution(_runtime_state.back().get())
                           .distribution_type,
-                  ExchangeType::PASSTHROUGH);
+                  TLocalPartitionType::PASSTHROUGH);
         EXPECT_EQ(cur_pipe->need_to_local_exchange(
                           
cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()),
                           1),
@@ -592,7 +593,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
     // Build pipeline
     DescriptorTbl* desc;
     _build_fragment_context();
-    EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true);
+    EXPECT_EQ(_runtime_state.front()->plan_local_shuffle(), true);
     {
         TTupleDescriptor tuple0 = TTupleDescriptorBuilder().set_id(0).build();
         TSlotDescriptor slot0 =
@@ -875,12 +876,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
         if (pip_idx == 1) {
             // Pipeline(ExchangeOperator(id=1, HASH_PARTITIONED) -> 
HashJoinBuildOperator(id=0))
             
EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type,
-                      ExchangeType::HASH_SHUFFLE);
+                      TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
             EXPECT_EQ(_pipelines[pip_idx]
                               ->sink()
                               
->required_data_distribution(_runtime_state.back().get())
                               .distribution_type,
-                      ExchangeType::HASH_SHUFFLE);
+                      TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
             EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange(
                               
_pipelines[pip_idx]->sink()->required_data_distribution(
                                       _runtime_state.back().get()),
@@ -891,7 +892,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
             _pipelines[pip_idx]->set_data_distribution(
                     
_pipelines[pip_idx]->children().front()->data_distribution());
             
EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type,
-                      ExchangeType::HASH_SHUFFLE);
+                      TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
             EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange(
                               
_pipelines[pip_idx]->sink()->required_data_distribution(
                                       _runtime_state.back().get()),
@@ -902,7 +903,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                               .back()
                               
->required_data_distribution(_runtime_state.back().get())
                               .distribution_type,
-                      ExchangeType::HASH_SHUFFLE);
+                      TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE);
             EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange(
                               
_pipelines[pip_idx]->operators().back()->required_data_distribution(
                                       _runtime_state.back().get()),
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..a7fa1b35c45 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,9 @@ struct TQueryOptions {
   // Use paimon-cpp to read Paimon splits on BE
   201: optional bool enable_paimon_cpp_reader = false;
 
+  // enable planning local exchange nodes in FE
+  202: optional bool enable_local_shuffle_planner;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 86a2d9be555..c28a095277a 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -52,6 +52,40 @@ enum TPartitionType {
   HIVE_TABLE_SINK_UNPARTITIONED = 8
 }
 
+enum TLocalPartitionType {
+  NOOP = 0,
+  // used to restore the global hash distribution after another distribution (such as PASSTHROUGH) has broken it,
+  // so that the downstream JoinNode can shuffle data by the same hash distribution.
+  //
+  // for example:                                   look here, need resume to 
GLOBAL_EXECUTION_HASH_SHUFFLE
+  //                                                                           
 ↓
+  //   Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode →  
LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode
+  //                  ExchangeNode(BROADCAST) ↗                                
                                  ↑
+  //                                                                         
ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE)
+  GLOBAL_EXECUTION_HASH_SHUFFLE = 1,
+  // used to rebalance data and increase parallelism
+  //
+  // for example:          look here, need use LOCAL_EXECUTION_HASH_SHUFFLE to 
rebalance data
+  //                                         ↓
+  //  Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, 
name)) → AggregationNode(group by(id,name))
+  //
+  // LOCAL_EXECUTION_HASH_SHUFFLE is necessary here because the hash distribution of the scan node is based on id,
+  // but the hash distribution of the aggregation node is based on id and name, so we need to rebalance data by both
+  // id and name to make sure rows with the same id and name are sent to the same instance of the aggregation node.
+  // We can not use GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) here, because
+  // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` only maps the part of the global instance indexes that
+  // belongs to this backend to local instance indexes and drops the other backends' instance indexes; rows hashed to
+  // an instance of another backend would therefore be discarded, which causes data loss.
+  LOCAL_EXECUTION_HASH_SHUFFLE = 2,
+  BUCKET_HASH_SHUFFLE = 3,
+  // round-robin partition, used to rebalance data and increase parallelism
+  PASSTHROUGH = 4,
+  ADAPTIVE_PASSTHROUGH = 5,
+  BROADCAST = 6,
+  PASS_TO_ONE = 7,
+  LOCAL_MERGE_SORT = 8
+}
+
 enum TDistributionType {
   UNPARTITIONED = 0,
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7b281dcf712..7572c593c07 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -62,7 +62,8 @@ enum TPlanNodeType {
   GROUP_COMMIT_SCAN_NODE = 33,
   MATERIALIZATION_NODE = 34,
   REC_CTE_NODE = 35,
-  REC_CTE_SCAN_NODE = 36
+  REC_CTE_SCAN_NODE = 36,
+  LOCAL_EXCHANGE_NODE = 37
 }
 
 struct TKeyRange {
@@ -1294,6 +1295,24 @@ struct TExchangeNode {
   4: optional Partitions.TPartitionType partition_type
 }
 
+struct TLocalExchangeNode {
+  1: required Partitions.TLocalPartitionType partition_type
+  // when partition_type in (GLOBAL_EXECUTION_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE, BUCKET_HASH_SHUFFLE),
+  // distribute_expr_lists is set, and the legacy `TPlanNode.distribute_expr_lists` is deprecated
+  //
+  // the hash computation:
+  // 1. for BUCKET_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value, mod it by
+  //    `TPipelineFragmentParams.num_buckets`, and map the bucket index to a local instance id by
+  //    `TPipelineFragmentParams.bucket_seq_to_instance_idx`
+  // 2. for LOCAL_EXECUTION_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value, mod it by
+  //    `TPipelineFragmentParams.local_params.size`, and the backend maps each partition index to the local
+  //    instance with the same index (`i -> i`), for example: 1 -> 1, 2 -> 2, ...
+  // 3. for GLOBAL_EXECUTION_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value, mod it by
+  //    `TPipelineFragmentParams.total_instances`, and map the global instance index to a local instance by
+  //    `TPipelineFragmentParams.shuffle_idx_to_instance_idx`
+  2: optional list<Exprs.TExpr> distribute_expr_lists
+}
+
 struct TOlapRewriteNode {
     1: required list<Exprs.TExpr> columns
     2: required list<Types.TColumnType> column_types
@@ -1510,6 +1529,7 @@ struct TPlanNode {
   50: optional list<list<Exprs.TExpr>> distribute_expr_lists
   51: optional bool is_serial_operator
   52: optional TRecCTEScanNode rec_cte_scan_node
+  53: optional TLocalExchangeNode local_exchange_node
 
   // projections is final projections, which means projecting into results and 
materializing them into the output block.
   101: optional list<Exprs.TExpr> projections


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to