This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c00dca70e60 [pipelineX](local shuffle) Support parallel execution despite of tablet number (#28266)
c00dca70e60 is described below

commit c00dca70e605727d10134a7d10acb4e71ae0d176
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Dec 14 12:53:54 2023 +0800

    [pipelineX](local shuffle) Support parallel execution despite of tablet number (#28266)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |   5 +-
 be/src/pipeline/exec/es_scan_operator.cpp          |   4 +-
 be/src/pipeline/exec/es_scan_operator.h            |   2 +-
 be/src/pipeline/exec/exchange_source_operator.h    |   4 +-
 be/src/pipeline/exec/file_scan_operator.h          |   4 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  14 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |   5 +-
 be/src/pipeline/exec/jdbc_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/jdbc_scan_operator.h          |   2 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   2 +-
 be/src/pipeline/exec/scan_operator.cpp             |   8 +-
 be/src/pipeline/exec/scan_operator.h               |   8 +-
 be/src/pipeline/pipeline.h                         |  13 ++
 be/src/pipeline/pipeline_x/dependency.h            |  23 ++-
 .../local_exchange/local_exchange_sink_operator.h  |   2 +
 .../local_exchange_source_operator.h               |   2 +
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 107 ++++++++--
 .../pipeline_x/local_exchange/local_exchanger.h    |  59 +++++-
 be/src/pipeline/pipeline_x/operator.h              |  21 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  73 +++++--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |   8 +-
 be/src/runtime/runtime_filter_mgr.cpp              |   2 +-
 be/src/runtime/runtime_filter_mgr.h                |   3 +-
 be/src/vec/exec/runtime_filter_consumer.cpp        |   6 +-
 be/src/vec/exec/runtime_filter_consumer.h          |   4 +-
 .../vec/runtime/shared_hash_table_controller.cpp   |   3 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |   4 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   8 +
 .../org/apache/doris/planner/OlapScanNode.java     |   5 +
 .../org/apache/doris/planner/PlanFragment.java     |  20 ++
 .../java/org/apache/doris/planner/ScanNode.java    |  16 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 228 ++++++++++-----------
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 ++
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 35 files changed, 481 insertions(+), 206 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index cd85390dce0..639687ec74d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -369,7 +369,10 @@ public:
     std::vector<TExpr> get_local_shuffle_exprs() const override { return 
_partition_exprs; }
     ExchangeType get_local_exchange_type() const override {
         if (_probe_expr_ctxs.empty()) {
-            return _needs_finalize ? ExchangeType::PASSTHROUGH : 
ExchangeType::NOOP;
+            return _needs_finalize || 
DataSinkOperatorX<LocalStateType>::_child_x
+                                              ->ignore_data_distribution()
+                           ? ExchangeType::PASSTHROUGH
+                           : ExchangeType::NOOP;
         }
         return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : 
ExchangeType::HASH_SHUFFLE;
     }
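
The hunk above changes how a non-grouping aggregation sink picks its local exchange type: it now also requests PASSTHROUGH when its child ignores the data distribution, so the extra tasks created for a shared scan still receive data. A rough standalone illustration of that decision (simplified stand-in types, not the actual Doris classes) is:

// Minimal sketch of the exchange-type choice made by the patched aggregation
// sink. AggSinkInfo is a stand-in for the operator's state in the diff above.
#include <cassert>

enum class ExchangeType { NOOP, HASH_SHUFFLE, PASSTHROUGH, BUCKET_HASH_SHUFFLE, BROADCAST };

struct AggSinkInfo {
    bool has_probe_exprs;            // grouping keys present?
    bool needs_finalize;             // final aggregation phase?
    bool child_ignores_distribution; // upstream ran with arbitrary parallelism
    bool is_colocate;                // colocated / bucketed input
};

ExchangeType choose_exchange(const AggSinkInfo& a) {
    if (!a.has_probe_exprs) {
        // No grouping keys: passthrough only if we must finalize or the child
        // already ignored the storage distribution; otherwise keep data local.
        return (a.needs_finalize || a.child_ignores_distribution)
                       ? ExchangeType::PASSTHROUGH
                       : ExchangeType::NOOP;
    }
    return a.is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
}

int main() {
    assert(choose_exchange({false, false, true, false}) == ExchangeType::PASSTHROUGH);
    assert(choose_exchange({false, false, false, false}) == ExchangeType::NOOP);
    assert(choose_exchange({true, true, false, true}) == ExchangeType::BUCKET_HASH_SHUFFLE);
    return 0;
}
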
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
index 8567db90948..c00ee6917ea 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -116,8 +116,8 @@ void EsScanLocalState::set_scan_ranges(RuntimeState* state,
 }
 
 EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
-                                 const DescriptorTbl& descs)
-        : ScanOperatorX<EsScanLocalState>(pool, tnode, operator_id, descs),
+                                 const DescriptorTbl& descs, int 
parallel_tasks)
+        : ScanOperatorX<EsScanLocalState>(pool, tnode, operator_id, descs, 
parallel_tasks),
           _tuple_id(tnode.es_scan_node.tuple_id),
           _tuple_desc(nullptr) {
     ScanOperatorX<EsScanLocalState>::_output_tuple_id = 
tnode.es_scan_node.tuple_id;
diff --git a/be/src/pipeline/exec/es_scan_operator.h 
b/be/src/pipeline/exec/es_scan_operator.h
index dbdbbe198bb..62d1a043c47 100644
--- a/be/src/pipeline/exec/es_scan_operator.h
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -72,7 +72,7 @@ private:
 class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
 public:
     EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
-                    const DescriptorTbl& descs);
+                    const DescriptorTbl& descs, int parallel_tasks);
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 24927fc947c..49211321886 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -116,7 +116,9 @@ public:
         return _sub_plan_query_statistics_recvr;
     }
 
-    bool need_to_local_shuffle() const override { return !_is_hash_partition; }
+    bool need_to_local_shuffle() const override {
+        return !_is_hash_partition || 
OperatorX<ExchangeLocalState>::ignore_data_distribution();
+    }
 
 private:
     friend class ExchangeLocalState;
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index 6ae3344ed71..4e64bd850ba 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -70,8 +70,8 @@ private:
 class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
 public:
     FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
-                      const DescriptorTbl& descs)
-            : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, 
descs),
+                      const DescriptorTbl& descs, int parallel_tasks)
+            : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, 
descs, parallel_tasks),
               _table_name(tnode.file_scan_node.__isset.table_name ? 
tnode.file_scan_node.table_name
                                                                   : "") {
         _output_tuple_id = tnode.file_scan_node.tuple_id;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index efc75828484..b8ae2ca9666 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -62,9 +62,12 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     if (p._is_broadcast_join) {
         profile()->add_info_string("BroadcastJoin", "true");
         if (state->enable_share_hash_table_for_broadcast_join()) {
-            profile()->add_info_string("ShareHashTableEnabled", "true");
-            _should_build_hash_table = 
p._shared_hashtable_controller->should_build_hash_table(
-                    state->fragment_instance_id(), p.node_id());
+            _should_build_hash_table = info.task_idx == 0;
+            if (_should_build_hash_table) {
+                profile()->add_info_string("ShareHashTableEnabled", "true");
+                CHECK(p._shared_hashtable_controller->should_build_hash_table(
+                        state->fragment_instance_id(), p.node_id()));
+            }
         } else {
             profile()->add_info_string("ShareHashTableEnabled", "false");
         }
@@ -514,10 +517,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
-        auto wait_timer = ADD_TIMER(local_state.profile(), 
"WaitForSharedHashTableTime");
-        SCOPED_TIMER(wait_timer);
-        RETURN_IF_ERROR(
-                _shared_hashtable_controller->wait_for_signal(state, 
_shared_hash_table_context));
+        CHECK(_shared_hash_table_context->signaled);
 
         local_state.profile()->add_info_string(
                 "SharedHashTableFrom",
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index c00d1b8e591..580c2bb8ff9 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -157,8 +157,11 @@ public:
 
     std::vector<TExpr> get_local_shuffle_exprs() const override { return 
_partition_exprs; }
     ExchangeType get_local_exchange_type() const override {
-        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || 
_is_broadcast_join) {
+        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return ExchangeType::NOOP;
+        } else if (_is_broadcast_join) {
+            return _child_x->ignore_data_distribution() ? 
ExchangeType::BROADCAST
+                                                        : ExchangeType::NOOP;
         }
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                                _join_distribution == 
TJoinDistributionType::COLOCATE
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp 
b/be/src/pipeline/exec/jdbc_scan_operator.cpp
index 74890f647fc..f6c22db9283 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -38,8 +38,8 @@ Status 
JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
 }
 
 JDBCScanOperatorX::JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, 
int operator_id,
-                                     const DescriptorTbl& descs)
-        : ScanOperatorX<JDBCScanLocalState>(pool, tnode, operator_id, descs),
+                                     const DescriptorTbl& descs, int 
parallel_tasks)
+        : ScanOperatorX<JDBCScanLocalState>(pool, tnode, operator_id, descs, 
parallel_tasks),
           _table_name(tnode.jdbc_scan_node.table_name),
           _tuple_id(tnode.jdbc_scan_node.tuple_id),
           _query_string(tnode.jdbc_scan_node.query_string),
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h 
b/be/src/pipeline/exec/jdbc_scan_operator.h
index 2acf5b5ec9d..825e01acc2a 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.h
+++ b/be/src/pipeline/exec/jdbc_scan_operator.h
@@ -54,7 +54,7 @@ private:
 class JDBCScanOperatorX final : public ScanOperatorX<JDBCScanLocalState> {
 public:
     JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
-                      const DescriptorTbl& descs);
+                      const DescriptorTbl& descs, int parallel_tasks);
 
 private:
     friend class JDBCScanLocalState;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 1e9c4da1e82..c751c167d0c 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -491,8 +491,8 @@ void OlapScanLocalState::add_filter_info(int id, const 
PredicateFilterInfo& upda
 }
 
 OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, 
int operator_id,
-                                     const DescriptorTbl& descs)
-        : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs),
+                                     const DescriptorTbl& descs, int 
parallel_tasks)
+        : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, 
parallel_tasks),
           _olap_scan_node(tnode.olap_scan_node) {
     _output_tuple_id = tnode.olap_scan_node.tuple_id;
     _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids;
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 868d3efe555..f1db77b2054 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -188,7 +188,7 @@ private:
 class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
 public:
     OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
-                      const DescriptorTbl& descs);
+                      const DescriptorTbl& descs, int parallel_tasks);
 
 private:
     friend class OlapScanLocalState;
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 482d71c6265..d559cccdad9 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -117,13 +117,13 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
+    auto& p = _parent->cast<typename Derived::Parent>();
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
 
     _scan_dependency = 
ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
                                                      
PipelineXLocalState<>::_parent->node_id(),
                                                      state->get_query_ctx());
 
-    auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(state, info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
     for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
@@ -1362,9 +1362,11 @@ void 
ScanLocalState<Derived>::get_cast_types_for_variants() {
 
 template <typename LocalStateType>
 ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
-                                             int operator_id, const 
DescriptorTbl& descs)
+                                             int operator_id, const 
DescriptorTbl& descs,
+                                             int parallel_tasks)
         : OperatorX<LocalStateType>(pool, tnode, operator_id, descs),
-          _runtime_filter_descs(tnode.runtime_filters) {
+          _runtime_filter_descs(tnode.runtime_filters),
+          _parallel_tasks(parallel_tasks) {
     if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
         // Which means the request could be fullfilled in a single segment 
iterator request.
         if (tnode.limit > 0 && tnode.limit < 1024) {
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 18b5cebd6c1..9eda1be1692 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -429,8 +429,9 @@ public:
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
 
     bool need_to_local_shuffle() const override {
-        // If _col_distribute_ids is not empty, we prefer to not do local 
shuffle.
-        return _col_distribute_ids.empty();
+        // 1. `_col_distribute_ids` is empty means storage distribution is not 
effective, so we prefer to do local shuffle.
+        // 2. `ignore_data_distribution()` returns true means we ignore the 
distribution.
+        return _col_distribute_ids.empty() || 
OperatorX<LocalStateType>::ignore_data_distribution();
     }
 
     bool is_bucket_shuffle_scan() const override { return 
!_col_distribute_ids.empty(); }
@@ -443,7 +444,7 @@ public:
 protected:
     using LocalState = LocalStateType;
     ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
-                  const DescriptorTbl& descs);
+                  const DescriptorTbl& descs, int parallel_tasks = 0);
     virtual ~ScanOperatorX() = default;
     template <typename Derived>
     friend class ScanLocalState;
@@ -479,6 +480,7 @@ protected:
 
     // Record the value of the aggregate function 'count' from doris's be
     int64_t _push_down_count = -1;
+    const int _parallel_tasks = 0;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 5d623f899aa..832a3f51e3c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -144,6 +144,19 @@ public:
     void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; }
     int num_tasks() const { return _num_tasks; }
 
+    std::string debug_string() {
+        fmt::memory_buffer debug_string_buffer;
+        fmt::format_to(debug_string_buffer,
+                       "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: 
{}, "
+                       "_need_to_local_shuffle: {}]",
+                       _pipeline_id, _num_tasks, _num_tasks_created, 
_need_to_local_shuffle);
+        for (size_t i = 0; i < operatorXs.size(); i++) {
+            fmt::format_to(debug_string_buffer, "\n{}", 
operatorXs[i]->debug_string(i));
+        }
+        fmt::format_to(debug_string_buffer, "\n{}", 
_sink_x->debug_string(operatorXs.size()));
+        return fmt::to_string(debug_string_buffer);
+    }
+
 private:
     void _init_profile();
 
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index ecf0c8188f4..70698a58d2f 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -577,6 +577,7 @@ enum class ExchangeType : uint8_t {
     HASH_SHUFFLE = 1,
     PASSTHROUGH = 2,
     BUCKET_HASH_SHUFFLE = 3,
+    BROADCAST = 4,
 };
 
 inline std::string get_exchange_type_name(ExchangeType idx) {
@@ -589,6 +590,8 @@ inline std::string get_exchange_type_name(ExchangeType idx) 
{
         return "PASSTHROUGH";
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         return "BUCKET_HASH_SHUFFLE";
+    case ExchangeType::BROADCAST:
+        return "BROADCAST";
     }
     LOG(FATAL) << "__builtin_unreachable";
     __builtin_unreachable();
@@ -623,16 +626,28 @@ public:
         dep->set_ready();
     }
 
-    void add_mem_usage(int channel_id, size_t delta) {
+    void add_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
         mem_trackers[channel_id]->consume(delta);
+        if (update_total_mem_usage) {
+            add_total_mem_usage(delta);
+        }
+    }
+
+    void sub_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
+        mem_trackers[channel_id]->release(delta);
+        if (update_total_mem_usage) {
+            sub_total_mem_usage(delta);
+        }
+    }
+
+    void add_total_mem_usage(size_t delta) {
         if (mem_usage.fetch_add(delta) > 
config::local_exchange_buffer_mem_limit) {
             sink_dependency->block();
         }
     }
 
-    void sub_mem_usage(int channel_id, size_t delta) {
-        mem_trackers[channel_id]->release(delta);
-        if (mem_usage.fetch_sub(delta) < 
config::local_exchange_buffer_mem_limit) {
+    void sub_total_mem_usage(size_t delta) {
+        if (mem_usage.fetch_sub(delta) <= 
config::local_exchange_buffer_mem_limit) {
             sink_dependency->set_ready();
         }
     }
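
The dependency.h hunk above splits memory accounting into per-channel tracker updates and a shared total that gates the sink dependency at local_exchange_buffer_mem_limit, so a shuffled block shared by several channels is counted against the total only once. A compact sketch of that accounting (assumed names and limit; it gates on the post-update total for clarity, whereas the patch compares the pre-update value) is:

// Illustrative sketch of split accounting: per-channel usage can be updated
// without touching the shared total, and the shared total flips a "sink
// blocked" flag when it crosses a buffer limit.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

struct ExchangeBufferState {
    explicit ExchangeBufferState(int channels, size_t limit)
            : per_channel(channels), mem_limit(limit) {}

    void add_mem_usage(int channel, size_t delta, bool update_total = true) {
        per_channel[channel] += delta;
        if (update_total) add_total_mem_usage(delta);
    }
    void sub_mem_usage(int channel, size_t delta, bool update_total = true) {
        per_channel[channel] -= delta;
        if (update_total) sub_total_mem_usage(delta);
    }
    void add_total_mem_usage(size_t delta) {
        if (total.fetch_add(delta) + delta > mem_limit) sink_blocked = true;
    }
    void sub_total_mem_usage(size_t delta) {
        if (total.fetch_sub(delta) - delta <= mem_limit) sink_blocked = false;
    }

    std::vector<std::atomic<size_t>> per_channel;
    std::atomic<size_t> total{0};
    std::atomic<bool> sink_blocked{false};
    const size_t mem_limit;
};

int main() {
    ExchangeBufferState state(2, /*limit=*/100);
    state.add_mem_usage(0, 80);             // under the limit: sink stays ready
    assert(!state.sink_blocked);
    state.add_mem_usage(1, 50);             // total 130 > 100: block the sink
    assert(state.sink_blocked);
    state.sub_mem_usage(1, 50);             // back to 80: unblock
    assert(!state.sink_blocked);
    return 0;
}
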
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 2be71e5847c..01b5f9d0999 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -33,6 +33,7 @@ public:
 class Exchanger;
 class ShuffleExchanger;
 class PassthroughExchanger;
+class BroadcastExchanger;
 class LocalExchangeSinkOperatorX;
 class LocalExchangeSinkLocalState final
         : public PipelineXSinkLocalState<LocalExchangeSinkDependency> {
@@ -53,6 +54,7 @@ private:
     friend class ShuffleExchanger;
     friend class BucketShuffleExchanger;
     friend class PassthroughExchanger;
+    friend class BroadcastExchanger;
 
     Exchanger* _exchanger = nullptr;
 
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 1f87b86ebe2..0db2db29439 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -35,6 +35,7 @@ public:
 class Exchanger;
 class ShuffleExchanger;
 class PassthroughExchanger;
+class BroadcastExchanger;
 class LocalExchangeSourceOperatorX;
 class LocalExchangeSourceLocalState final
         : public PipelineXLocalState<LocalExchangeSourceDependency> {
@@ -50,6 +51,7 @@ private:
     friend class LocalExchangeSourceOperatorX;
     friend class ShuffleExchanger;
     friend class PassthroughExchanger;
+    friend class BroadcastExchanger;
 
     Exchanger* _exchanger = nullptr;
     int _channel_id;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 86e1b3866fe..4e637b111b3 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -49,8 +49,12 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         do {
             const auto* offset_start = &((
                     
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
-            mutable_block->add_rows(partitioned_block.first.get(), 
offset_start,
+            auto block_wrapper = partitioned_block.first;
+            local_state._shared_state->sub_mem_usage(
+                    local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
+            mutable_block->add_rows(&block_wrapper->data_block, offset_start,
                                     offset_start + 
std::get<2>(partitioned_block.second));
+            block_wrapper->unref(local_state._shared_state);
         } while (mutable_block->rows() < state->batch_size() &&
                  
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
         *result_block = mutable_block->to_block();
@@ -58,8 +62,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
     if (_running_sink_operators == 0) {
         if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
             SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block =
-                    
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+            mutable_block = vectorized::MutableBlock::create_unique(
+                    partitioned_block.first->data_block.clone_empty());
             get_data(block);
         } else {
             COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -67,8 +71,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         }
     } else if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block =
-                
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+        mutable_block = vectorized::MutableBlock::create_unique(
+                partitioned_block.first->data_block.clone_empty());
         get_data(block);
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -98,15 +102,45 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             local_state._partition_rows_histogram[channel_ids[i]]--;
         }
     }
-    auto new_block = vectorized::Block::create_shared(block->clone_empty());
-    new_block->swap(*block);
+
+    vectorized::Block data_block;
+    std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
+    if (_free_blocks.try_enqueue(data_block)) {
+        new_block_wrapper = 
ShuffleBlockWrapper::create_shared(std::move(data_block));
+    } else {
+        new_block_wrapper = 
ShuffleBlockWrapper::create_shared(block->clone_empty());
+    }
+
+    new_block_wrapper->data_block.swap(*block);
+    if (new_block_wrapper->data_block.empty()) {
+        return Status::OK();
+    }
+    
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
+    new_block_wrapper->ref(_num_partitions);
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
         for (size_t i = 0; i < _num_partitions; i++) {
             size_t start = local_state._partition_rows_histogram[i];
             size_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
-                data_queue[i].enqueue({new_block, {row_idx, start, size}});
+                local_state._shared_state->add_mem_usage(
+                        i, new_block_wrapper->data_block.allocated_bytes(), 
false);
+                data_queue[i].enqueue({new_block_wrapper, {row_idx, start, 
size}});
+                local_state._shared_state->set_ready_to_read(i);
+            } else {
+                new_block_wrapper->unref(local_state._shared_state);
+            }
+        }
+    } else if (_num_senders != _num_sources) {
+        for (size_t i = 0; i < _num_partitions; i++) {
+            size_t start = local_state._partition_rows_histogram[i];
+            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            if (size > 0) {
+                local_state._shared_state->add_mem_usage(
+                        i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(), false);
+                data_queue[i % _num_sources].enqueue({new_block_wrapper, 
{row_idx, start, size}});
                 local_state._shared_state->set_ready_to_read(i);
+            } else {
+                new_block_wrapper->unref(local_state._shared_state);
             }
         }
     } else {
@@ -116,8 +150,12 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             size_t start = local_state._partition_rows_histogram[i];
             size_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
-                data_queue[map[i]].enqueue({new_block, {row_idx, start, 
size}});
+                local_state._shared_state->add_mem_usage(
+                        map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
+                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}});
                 local_state._shared_state->set_ready_to_read(map[i]);
+            } else {
+                new_block_wrapper->unref(local_state._shared_state);
             }
         }
     }
@@ -128,10 +166,13 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
 Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block,
                                   SourceState source_state,
                                   LocalExchangeSinkLocalState& local_state) {
-    vectorized::Block new_block(in_block->clone_empty());
+    vectorized::Block new_block;
+    if (!_free_blocks.try_dequeue(new_block)) {
+        new_block = {in_block->clone_empty()};
+    }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, new_block.bytes());
+    local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
     _data_queue[channel_id].enqueue(std::move(new_block));
     local_state._shared_state->set_ready_to_read(channel_id);
 
@@ -144,15 +185,53 @@ Status PassthroughExchanger::get_block(RuntimeState* 
state, vectorized::Block* b
     vectorized::Block next_block;
     if (_running_sink_operators == 0) {
         if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-            local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->bytes());
+            block->swap(next_block);
+            _free_blocks.enqueue(std::move(next_block));
+            local_state._shared_state->sub_mem_usage(local_state._channel_id,
+                                                     block->allocated_bytes());
         } else {
             COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             source_state = SourceState::FINISHED;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        block->swap(next_block);
+        _free_blocks.enqueue(std::move(next_block));
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+    } else {
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+        local_state._dependency->block();
+    }
+    return Status::OK();
+}
+
+Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block,
+                                SourceState source_state,
+                                LocalExchangeSinkLocalState& local_state) {
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
+    _data_queue[0].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_to_read(0);
+
+    return Status::OK();
+}
+
+Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block,
+                                     SourceState& source_state,
+                                     LocalExchangeSourceLocalState& 
local_state) {
+    if (local_state._channel_id != 0) {
+        source_state = SourceState::FINISHED;
+        return Status::OK();
+    }
+    vectorized::Block next_block;
+    if (_running_sink_operators == 0) {
+        if (_data_queue[0].try_dequeue(next_block)) {
+            *block = std::move(next_block);
+        } else {
+            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+            source_state = SourceState::FINISHED;
+        }
+    } else if (_data_queue[0].try_dequeue(next_block)) {
         *block = std::move(next_block);
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->bytes());
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
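
The exchanger changes above also make PassthroughExchanger account allocated bytes and recycle drained block shells through a _free_blocks queue, so steady-state exchange avoids reallocating columns. A much-simplified, single-threaded sketch of that round-robin-plus-recycling flow (std containers instead of the lock-free queues, a vector of strings standing in for vectorized::Block) is:

// Simplified sketch: the sink round-robins blocks across channels, and
// drained block shells are parked on a free list for later reuse.
#include <deque>
#include <iostream>
#include <string>
#include <vector>

using Block = std::vector<std::string>;  // stand-in for vectorized::Block

struct PassthroughSketch {
    explicit PassthroughSketch(int channels) : queues(channels) {}

    void sink(Block& in, int& round_robin_id) {
        Block out;
        if (!free_blocks.empty()) {              // reuse a recycled shell
            out = std::move(free_blocks.front());
            free_blocks.pop_front();
        }
        out.swap(in);                            // steal the payload, O(1)
        int channel = round_robin_id++ % static_cast<int>(queues.size());
        queues[channel].push_back(std::move(out));
    }

    bool get_block(int channel, Block& result) {
        if (queues[channel].empty()) return false;
        Block next = std::move(queues[channel].front());
        queues[channel].pop_front();
        result.swap(next);
        next.clear();                            // emptied shell goes back
        free_blocks.push_back(std::move(next));
        return true;
    }

    std::vector<std::deque<Block>> queues;
    std::deque<Block> free_blocks;
};

int main() {
    PassthroughSketch ex(2);
    int rr = 0;
    for (int i = 0; i < 4; ++i) {
        Block b{"row" + std::to_string(i)};
        ex.sink(b, rr);                          // rows land on channels 0,1,0,1
    }
    Block got;
    while (ex.get_block(0, got)) std::cout << got[0] << " read from channel 0\n";
    return 0;
}
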
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 8c28469b50d..a5ce907a137 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -24,11 +24,20 @@ namespace doris::pipeline {
 
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
+struct ShuffleBlockWrapper;
 
 class Exchanger {
 public:
     Exchanger(int running_sink_operators, int num_partitions)
-            : _running_sink_operators(running_sink_operators), 
_num_partitions(num_partitions) {}
+            : _running_sink_operators(running_sink_operators),
+              _num_partitions(num_partitions),
+              _num_senders(running_sink_operators),
+              _num_sources(num_partitions) {}
+    Exchanger(int running_sink_operators, int num_sources, int num_partitions)
+            : _running_sink_operators(running_sink_operators),
+              _num_partitions(num_partitions),
+              _num_senders(running_sink_operators),
+              _num_sources(num_sources) {}
     virtual ~Exchanger() = default;
     virtual Status get_block(RuntimeState* state, vectorized::Block* block,
                              SourceState& source_state,
@@ -40,16 +49,35 @@ public:
 protected:
     friend struct LocalExchangeSourceDependency;
     friend struct LocalExchangeSharedState;
+    friend struct ShuffleBlockWrapper;
     std::atomic<int> _running_sink_operators = 0;
     const int _num_partitions;
+    const int _num_senders;
+    const int _num_sources;
+    moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
 };
 
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 
+struct ShuffleBlockWrapper {
+    ENABLE_FACTORY_CREATOR(ShuffleBlockWrapper);
+    ShuffleBlockWrapper(vectorized::Block&& data_block_) : 
data_block(std::move(data_block_)) {}
+    void ref(int delta) { ref_count += delta; }
+    void unref(LocalExchangeSharedState* shared_state) {
+        if (ref_count.fetch_sub(1) == 1) {
+            shared_state->sub_total_mem_usage(data_block.allocated_bytes());
+            data_block.clear_column_data();
+            
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+        }
+    }
+    std::atomic<int> ref_count = 0;
+    vectorized::Block data_block;
+};
+
 class ShuffleExchanger : public Exchanger {
     using PartitionedBlock =
-            std::pair<std::shared_ptr<vectorized::Block>,
+            std::pair<std::shared_ptr<ShuffleBlockWrapper>,
                       std::tuple<std::shared_ptr<std::vector<uint32_t>>, 
size_t, size_t>>;
 
 public:
@@ -58,6 +86,10 @@ public:
             : Exchanger(running_sink_operators, num_partitions) {
         _data_queue.resize(num_partitions);
     }
+    ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions)
+            : Exchanger(running_sink_operators, num_sources, num_partitions) {
+        _data_queue.resize(num_partitions);
+    }
     ~ShuffleExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState 
source_state,
                 LocalExchangeSinkLocalState& local_state) override;
@@ -76,8 +108,8 @@ protected:
 
 class BucketShuffleExchanger : public ShuffleExchanger {
     ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
-    BucketShuffleExchanger(int running_sink_operators, int num_buckets)
-            : ShuffleExchanger(running_sink_operators, num_buckets) {}
+    BucketShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions)
+            : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions) {}
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
@@ -101,4 +133,23 @@ private:
     std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
+class BroadcastExchanger final : public Exchanger {
+public:
+    ENABLE_FACTORY_CREATOR(BroadcastExchanger);
+    BroadcastExchanger(int running_sink_operators, int num_partitions)
+            : Exchanger(running_sink_operators, num_partitions) {
+        _data_queue.resize(num_partitions);
+    }
+    ~BroadcastExchanger() override = default;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState 
source_state,
+                LocalExchangeSinkLocalState& local_state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, 
SourceState& source_state,
+                     LocalExchangeSourceLocalState& local_state) override;
+    ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
+
+private:
+    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+};
+
 } // namespace doris::pipeline
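
ShuffleBlockWrapper above is the piece that lets one shuffled block be shared by every partition slice: each slice holds a reference, and the last unref releases the accounted bytes and parks the emptied block on the exchanger's free list. A compact standalone sketch of that lifecycle (assumed types, not the Doris ones) is:

// Sketch of a reference-counted shuffle block: the last consumer releases the
// accounted bytes and recycles the block shell.
#include <atomic>
#include <cassert>
#include <deque>
#include <vector>

using Block = std::vector<int>;

struct SharedExchangeState {
    std::atomic<long> total_bytes{0};
    std::deque<Block> free_blocks;
};

struct ShuffleBlockWrapper {
    explicit ShuffleBlockWrapper(Block&& b) : data_block(std::move(b)) {}
    void ref(int delta) { ref_count += delta; }
    void unref(SharedExchangeState& shared) {
        if (ref_count.fetch_sub(1) == 1) {                 // last reference
            shared.total_bytes -= static_cast<long>(data_block.size() * sizeof(int));
            data_block.clear();                            // drop rows, keep capacity
            shared.free_blocks.push_back(std::move(data_block));
        }
    }
    std::atomic<int> ref_count{0};
    Block data_block;
};

int main() {
    SharedExchangeState shared;
    ShuffleBlockWrapper w(Block{1, 2, 3, 4});
    shared.total_bytes += static_cast<long>(w.data_block.size() * sizeof(int));

    const int num_partitions = 4;
    w.ref(num_partitions);           // every partition slice holds a reference
    for (int i = 0; i < num_partitions; ++i) w.unref(shared);

    assert(shared.total_bytes == 0);                 // accounting released once
    assert(shared.free_blocks.size() == 1);          // shell recycled for reuse
    return 0;
}
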
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index c298ad692ae..783b15ac7eb 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -34,13 +34,14 @@ struct LocalStateInfo {
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
                             std::shared_ptr<LocalExchangeSinkDependency>>>
             le_state_map;
-    int task_idx;
+    const int task_idx;
 
     DependencySPtr dependency;
 };
 
 // This struct is used only for initializing local sink state.
 struct LocalSinkStateInfo {
+    const int task_idx;
     RuntimeProfile* parent_profile = nullptr;
     const int sender_id;
     std::vector<DependencySPtr>& dependencys;
@@ -180,9 +181,15 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
-    virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; }
-    virtual ExchangeType get_local_exchange_type() const { return 
ExchangeType::NOOP; }
+    [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
+    [[nodiscard]] virtual std::vector<TExpr> get_local_shuffle_exprs() const { 
return {}; }
+    [[nodiscard]] virtual ExchangeType get_local_exchange_type() const {
+        return _child_x && _child_x->ignore_data_distribution() && !is_source()
+                       ? ExchangeType::PASSTHROUGH
+                       : ExchangeType::NOOP;
+    }
+    [[nodiscard]] bool ignore_data_distribution() const { return 
_ignore_data_distribution; }
+    void set_ignore_data_distribution() { _ignore_data_distribution = true; }
 
     Status prepare(RuntimeState* state) override;
 
@@ -296,6 +303,7 @@ protected:
     int64_t _limit; // -1: no limit
 
     std::string _op_name;
+    bool _ignore_data_distribution = false;
 };
 
 template <typename LocalStateType>
@@ -471,7 +479,10 @@ public:
 
     virtual void get_dependency(std::vector<DependencySPtr>& dependency, 
QueryContext* ctx) = 0;
     virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; }
-    virtual ExchangeType get_local_exchange_type() const { return 
ExchangeType::NOOP; }
+    virtual ExchangeType get_local_exchange_type() const {
+        return _child_x && _child_x->ignore_data_distribution() ? 
ExchangeType::PASSTHROUGH
+                                                                : 
ExchangeType::NOOP;
+    }
 
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
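
The operator.h hunk above adds the _ignore_data_distribution flag and a new default for get_local_exchange_type: a non-source operator whose child ignored the data distribution asks for a PASSTHROUGH local exchange instead of NOOP. A minimal stand-alone illustration of that default (simplified class names) is:

// Simplified illustration of the flag and the default exchange-type rule.
#include <cassert>
#include <memory>

enum class ExchangeType { NOOP, PASSTHROUGH };

class OperatorBase {
public:
    virtual ~OperatorBase() = default;
    virtual bool is_source() const { return false; }
    ExchangeType get_local_exchange_type() const {
        return _child && _child->ignore_data_distribution() && !is_source()
                       ? ExchangeType::PASSTHROUGH
                       : ExchangeType::NOOP;
    }
    bool ignore_data_distribution() const { return _ignore_data_distribution; }
    void set_ignore_data_distribution() { _ignore_data_distribution = true; }
    void set_child(std::shared_ptr<OperatorBase> child) { _child = std::move(child); }

protected:
    std::shared_ptr<OperatorBase> _child;
    bool _ignore_data_distribution = false;
};

class ScanOp : public OperatorBase {
public:
    bool is_source() const override { return true; }
};

class ProjectOp : public OperatorBase {};   // any non-source operator

int main() {
    auto scan = std::make_shared<ScanOp>();
    scan->set_ignore_data_distribution();   // e.g. shared scan with extra tasks

    ProjectOp project;
    project.set_child(scan);
    // The child ran with arbitrary parallelism, so this operator defaults to
    // requesting a passthrough local exchange to keep all tasks busy.
    assert(project.get_local_exchange_type() == ExchangeType::PASSTHROUGH);
    return 0;
}
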
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 12fe179eceb..c92d06c5e91 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -228,15 +228,16 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     static_cast<void>(root_pipeline->set_sink(_sink));
 
+    for (PipelinePtr& pipeline : _pipelines) {
+        DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
+        
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+    }
     if (_enable_local_shuffle()) {
         RETURN_IF_ERROR(
                 _plan_local_exchange(request.num_buckets, 
request.bucket_seq_to_instance_idx));
     }
-
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
-        DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
-        
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
         pipeline->children().clear();
         RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
     }
@@ -266,8 +267,11 @@ Status PipelineXFragmentContext::_plan_local_exchange(
             }
         }
 
-        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, 
_pipelines[pip_idx],
-                                             bucket_seq_to_instance_idx));
+        RETURN_IF_ERROR(_plan_local_exchange(
+                
_pipelines[pip_idx]->operator_xs().front()->ignore_data_distribution()
+                        ? _num_instances
+                        : num_buckets,
+                pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx));
     }
     return Status::OK();
 }
@@ -725,6 +729,12 @@ void 
PipelineXFragmentContext::_inherit_pipeline_properties(ExchangeType exchang
         
pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle());
         pipe_with_source->set_need_to_local_shuffle(true);
         break;
+    case ExchangeType::BROADCAST:
+        // If PASSTHROUGH local exchanger is planned, data will be split 
randomly. So we should make
+        // sure remaining operators should use local shuffle to make data 
distribution right.
+        
pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle());
+        pipe_with_source->set_need_to_local_shuffle(true);
+        break;
     default:
         __builtin_unreachable();
     }
@@ -764,25 +774,29 @@ Status PipelineXFragmentContext::_add_local_exchange(
 
     // 3. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared();
-    shared_state->source_dependencies.resize(_num_instances, nullptr);
-    shared_state->mem_trackers.resize(_num_instances);
     switch (exchange_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger =
                 ShuffleExchanger::create_unique(new_pip->num_tasks(), 
_num_instances);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
-        shared_state->exchanger =
-                BucketShuffleExchanger::create_unique(new_pip->num_tasks(), 
num_buckets);
+        shared_state->exchanger = BucketShuffleExchanger::create_unique(
+                new_pip->num_tasks(), _num_instances, num_buckets);
         break;
     case ExchangeType::PASSTHROUGH:
         shared_state->exchanger =
                 PassthroughExchanger::create_unique(new_pip->num_tasks(), 
_num_instances);
         break;
+    case ExchangeType::BROADCAST:
+        shared_state->exchanger =
+                BroadcastExchanger::create_unique(new_pip->num_tasks(), 
_num_instances);
+        break;
     default:
         return Status::InternalError("Unsupported local exchange type : " +
                                      std::to_string((int)exchange_type));
     }
+    shared_state->source_dependencies.resize(_num_instances, nullptr);
+    shared_state->mem_trackers.resize(_num_instances, nullptr);
     auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, 
local_exchange_id,
                                                                   
_runtime_state->get_query_ctx());
     sink_dep->set_shared_state(shared_state);
@@ -838,6 +852,8 @@ Status PipelineXFragmentContext::_add_local_exchange(
     }
     cur_pipe->set_children(new_children);
     _dag[downstream_pipeline_id] = edges_with_source;
+    
RETURN_IF_ERROR(new_pip->sink_x()->set_child(new_pip->operator_xs().back()));
+    
RETURN_IF_ERROR(cur_pipe->sink_x()->set_child(cur_pipe->operator_xs().back()));
 
     CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + 
new_pip->operator_xs().size())
             << "total_op_num: " << total_op_num
@@ -855,35 +871,54 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     std::stringstream error_msg;
     switch (tnode.node_type) {
     case TPlanNodeType::OLAP_SCAN_NODE: {
-        op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), 
descs));
+        op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        const bool shared_scan =
-                find_with_default(request.per_node_shared_scans, 
op->node_id(), false);
-        if (shared_scan) {
-            cur_pipe->set_num_tasks(1);
+        if (find_with_default(request.per_node_shared_scans, op->node_id(), 
false)) {
+            if (request.__isset.parallel_instances) {
+                cur_pipe->set_num_tasks(request.parallel_instances);
+            }
+            op->set_ignore_data_distribution();
         }
         break;
     }
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
         if (config::enable_java_support) {
-            op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), 
descs));
+            op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), 
descs, _num_instances));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else {
             return Status::InternalError(
                     "Jdbc scan node is disabled, you can change be config 
enable_java_support "
                     "to true and restart be.");
         }
+        if (find_with_default(request.per_node_shared_scans, op->node_id(), 
false)) {
+            if (request.__isset.parallel_instances) {
+                cur_pipe->set_num_tasks(request.parallel_instances);
+            }
+            op->set_ignore_data_distribution();
+        }
         break;
     }
     case doris::TPlanNodeType::FILE_SCAN_NODE: {
-        op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), 
descs));
+        op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        if (find_with_default(request.per_node_shared_scans, op->node_id(), 
false)) {
+            if (request.__isset.parallel_instances) {
+                cur_pipe->set_num_tasks(request.parallel_instances);
+            }
+            op->set_ignore_data_distribution();
+        }
         break;
     }
     case TPlanNodeType::ES_SCAN_NODE:
     case TPlanNodeType::ES_HTTP_SCAN_NODE: {
-        op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs));
+        op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        if (find_with_default(request.per_node_shared_scans, op->node_id(), 
false)) {
+            if (request.__isset.parallel_instances) {
+                cur_pipe->set_num_tasks(request.parallel_instances);
+            }
+            op->set_ignore_data_distribution();
+        }
         break;
     }
     case TPlanNodeType::EXCHANGE_NODE: {
@@ -891,6 +926,10 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         DCHECK_GT(num_senders, 0);
         op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), 
descs, num_senders));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
+            op->set_ignore_data_distribution();
+        }
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
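
Each scan case above repeats the same pattern: when the node is marked for shared scan and the fragment carries parallel_instances, the pipeline's task count is set to that value and the operator is flagged to ignore the storage distribution, decoupling parallelism from the tablet count. A sketch of that decision with hypothetical parameter structs is:

// Sketch of the task-count decision (struct names are assumptions, not the
// Thrift types): shared scan + parallel_instances overrides the tablet-bound
// default and marks the scan to ignore storage distribution.
#include <cassert>

struct FragmentParams {
    bool shared_scan = false;          // per_node_shared_scans for this node
    bool has_parallel_instances = false;
    int parallel_instances = 0;
    int num_scan_ranges = 0;           // tablet-bound default
};

struct ScanPlan {
    int num_tasks = 1;
    bool ignore_data_distribution = false;
};

ScanPlan plan_scan_parallelism(const FragmentParams& p) {
    ScanPlan plan;
    plan.num_tasks = p.num_scan_ranges;
    if (p.shared_scan) {
        if (p.has_parallel_instances) plan.num_tasks = p.parallel_instances;
        plan.ignore_data_distribution = true;   // parallelism no longer tablet-bound
    }
    return plan;
}

int main() {
    // Two tablets but eight requested instances: with shared scan the pipeline
    // still gets eight tasks, and downstream planning may add a local exchange.
    ScanPlan p = plan_scan_parallelism({true, true, 8, 2});
    assert(p.num_tasks == 8 && p.ignore_data_distribution);

    // Without shared scan, parallelism stays at the tablet count.
    ScanPlan q = plan_scan_parallelism({false, false, 0, 2});
    assert(q.num_tasks == 2 && !q.ignore_data_distribution);
    return 0;
}
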
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index cda47527264..9a7c5d99050 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -79,8 +79,12 @@ Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams
 
     {
         // set sink local state
-        LocalSinkStateInfo info {_task_profile.get(), local_params.sender_id,
-                                 get_downstream_dependency(), _le_state_map, 
tsink};
+        LocalSinkStateInfo info {_task_idx,
+                                 _task_profile.get(),
+                                 local_params.sender_id,
+                                 get_downstream_dependency(),
+                                 _le_state_map,
+                                 tsink};
         RETURN_IF_ERROR(_sink->setup_local_state(state, info));
     }
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index a2120c92389..c1ebf2103f8 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -111,7 +111,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int 
filter_id,
 
 Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc,
                                                   const TQueryOptions& 
options, int node_id,
-                                                  bool build_bf_exactly) {
+                                                  bool build_bf_exactly, int 
merged_rf_num) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 5f9ee46d656..2541299b5d8 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -83,7 +83,8 @@ public:
 
     // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
-                                    int node_id, bool build_bf_exactly = 
false);
+                                    int node_id, bool build_bf_exactly = false,
+                                    int merged_rf_num = 0);
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     bool build_bf_exactly = false);
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 9eda2788f06..d97338da86d 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -30,9 +30,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t 
filter_id,
     _blocked_by_rf = std::make_shared<std::atomic_bool>(false);
 }
 
-Status RuntimeFilterConsumer::init(RuntimeState* state) {
+Status RuntimeFilterConsumer::init(RuntimeState* state, int parallel_tasks) {
     _state = state;
-    RETURN_IF_ERROR(_register_runtime_filter());
+    RETURN_IF_ERROR(_register_runtime_filter(parallel_tasks));
     return Status::OK();
 }
 
@@ -45,7 +45,7 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* 
profile) {
     profile->add_info_string("RuntimeFilters: ", ss.str());
 }
 
-Status RuntimeFilterConsumer::_register_runtime_filter() {
+Status RuntimeFilterConsumer::_register_runtime_filter(int parallel_tasks) {
     int filter_size = _runtime_filter_descs.size();
     _runtime_filter_ctxs.reserve(filter_size);
     _runtime_filter_ready_flag.reserve(filter_size);
diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h
index 3fa822b4b73..00c10cfa9d8 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -30,7 +30,7 @@ public:
                           const RowDescriptor& row_descriptor, 
VExprContextSPtrs& conjuncts);
     ~RuntimeFilterConsumer() = default;
 
-    Status init(RuntimeState* state);
+    Status init(RuntimeState* state, int parallel_tasks = 0);
 
     // Try to append late arrived runtime filters.
     // Return num of filters which are applied already.
@@ -42,7 +42,7 @@ public:
 
 protected:
     // Register and get all runtime filters at Init phase.
-    Status _register_runtime_filter();
+    Status _register_runtime_filter(int parallel_tasks);
     // Get all arrived runtime filters at Open phase.
     Status _acquire_runtime_filter();
     // Append late-arrival runtime filters to the vconjunct_ctx.
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp 
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 0dc50f20859..53e24df183a 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -105,7 +105,8 @@ Status 
SharedHashTableController::wait_for_signal(RuntimeState* state,
     // maybe builder signaled before other instances waiting,
     // so here need to check value of `signaled`
     while (!context->signaled) {
-        _cv.wait_for(lock, std::chrono::milliseconds(400), [&]() { return 
context->signaled; });
+        _cv.wait_for(lock, std::chrono::milliseconds(400),
+                     [&]() { return context->signaled.load(); });
         // return if the instances is cancelled(eg. query timeout)
         RETURN_IF_CANCELLED(state);
     }
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index e1c01709042..b8770e63856 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -60,8 +60,8 @@ struct SharedHashTableContext {
     std::shared_ptr<void> hash_table_variants;
     std::shared_ptr<Block> block;
     std::map<int, SharedRuntimeFilterContext> runtime_filters;
-    bool signaled {};
-    bool short_circuit_for_null_in_probe_side {};
+    std::atomic<bool> signaled = false;
+    bool short_circuit_for_null_in_probe_side = false;
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
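
The controller change above turns signaled into an std::atomic<bool> and reads it with load() in the wait_for predicate, which keeps the flag safe to check outside the controller's lock (as the build sink now does with CHECK). A small standalone sketch of that timed-wait pattern is:

// Sketch of the waiting pattern: an atomic flag published under a mutex,
// observed by waiters through a periodic timed wait with a predicate.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mu;
std::condition_variable cv;
std::atomic<bool> signaled{false};

void waiter(int id) {
    std::unique_lock<std::mutex> lock(mu);
    while (!signaled) {
        // Periodic timed wait so the caller could also check for cancellation,
        // as the real code does with RETURN_IF_CANCELLED between iterations.
        cv.wait_for(lock, std::chrono::milliseconds(400), [] { return signaled.load(); });
    }
    std::cout << "waiter " << id << " observed the signal\n";
}

int main() {
    std::thread t1(waiter, 1), t2(waiter, 2);
    {
        std::lock_guard<std::mutex> lk(mu);
        signaled = true;                 // builder publishes completion
    }
    cv.notify_all();
    t1.join();
    t2.join();
    return 0;
}
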
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7050323b1af..d85115750d8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -863,6 +863,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             // Set colocate info in agg node. This is a hint for local 
shuffling to decide which type of
             // local exchanger will be used.
             aggregationNode.setColocate(true);
+
+            if (aggregate.getAggMode().isFinalPhase) {
+                inputPlanFragment.setHasColocateFinalizeAggNode(true);
+            }
         }
         setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
         if (aggregate.getStats() != null) {
@@ -1134,6 +1138,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 null, null, null, hashJoin.isMarkJoin());
 
         PlanFragment currentFragment = connectJoinNode(hashJoinNode, 
leftFragment, rightFragment, context, hashJoin);
+
+        if (joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN) {
+            currentFragment.setHasNullAwareLeftAntiJoin(true);
+        }
         if (JoinUtils.shouldColocateJoin(physicalHashJoin)) {
             // TODO: add reason
             hashJoinNode.setColocate(true, "");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0683e54082a..15d83702ff3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1277,9 +1277,14 @@ public class OlapScanNode extends ScanNode {
         // In pipeline exec engine, the instance num equals be_num * parallel 
instance.
         // so here we need count distinct be_num to do the work. make sure get 
right instance
         if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
+                && 
!ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
                 && 
ConnectContext.get().getSessionVariable().getEnableSharedScan()) {
             return 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
         }
+        if 
(ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
+                && 
ConnectContext.get().getSessionVariable().isIgnoreScanDistribution()) {
+            return 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        }
         return scanRangeLocations.size();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index df4aa499feb..c128a2ecec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -149,6 +149,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // has colocate plan node
     private boolean hasColocatePlanNode = false;
 
+    private boolean hasColocateFinalizeAggNode = false;
+
+    private boolean hasNullAwareLeftAntiJoin = false;
+
     private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
 
     /**
@@ -470,4 +474,20 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     public void setBucketNum(int bucketNum) {
         this.bucketNum = bucketNum;
     }
+
+    public boolean isHasColocateFinalizeAggNode() {
+        return hasColocateFinalizeAggNode;
+    }
+
+    public void setHasColocateFinalizeAggNode(boolean hasColocateFinalizeAggNode) {
+        this.hasColocateFinalizeAggNode = hasColocateFinalizeAggNode;
+    }
+
+    public boolean isHasNullAwareLeftAntiJoin() {
+        return hasNullAwareLeftAntiJoin;
+    }
+
+    public void setHasNullAwareLeftAntiJoin(boolean hasNullAwareLeftAntiJoin) {
+        this.hasNullAwareLeftAntiJoin = hasNullAwareLeftAntiJoin;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 0559600f920..6eece9aa545 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -43,6 +43,7 @@ import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.external.FederationBackendPolicy;
+import org.apache.doris.planner.external.FileScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -711,7 +712,20 @@ public abstract class ScanNode extends PlanNode {
     // 1. is key search
     // 2. session variable not enable_shared_scan
     public boolean shouldDisableSharedScan(ConnectContext context) {
-        return isKeySearch() || !context.getSessionVariable().getEnableSharedScan();
+        return isKeySearch() || context == null
+                || !context.getSessionVariable().getEnableSharedScan()
+                || !context.getSessionVariable().getEnablePipelineEngine()
+                || context.getSessionVariable().getEnablePipelineXEngine()
+                || this instanceof FileScanNode
+                || getShouldColoScan();
+    }
+
+    public boolean ignoreScanDistribution(ConnectContext context) {
+        return !isKeySearch() && context != null
+                && context.getSessionVariable().isIgnoreScanDistribution()
+                && context.getSessionVariable().getEnablePipelineXEngine()
+                && !fragment.isHasColocateFinalizeAggNode()
+                && !fragment.isHasNullAwareLeftAntiJoin();
     }
 
     public boolean haveLimitAndConjunts() {
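
Taken together, the two ScanNode predicates above split scan scheduling into three cases: ignore the scan distribution entirely (pipelineX with the new session variable, and a fragment with neither a colocate finalize agg nor a null-aware left anti join), fall back to the legacy shared scan, or assign one scan-range slice per instance. A minimal sketch of how a caller could read them; the class name is hypothetical and it only assumes the Doris FE classes already shown in this diff:

    import org.apache.doris.planner.ScanNode;
    import org.apache.doris.qe.ConnectContext;

    // Hypothetical illustration, not part of this commit.
    public class ScanStrategyExample {
        public static String strategyFor(ScanNode node, ConnectContext ctx) {
            if (node.ignoreScanDistribution(ctx)) {
                // pipelineX + ignore_scan_distribution: parallelism is no longer tied to tablet count
                return "local shuffle, parallel_exec_instance_num instances";
            }
            if (!node.shouldDisableSharedScan(ctx)) {
                // legacy pipeline engine with enable_shared_scan
                return "shared scan across instances";
            }
            return "one scan-range slice per instance";
        }
    }
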
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ba1499fae11..3c8dd8c1dc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSink;
@@ -45,7 +46,6 @@ import org.apache.doris.planner.IntersectNode;
 import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.MultiCastPlanFragment;
 import org.apache.doris.planner.OlapScanNode;
-import org.apache.doris.planner.OriginalPlanner;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
@@ -60,7 +60,6 @@ import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.ExternalScanNode;
 import org.apache.doris.planner.external.FileQueryScanNode;
-import org.apache.doris.planner.external.FileScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
@@ -247,6 +246,7 @@ public class Coordinator implements CoordInterface {
 
     private boolean enablePipelineEngine = false;
     private boolean enablePipelineXEngine = false;
+    private boolean useNereids = false;
 
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
@@ -307,7 +307,8 @@ public class Coordinator implements CoordInterface {
                 && (fragments.size() > 0);
 
         initQueryOptions(context);
-        if (planner instanceof OriginalPlanner) {
+        useNereids = planner instanceof NereidsPlanner;
+        if (!useNereids) {
             // Enable local shuffle on pipelineX engine only if Nereids planner is applied.
             queryOptions.setEnableLocalShuffle(false);
         }
@@ -1584,7 +1585,10 @@ public class Coordinator implements CoordInterface {
                     dest.server = dummyServer;
                     dest.setBrpcServer(dummyServer);
 
-                    for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
+                    int parallelTasksNum = destParams.ignoreDataDistribution
+                            ? destParams.parallelTasksNum : destParams.instanceExecParams.size();
+                    for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
+                        FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
                         if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
                             dest.fragment_instance_id = instanceExecParams.instanceId;
                             dest.server = toRpcHost(instanceExecParams.host);
@@ -1692,7 +1696,10 @@ public class Coordinator implements CoordInterface {
                         dest.server = dummyServer;
                         dest.setBrpcServer(dummyServer);
 
-                        for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
+                        int parallelTasksNum = destParams.ignoreDataDistribution
+                                ? destParams.parallelTasksNum : destParams.instanceExecParams.size();
+                        for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
+                            FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
                             if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
                                 dest.fragment_instance_id = instanceExecParams.instanceId;
                                 dest.server = toRpcHost(instanceExecParams.host);
@@ -1989,7 +1996,6 @@ public class Coordinator implements CoordInterface {
                     for (Integer planNodeId : value.keySet()) {
                         List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
                         List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList();
-                        List<Boolean> sharedScanOpts = Lists.newArrayList();
 
                         Optional<ScanNode> node = scanNodes.stream().filter(scanNode -> {
                             return scanNode.getId().asInt() == planNodeId;
@@ -2000,9 +2006,20 @@ public class Coordinator implements CoordInterface {
                         // 2. This fragment has a colocated scan node
                         // 3. This fragment has a FileScanNode
                         // 4. Disable shared scan optimization by session variable
-                        if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
-                                || (node.isPresent() && node.get() instanceof FileScanNode)
-                                || (node.isPresent() && node.get().shouldDisableSharedScan(context))) {
+                        boolean sharedScan = true;
+                        if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
+                                || (node.get().ignoreScanDistribution(context) && useNereids))) {
+                            int expectedInstanceNum = Math.min(parallelExecInstanceNum,
+                                    leftMostNode.getNumInstances());
+                            expectedInstanceNum = Math.max(expectedInstanceNum, 1);
+                            // if have limit and conjunts, only need 1 instance to save cpu and
+                            // mem resource
+                            if (node.get().haveLimitAndConjunts()) {
+                                expectedInstanceNum = 1;
+                            }
+
+                            perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
+                        } else {
                             int expectedInstanceNum = 1;
                             if (parallelExecInstanceNum > 1) {
                                 //the scan instance num should not larger than the tablets num
@@ -2010,38 +2027,27 @@ public class Coordinator implements CoordInterface {
                             }
                             // if have limit and conjunts, only need 1 instance to save cpu and
                             // mem resource
-                            if (node.isPresent() && node.get().haveLimitAndConjunts()) {
+                            if (node.get().haveLimitAndConjunts()) {
                                 expectedInstanceNum = 1;
                             }
 
                             perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
                                     expectedInstanceNum);
-                            sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false);
-                        } else {
-                            int expectedInstanceNum = Math.min(parallelExecInstanceNum,
-                                    leftMostNode.getNumInstances());
-                            expectedInstanceNum = Math.max(expectedInstanceNum, 1);
-                            // if have limit and conjunts, only need 1 instance to save cpu and
-                            // mem resource
-                            if (node.isPresent() && node.get().haveLimitAndConjunts()) {
-                                expectedInstanceNum = 1;
-                            }
-
-                            perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
-                            sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true);
+                            sharedScan = false;
                         }
 
                         LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
 
                         for (int j = 0; j < perInstanceScanRanges.size(); j++) {
                             List<TScanRangeParams> scanRangeParams = perInstanceScanRanges.get(j);
-                            boolean sharedScan = sharedScanOpts.get(j);
 
                             FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
                             instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
                             instanceParam.perNodeSharedScans.put(planNodeId, sharedScan);
                             params.instanceExecParams.add(instanceParam);
                         }
+                        params.ignoreDataDistribution = sharedScan;
+                        params.parallelTasksNum = sharedScan ? 1 : params.instanceExecParams.size();
                     }
                 }
             }
@@ -2156,74 +2162,8 @@ public class Coordinator implements CoordInterface {
 
     private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
             int parallelExecInstanceNum, FragmentExecParams params) {
-        Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
-        BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(fragmentId);
-        Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
-
-        // 1. count each node in one fragment should scan how many tablet, gather them in one list
-        Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
-                = Maps.newHashMap();
-        for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
-            TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
-            Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
-
-            // We only care about the node scan ranges of scan nodes which belong to this fragment
-            Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
-            for (Integer scanNodeId : nodeScanRanges.keySet()) {
-                if (scanNodeIds.contains(scanNodeId)) {
-                    filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
-                }
-            }
-
-            // set bucket for scanRange, the pair is <bucket_num, map<scanNode_id, list<scanRange>>>>
-            // we should make sure
-            // 1. same bucket in some address be
-            // 2. different scanNode id scan different scanRange which belong to the scanNode id
-            // 3. split how many scanRange one instance should scan, same bucket do not split to different instance
-            Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
-                    = Pair.of(scanRanges.getKey(), filteredNodeScanRanges);
-
-            if (!addressToScanRanges.containsKey(address)) {
-                addressToScanRanges.put(address, Lists.newArrayList());
-            }
-            addressToScanRanges.get(address).add(filteredScanRanges);
-        }
-        FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
-        for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
-                : addressToScanRanges.entrySet()) {
-            List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
-            Map<Integer, List<TScanRangeParams>> range
-                    = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
-            int expectedInstanceNum = 1;
-            if (parallelExecInstanceNum > 1) {
-                //the scan instance num should not larger than the tablets num
-                expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
-            }
-
-            // 2.split how many scanRange one instance should scan
-            List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
-                    = ListUtil.splitBySize(scanRange, expectedInstanceNum);
-
-            // 3.construct instanceExecParam add the scanRange should be scan by instance
-            for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange
-                    : perInstanceScanRanges) {
-                FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
-
-                for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
-                    instanceParam.bucketSeqSet.add(nodeScanRangeMap.first);
-                    for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
-                            : nodeScanRangeMap.second.entrySet()) {
-                        if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
-                            range.put(nodeScanRange.getKey(), Lists.newArrayList());
-                            instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
-                        }
-                        range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
-                        instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
-                    }
-                }
-                params.instanceExecParams.add(instanceParam);
-            }
-        }
+        assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap,
+                fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
     }
 
     private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
@@ -2882,44 +2822,81 @@ public class Coordinator implements CoordInterface {
 
         private void computeInstanceParam(PlanFragmentId fragmentId,
                 int parallelExecInstanceNum, FragmentExecParams params) {
-            Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
-            BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
-            Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
-
-            // 1. count each node in one fragment should scan how many tablet, gather them in one list
-            Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
-                    = Maps.newHashMap();
-            for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges
-                    : bucketSeqToScanRange.entrySet()) {
-                TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
-                Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
-                // We only care about the node scan ranges of scan nodes which belong to this fragment
-                Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
-                for (Integer scanNodeId : nodeScanRanges.keySet()) {
-                    if (scanNodeIds.contains(scanNodeId)) {
-                        filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
-                    }
-                }
-                Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
-                        = Pair.of(scanRanges.getKey(), filteredNodeScanRanges);
+            assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap,
+                    fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+        }
+    }
+
+    private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params,
+            Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap,
+            Map<PlanFragmentId, Map<Integer, TNetworkAddress>> curFragmentIdToSeqToAddressMap,
+            Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
+        Map<Integer, TNetworkAddress> bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId);
+        BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
+        Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
+
+        boolean ignoreScanDistribution = scanNodes.stream().filter(scanNode -> {
+            return scanNodeIds.contains(scanNode.getId().asInt());
+        }).allMatch(node -> node.ignoreScanDistribution(context)) && useNereids;
 
-                if (!addressToScanRanges.containsKey(address)) {
-                    addressToScanRanges.put(address, Lists.newArrayList());
+        // 1. count each node in one fragment should scan how many tablet, gather them in one list
+        Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
+                = Maps.newHashMap();
+        for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges
+                : bucketSeqToScanRange.entrySet()) {
+            TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
+            Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
+            // We only care about the node scan ranges of scan nodes which belong to this fragment
+            Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
+            for (Integer scanNodeId : nodeScanRanges.keySet()) {
+                if (scanNodeIds.contains(scanNodeId)) {
+                    filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
                 }
-                addressToScanRanges.get(address).add(filteredScanRanges);
             }
-            FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
-            for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
-                    : addressToScanRanges.entrySet()) {
-                List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
-                Map<Integer, List<TScanRangeParams>> range
-                        = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
+            Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
+                    = Pair.of(scanRanges.getKey(), filteredNodeScanRanges);
+
+            if (!addressToScanRanges.containsKey(address)) {
+                addressToScanRanges.put(address, Lists.newArrayList());
+            }
+            addressToScanRanges.get(address).add(filteredScanRanges);
+        }
+        FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
+        for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
+                : addressToScanRanges.entrySet()) {
+            List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
+            Map<Integer, List<TScanRangeParams>> range
+                    = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
+
+            if (ignoreScanDistribution) {
+                FInstanceExecParam instanceParam = new FInstanceExecParam(
+                        null, addressScanRange.getKey(), 0, params);
+
+                for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
+                    instanceParam.addBucketSeq(nodeScanRangeMap.first);
+                    for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
+                            : nodeScanRangeMap.second.entrySet()) {
+                        if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
+                            range.put(nodeScanRange.getKey(), Lists.newArrayList());
+                            instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
+                            instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
+                        }
+                        range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
+                        instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
+                                .addAll(nodeScanRange.getValue());
+                    }
+                }
+                params.instanceExecParams.add(instanceParam);
+                for (int i = 1; i < parallelExecInstanceNum; i++) {
+                    params.instanceExecParams.add(new FInstanceExecParam(
+                            null, addressScanRange.getKey(), 0, params));
+                }
+            } else {
                 int expectedInstanceNum = 1;
                 if (parallelExecInstanceNum > 1) {
                     //the scan instance num should not larger than the tablets num
                     expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
                 }
-
                 // 2. split how many scanRange one instance should scan
                 List<List<Pair<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
                         = ListUtil.splitBySize(scanRange, expectedInstanceNum);
@@ -2947,6 +2924,8 @@ public class Coordinator implements CoordInterface {
                 }
             }
         }
+        params.parallelTasksNum = ignoreScanDistribution ? 1 : params.instanceExecParams.size();
+        params.ignoreDataDistribution = ignoreScanDistribution;
     }
 
    private final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
@@ -3561,6 +3540,8 @@ public class Coordinator implements CoordInterface {
     // used to assemble TPlanFragmentExecParams
     protected class FragmentExecParams {
         public PlanFragment fragment;
+        public int parallelTasksNum = 0;
+        public boolean ignoreDataDistribution = false;
         public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
         public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
 
@@ -3705,6 +3686,9 @@ public class Coordinator implements CoordInterface {
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
                     params.setPerNodeSharedScans(perNodeSharedScans);
+                    if (ignoreDataDistribution) {
+                        params.setParallelInstances(parallelTasksNum);
+                    }
                     res.put(instanceExecParam.host, params);
                     res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
                     instanceIdx.put(instanceExecParam.host, 0);
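
The net effect in the coordinator: when every scan node of a fragment reports ignoreScanDistribution (and Nereids is in use), assignScanRanges gives each backend one instance that carries all of that backend's scan ranges as shared scans plus parallel_exec_instance_num - 1 initially empty instances, and the ignoreDataDistribution flag together with the parallelism hint is forwarded to the BE through the new parallel_instances field; otherwise the old splitBySize path applies and parallelism stays capped by the tablet count. A rough sketch of the resulting per-backend instance count; the helper is hypothetical and not code from this commit:

    // Illustrative arithmetic only; this helper is not part of the commit.
    public class InstanceCountExample {
        // How many FInstanceExecParam entries one backend receives under each path.
        public static int instancesPerBackend(boolean ignoreScanDistribution,
                                              int parallelExecInstanceNum,
                                              int scanRangeGroupsOnHost) {
            if (ignoreScanDistribution) {
                // One instance holds every scan range of the host as shared scans;
                // the remaining parallelExecInstanceNum - 1 instances start out empty.
                return Math.max(parallelExecInstanceNum, 1);
            }
            // Legacy path: parallelism is capped by the number of scan-range groups
            // (buckets/tablets) assigned to the host.
            int expected = parallelExecInstanceNum > 1
                    ? Math.min(scanRangeGroupsOnHost, parallelExecInstanceNum)
                    : 1;
            return Math.max(expected, 1);
        }
    }
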
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index df1f093cfb6..af81911dbaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -219,6 +219,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";
 
     public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
+    public static final String IGNORE_SCAN_DISTRIBUTION = "ignore_scan_distribution";
 
     public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
 
@@ -783,6 +784,10 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     private boolean enableSharedScan = false;
 
+    @VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            needForward = true)
+    private boolean ignoreScanDistribution = false;
+
     @VariableMgr.VarAttr(
             name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
             description = {"是否在pipelineX引擎上开启local shuffle优化",
@@ -3143,4 +3148,12 @@ public class SessionVariable implements Serializable, Writable {
     public boolean isMaterializedViewRewriteEnableContainForeignTable() {
         return materializedViewRewriteEnableContainForeignTable;
     }
+
+    public boolean isIgnoreScanDistribution() {
+        return ignoreScanDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
+    }
+
+    public void setIgnoreScanDistribution(boolean ignoreScanDistribution) {
+        this.ignoreScanDistribution = ignoreScanDistribution;
+    }
 }
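
The new variable is experimental and, as isIgnoreScanDistribution() above shows, it only takes effect when enable_pipeline_x_engine and enable_local_shuffle are also on. A minimal sketch of toggling it programmatically; the class name is hypothetical and it assumes a live FE ConnectContext:

    import org.apache.doris.qe.ConnectContext;
    import org.apache.doris.qe.SessionVariable;

    // Hypothetical illustration, not part of this commit.
    public class IgnoreScanDistributionToggle {
        public static boolean enableForCurrentSession() {
            SessionVariable sv = ConnectContext.get().getSessionVariable();
            sv.setIgnoreScanDistribution(true);
            // True only when pipelineX and local shuffle are also enabled.
            return sv.isIgnoreScanDistribution();
        }
    }

In practice a user would more likely flip it per session with something like SET ignore_scan_distribution = true, subject to the usual handling of experimental session variables.
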
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 177bec22059..14e84c4bdc9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -692,6 +692,7 @@ struct TPipelineFragmentParams {
   34: optional i32 num_buckets
   35: optional map<i32, i32> bucket_seq_to_instance_idx
   36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+  37: optional i32 parallel_instances
 }
 
 struct TPipelineFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
