This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4d4550c2522abeef2e8b5ecc8bc3e5e0c6b1d83d
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Jun 13 16:34:33 2024 +0800

    [Feature](sink) support parallel result sink (#36053)
    
    ## Proposed changes
    Support a parallel result sink: when the new session variable
    `enable_parallel_result_sink` is enabled, the top result sink no longer
    requires GATHER distribution, so every instance of the top fragment can
    send result batches to the client buffer concurrently.

    - FE: `RequestPropertyDeriver` now requests `ANY` instead of `GATHER`
      for `PhysicalResultSink` (except for short-circuit queries), and
      `Coordinator` creates one `ResultReceiver` per distinct backend host
      and drains them one after another; `ResultReceiver` fetches by query
      id instead of fragment instance id when the feature is enabled.
    - BE: `ResultSinkOperatorX::prepare()` creates a single
      `BufferControlBlock` keyed by query id and shares it across all sink
      local states; the block tracks one dependency per sink and closes
      only after the last sink has closed.
    - Also removes the obsolete `enable_exchange_node_parallel_merge`
      session variable (its thrift field is kept and marked deprecated),
      and pins `enable_parallel_result_sink=false` in plan-shape regression
      tests whose expected plans assume a single gather sink.
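    As a minimal usage sketch (the `SET` statements mirror the switches used
    by the regression suites in this patch; the table and query are
    illustrative placeholders, not part of the change):

    ```sql
    -- Default on this branch is ON (and it is fuzz-tested). Turning it off
    -- falls back to the single GATHER result sink, which is what the
    -- plan-shape suites below do to keep their expected plans stable.
    SET enable_parallel_result_sink=false;

    -- With the variable ON, every instance of the top fragment can sink
    -- result batches concurrently instead of gathering to one instance.
    SET enable_parallel_result_sink=true;
    SELECT * FROM t1;  -- hypothetical table, for illustration only
    ```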
---
 be/src/pipeline/exec/result_file_sink_operator.cpp | 12 +++----
 be/src/pipeline/exec/result_file_sink_operator.h   |  4 +--
 be/src/pipeline/exec/result_sink_operator.cpp      | 20 +++++++----
 be/src/pipeline/exec/result_sink_operator.h        |  4 +--
 be/src/runtime/buffer_control_block.cpp            | 23 +++++++-----
 be/src/runtime/buffer_control_block.h              |  3 +-
 be/src/runtime/result_buffer_mgr.cpp               | 26 +++++++-------
 be/src/runtime/runtime_state.h                     |  4 ---
 .../nereids/properties/RequestPropertyDeriver.java |  7 +++-
 .../org/apache/doris/planner/PlanFragment.java     |  4 ---
 .../main/java/org/apache/doris/qe/Coordinator.java | 41 ++++++++++++++++------
 .../java/org/apache/doris/qe/ResultReceiver.java   | 13 +++++--
 .../java/org/apache/doris/qe/SessionVariable.java  | 20 ++++++++---
 .../apache/doris/nereids/DistributeHintTest.java   |  1 +
 .../org/apache/doris/planner/ColocatePlanTest.java |  2 ++
 .../planner/PaimonPredicateConverterTest.java      |  1 +
 .../doris/utframe/DemoMultiBackendsTest.java       |  1 +
 gensrc/thrift/PaloInternalService.thrift           |  4 ++-
 .../suites/empty_relation/eliminate_empty.groovy   |  1 +
 .../nereids_broadcast_shuffle_join/load.groovy     |  1 +
 .../nereids_clickbench_shape_p0/query20.groovy     |  1 +
 .../suites/nereids_p0/hint/multi_leading.groovy    |  1 +
 .../eliminate_outer_join.groovy                    |  1 +
 .../push_down_alias_through_join.groovy            |  1 +
 .../infer_set_operator_distinct.groovy             |  1 +
 .../merge_aggregate/merge_aggregate.groovy         |  1 +
 .../simplify_window_expression.groovy              |  1 +
 .../nereids_ssb_shape_sf100_p0/shape/flat.groovy   |  1 +
 .../push_filter_through_ptopn.groovy               |  1 +
 .../push_filter_through_window.groovy              |  1 +
 .../shape/query9.groovy                            |  1 +
 .../noStatsRfPrune/query9.groovy                   |  1 +
 .../no_stats_shape/query9.groovy                   |  1 +
 .../rf_prune/query9.groovy                         |  2 +-
 .../shape/query9.groovy                            |  1 +
 .../tpch/push_filter_window_eqset.groovy           |  1 +
 36 files changed, 139 insertions(+), 70 deletions(-)

diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 381e234e78b..0cd14899f52 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -124,8 +124,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
         std::mt19937 g(rd());
         shuffle(_channels.begin(), _channels.end(), g);
 
-        for (int i = 0; i < _channels.size(); ++i) {
-            RETURN_IF_ERROR(_channels[i]->init_stub(state));
+        for (auto& _channel : _channels) {
+            RETURN_IF_ERROR(_channel->init_stub(state));
         }
     }
     _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());
@@ -139,9 +139,9 @@ Status ResultFileSinkLocalState::open(RuntimeState* state) {
     auto& p = _parent->cast<ResultFileSinkOperatorX>();
     if (!p._is_top_sink) {
         int local_size = 0;
-        for (int i = 0; i < _channels.size(); ++i) {
-            RETURN_IF_ERROR(_channels[i]->open(state));
-            if (_channels[i]->is_local()) {
+        for (auto& _channel : _channels) {
+            RETURN_IF_ERROR(_channel->open(state));
+            if (_channel->is_local()) {
                 local_size++;
             }
         }
@@ -175,7 +175,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
         // close sender, this is normal path end
         if (_sender) {
             _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
-            static_cast<void>(_sender->close(final_status));
+            RETURN_IF_ERROR(_sender->close(final_status));
         }
         state->exec_env()->result_mgr()->cancel_at_time(
                 time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index 9699011d3be..4fa31f615ce 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include "operator.h"
 #include "vec/sink/writer/vfile_result_writer.h"
 
@@ -39,7 +37,7 @@ public:
     using Base = AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
     ENABLE_FACTORY_CREATOR(ResultFileSinkLocalState);
     ResultFileSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
-    ~ResultFileSinkLocalState();
+    ~ResultFileSinkLocalState() override;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index b8ae962ea28..24c5162c4f4 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "result_sink_operator.h"
 
 #include <memory>
+#include <utility>
 
 #include "common/object_pool.h"
 #include "exec/rowid_fetcher.h"
@@ -44,10 +45,13 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
 
-    // create sender
-    RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-            state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
-            state->execution_timeout()));
+    if (state->query_options().enable_parallel_result_sink) {
+        _sender = _parent->cast<ResultSinkOperatorX>()._sender;
+    } else {
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
+                state->execution_timeout()));
+    }
     _sender->set_dependency(_dependency->shared_from_this());
     return Status::OK();
 }
@@ -104,9 +108,6 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
 }
 
 Status ResultSinkOperatorX::prepare(RuntimeState* state) {
-    auto fragment_instance_id = state->fragment_instance_id();
-    auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})",
-                             fragment_instance_id.hi, fragment_instance_id.lo);
     // prepare output_expr
     // From the thrift expressions create the real exprs.
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
@@ -118,6 +119,11 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
     }
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
+
+    if (state->query_options().enable_parallel_result_sink) {
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout()));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 12a6bd0b11f..0ccb7f4946b 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include "operator.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/result_writer.h"
@@ -159,6 +157,8 @@ private:
 
     // for fetch data by rowids
     TFetchOption _fetch_option;
+
+    std::shared_ptr<BufferControlBlock> _sender = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index c823b7d886a..8ef23265e3f 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -138,7 +138,7 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
             }
             _buffer_rows += num_rows;
         } else {
-            auto ctx = _waiting_rpc.front();
+            auto* ctx = _waiting_rpc.front();
             _waiting_rpc.pop_front();
             ctx->on_data(result, _packet_num);
             _packet_num++;
@@ -252,6 +252,11 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
 
 Status BufferControlBlock::close(Status exec_status) {
     std::unique_lock<std::mutex> l(_lock);
+    close_cnt++;
+    if (close_cnt < _result_sink_dependencys.size()) {
+        return Status::OK();
+    }
+
     _is_close = true;
     _status = exec_status;
 
@@ -286,16 +291,18 @@ void BufferControlBlock::cancel() {
 
 void BufferControlBlock::set_dependency(
         std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
-    _result_sink_dependency = result_sink_dependency;
+    _result_sink_dependencys.push_back(result_sink_dependency);
 }
 
 void BufferControlBlock::_update_dependency() {
-    if (_result_sink_dependency &&
-        (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled)) {
-        _result_sink_dependency->set_ready();
-    } else if (_result_sink_dependency &&
-               (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled)) {
-        _result_sink_dependency->block();
+    if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
+        for (auto dependency : _result_sink_dependencys) {
+            dependency->set_ready();
+        }
+    } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled) {
+        for (auto dependency : _result_sink_dependencys) {
+            dependency->block();
+        }
     }
 }
 
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 98c84d4a72f..c8c240f928a 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -131,7 +131,8 @@ protected:
     // only used for FE using return rows to check limit
     std::unique_ptr<QueryStatistics> _query_statistics;
     std::atomic_bool _batch_queue_empty = false;
-    std::shared_ptr<pipeline::Dependency> _result_sink_dependency;
+    std::vector<std::shared_ptr<pipeline::Dependency>> _result_sink_dependencys;
+    size_t close_cnt = 0;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index bf6fb8a8f35..23f440d1909 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -96,13 +96,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
 
 std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
     std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
-    BufferMap::iterator iter = _buffer_map.find(query_id);
+    auto iter = _buffer_map.find(query_id);
 
     if (_buffer_map.end() != iter) {
         return iter->second;
     }
 
-    return std::shared_ptr<BufferControlBlock>();
+    return {};
 }
 
 void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
@@ -128,8 +128,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c
     tid.__set_lo(finst_id.lo());
     std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
     if (cb == nullptr) {
-        LOG(WARNING) << "no result for this query, id=" << print_id(tid);
-        ctx->on_failure(Status::InternalError("no result for this query"));
+        ctx->on_failure(Status::InternalError("no result for this query, tid={}", print_id(tid)));
         return;
     }
     cb->get_batch(ctx);
@@ -139,8 +138,7 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
                                          std::shared_ptr<arrow::RecordBatch>* result) {
     std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
     if (cb == nullptr) {
-        LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
-        return Status::InternalError("no result for this query");
+        return Status::InternalError("no result for this query, finst_id={}", print_id(finst_id));
     }
     RETURN_IF_ERROR(cb->get_arrow_batch(result));
     return Status::OK();
@@ -149,7 +147,7 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
 void ResultBufferMgr::cancel(const TUniqueId& query_id) {
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
-        BufferMap::iterator iter = _buffer_map.find(query_id);
+        auto iter = _buffer_map.find(query_id);
 
         if (_buffer_map.end() != iter) {
             iter->second->cancel();
@@ -169,7 +167,7 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) {
 
 void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
     std::lock_guard<std::mutex> l(_timeout_lock);
-    TimeoutMap::iterator iter = _timeout_map.find(cancel_time);
+    auto iter = _timeout_map.find(cancel_time);
 
     if (_timeout_map.end() == iter) {
         _timeout_map.insert(
@@ -189,11 +187,11 @@ void ResultBufferMgr::cancel_thread() {
         time_t now_time = time(nullptr);
         {
             std::lock_guard<std::mutex> l(_timeout_lock);
-            TimeoutMap::iterator end = _timeout_map.upper_bound(now_time + 1);
+            auto end = _timeout_map.upper_bound(now_time + 1);
 
-            for (TimeoutMap::iterator iter = _timeout_map.begin(); iter != end; ++iter) {
-                for (int i = 0; i < iter->second.size(); ++i) {
-                    query_to_cancel.push_back(iter->second[i]);
+            for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
+                for (const auto& id : iter->second) {
+                    query_to_cancel.push_back(id);
                 }
             }
 
@@ -201,8 +199,8 @@ void ResultBufferMgr::cancel_thread() {
         }
 
         // cancel query
-        for (int i = 0; i < query_to_cancel.size(); ++i) {
-            cancel(query_to_cancel[i]);
+        for (const auto& id : query_to_cancel) {
+            cancel(id);
         }
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index af628ff9e89..d61bca2182e 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -367,10 +367,6 @@ public:
         return _query_options.return_object_data_as_binary;
     }
 
-    bool enable_exchange_node_parallel_merge() const {
-        return _query_options.enable_enable_exchange_node_parallel_merge;
-    }
-
     segment_v2::CompressionTypePB fragement_transmission_compression_type() const {
         if (_query_options.__isset.fragment_transmission_compression_codec) {
             if (_query_options.fragment_transmission_compression_codec == "lz4") {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 750707c52c4..0b4929e0a87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -154,7 +154,12 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
 
     @Override
     public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
-        addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        if (context.getSessionVariable().enableParallelResultSink()
+                && !context.getStatementContext().isShortCircuitQuery()) {
+            addRequestPropertyToChildren(PhysicalProperties.ANY);
+        } else {
+            addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        }
         return null;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 2469d087cdd..7418e15bdc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -248,10 +248,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         return hasColocatePlanNode;
     }
 
-    public void setDataPartition(DataPartition dataPartition) {
-        this.dataPartition = dataPartition;
-    }
-
     /**
      * Finalize plan tree and create stream sink, if needed.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a191de9ba80..3f379232bb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -215,7 +215,7 @@ public class Coordinator implements CoordInterface {
 
     private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
     private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
-    private ResultReceiver receiver;
+    private List<ResultReceiver> receivers = Lists.newArrayList();
     protected final List<ScanNode> scanNodes;
     private int scanRangeNum = 0;
     // number of instances of this query, equals to
@@ -682,13 +682,27 @@ public class Coordinator implements CoordInterface {
         DataSink topDataSink = topParams.fragment.getSink();
         this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
         if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
+            Boolean enableParallelResultSink = queryOptions.isEnableParallelResultSink()
+                    && topDataSink instanceof ResultSink;
             TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
-            receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline,
-                    context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
+            Set<TNetworkAddress> addrs = new HashSet<>();
+            for (FInstanceExecParam param : topParams.instanceExecParams) {
+                if (addrs.contains(param.host)) {
+                    continue;
+                }
+                addrs.add(param.host);
+                receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
+                        toBrpcHost(param.host), this.timeoutDeadline,
+                        context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
+            }
 
             if (!context.isReturnResultFromLocal()) {
                 Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
+                if (enableParallelResultSink) {
+                    context.setFinstId(queryId);
+                } else {
+                    context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
+                }
                 context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
                 context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
                 context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
@@ -1084,13 +1098,13 @@ public class Coordinator implements CoordInterface {
 
     @Override
     public RowBatch getNext() throws Exception {
-        if (receiver == null) {
+        if (receivers.isEmpty()) {
             throw new UserException("There is no receiver.");
         }
 
         RowBatch resultBatch;
         Status status = new Status();
-        resultBatch = receiver.getNext(status);
+        resultBatch = receivers.get(receivers.size() - 1).getNext(status);
         if (!status.ok()) {
             LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
                     DebugUtil.printId(queryId), status.getErrorMsg());
@@ -1129,7 +1143,12 @@ public class Coordinator implements CoordInterface {
         }
 
         if (resultBatch.isEos()) {
-            this.returnedAllResults = true;
+            receivers.remove(receivers.size() - 1);
+            if (receivers.isEmpty()) {
+                returnedAllResults = true;
+            } else {
+                resultBatch.setEos(false);
+            }
 
             // if this query is a block query do not cancel.
             Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
@@ -1250,7 +1269,7 @@ public class Coordinator implements CoordInterface {
     }
 
     private void cancelInternal(Status cancelReason) {
-        if (null != receiver) {
+        for (ResultReceiver receiver : receivers) {
             receiver.cancel(cancelReason);
         }
         if (null != pointExec) {
@@ -1814,9 +1833,9 @@ public class Coordinator implements CoordInterface {
                                 leftMostNode.getNumInstances());
                         boolean forceToLocalShuffle = context != null
                                 && context.getSessionVariable().isForceToLocalShuffle();
-                        boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
-                                .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
-                                        addressToBackendID.size())) && useNereids);
+                        boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream().allMatch(
+                                scanNode -> scanNode.ignoreStorageDataDistribution(context, addressToBackendID.size()))
+                                && useNereids);
                         if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
                                 || ignoreStorageDataDistribution)) {
                             expectedInstanceNum = Math.max(expectedInstanceNum, 1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 6637509d4f2..b591a5d3c6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -54,17 +54,26 @@ public class ResultReceiver {
     private Long backendId;
     private Thread currentThread;
     private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null;
+    private Boolean enableParallelResultSink = false;
 
     int maxMsgSizeOfResultReceiver;
 
     public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs,
-            int maxMsgSizeOfResultReceiver) {
+            int maxMsgSizeOfResultReceiver, Boolean enableParallelResultSink) {
         this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
         this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
         this.timeoutTs = timeoutTs;
         this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
+        this.enableParallelResultSink = enableParallelResultSink;
+    }
+
+    Types.PUniqueId getRealFinstId() {
+        if (enableParallelResultSink) {
+            return queryId;
+        }
+        return finstId;
     }
 
     public RowBatch getNext(Status status) throws TException {
@@ -75,7 +84,7 @@ public class ResultReceiver {
         try {
             while (!isDone && runStatus.ok()) {
                 InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
-                        .setFinstId(finstId)
+                        .setFinstId(getRealFinstId())
                         .setRespInAttachment(false)
                         .build();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 51a63a2ee78..23ab0fb2701 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -132,7 +132,6 @@ public class SessionVariable implements Serializable, Writable {
     public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_point_query";
-    public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
 
     public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT = "enable_server_side_prepared_statement";
     public static final String PREFER_JOIN_METHOD = "prefer_join_method";
@@ -194,6 +193,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size";
 
+    public static final String ENABLE_PARALLEL_RESULT_SINK = "enable_parallel_result_sink";
+
     public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
 
     // max ms to wait transaction publish finish when exec insert stmt.
@@ -655,9 +656,6 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_SHORT_CIRCUIT_QUERY)
     public boolean enableShortCircuitQuery = true;
 
-    @VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE)
-    public boolean enableExchangeNodeParallelMerge = false;
-
     // By default, the number of Limit items after OrderBy is changed from 65535 items
     // before v1.2.0 (not included), to return all items by default
     @VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT)
@@ -1052,6 +1050,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true)
     private boolean enableSyncRuntimeFilterSize = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_PARALLEL_RESULT_SINK, needForward = true, fuzzy = true)
+    private boolean enableParallelResultSink = true;
+
     @VariableMgr.VarAttr(name = USE_RF_DEFAULT)
     public boolean useRuntimeFilterDefaultSize = false;
 
@@ -1946,6 +1947,7 @@ public class SessionVariable implements Serializable, Writable {
         this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 1048576;
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         // this.enableHashJoinEarlyStartProbe = random.nextBoolean();
+        this.enableParallelResultSink = random.nextBoolean();
         int randomInt = random.nextInt(4);
         if (randomInt % 2 == 0) {
             this.rewriteOrToInPredicateThreshold = 100000;
@@ -3341,7 +3343,6 @@ public class SessionVariable implements Serializable, Writable {
         }
 
         tResult.setEnableSpilling(enableSpilling);
-        tResult.setEnableEnableExchangeNodeParallelMerge(enableExchangeNodeParallelMerge);
 
         tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
         tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
@@ -3422,6 +3423,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
+        tResult.setEnableParallelResultSink(enableParallelResultSink);
         return tResult;
     }
 
@@ -3765,6 +3767,14 @@ public class SessionVariable implements Serializable, Writable {
         return enablePipelineXEngine || enablePipelineEngine;
     }
 
+    public boolean enableParallelResultSink() {
+        return enableParallelResultSink;
+    }
+
+    public void setParallelResultSink(Boolean enableParallelResultSink) {
+        this.enableParallelResultSink   = enableParallelResultSink;
+    }
+
     public boolean enableSyncRuntimeFilterSize() {
         return enableSyncRuntimeFilterSize;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java
index e97a9c75a87..4405075b039 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java
@@ -45,6 +45,7 @@ class DistributeHintTest extends TestWithFeService implements MemoPatternMatchSu
         createDatabase("test");
         useDatabase("test");
         
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+        connectContext.getSessionVariable().setParallelResultSink(false);
 
         createTable("CREATE TABLE `t1` (\n"
                 + "  `a` int(11) NULL,\n"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
index c5275c20f0d..5110ea2fde1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
@@ -151,6 +151,7 @@ public class ColocatePlanTest extends TestWithFeService {
     // 2. scan node with two tablet one instance
     @Test
     public void sqlAggWithColocateTable() throws Exception {
+        connectContext.getSessionVariable().setParallelResultSink(false);
         String sql = "select k1, k2, count(*) from db1.test_multi_partition where k2 = 1 group by k1, k2";
         StmtExecutor executor = getSqlStmtExecutor(sql);
         Planner planner = executor.planner();
@@ -186,6 +187,7 @@ public class ColocatePlanTest extends TestWithFeService {
     // Fix #8778
     @Test
     public void rollupAndMoreThanOneInstanceWithoutColocate() throws Exception {
+        connectContext.getSessionVariable().setParallelResultSink(false);
         String createColocateTblStmtStr = "create table db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) "
                 + "distributed by hash(k1, k2, k3) buckets 10 properties('replication_num' = '1');";
         createTable(createColocateTblStmtStr);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java
index 474f16ca0fc..fde1b6f74c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java
@@ -55,6 +55,7 @@ public class PaimonPredicateConverterTest extends TestWithFeService {
         DataField paimonFieldV1 = new DataField(2, "v1", new IntType());
         RowType rowType = new RowType(Lists.newArrayList(paimonFieldK1, paimonFieldK2, paimonFieldV1));
         PaimonPredicateConverter converter = new PaimonPredicateConverter(rowType);
+        connectContext.getSessionVariable().setParallelResultSink(false);
 
         // k1=1
         String sql1 = "SELECT * from db1.tbl1 where k1 = 1";
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
index 4d9ed685fda..7481e9ffd19 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
@@ -125,6 +125,7 @@ public class DemoMultiBackendsTest {
     public void testCreateDbAndTable() throws Exception {
         // 1. create connect context
         ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+        ctx.getSessionVariable().setParallelResultSink(false);
         // 2. create database db1
         String createDbStmtStr = "create database db1;";
         CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 3619bf9d97d..439c666349d 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -143,7 +143,7 @@ struct TQueryOptions {
   // whether enable spilling to disk
   31: optional bool enable_spilling = false;
   // whether enable parallel merge in exchange node
-  32: optional bool enable_enable_exchange_node_parallel_merge = false;
+  32: optional bool enable_enable_exchange_node_parallel_merge = false; // deprecated
 
   // Time in ms to wait until runtime filters are delivered.
   33: optional i32 runtime_filter_wait_time_ms = 1000
@@ -301,6 +301,8 @@ struct TQueryOptions {
   112: optional i32 max_column_reader_num = 0
 
   113: optional bool enable_local_merge_sort = false;
+
+  114: optional bool enable_parallel_result_sink = false;
   
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
diff --git a/regression-test/suites/empty_relation/eliminate_empty.groovy b/regression-test/suites/empty_relation/eliminate_empty.groovy
index 2399523ee3d..47246b633ca 100644
--- a/regression-test/suites/empty_relation/eliminate_empty.groovy
+++ b/regression-test/suites/empty_relation/eliminate_empty.groovy
@@ -25,6 +25,7 @@ suite("eliminate_empty") {
         SET enable_fallback_to_original_planner=false;
         set disable_nereids_rules='PRUNE_EMPTY_PARTITION';
         set forbid_unknown_col_stats=false;
+        set enable_parallel_result_sink=false;
     """
     qt_onerow_union """
         select * from (select 1, 2 union select 3, 4) T order by 1, 2
diff --git a/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy b/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy
index 3c0fc19b119..9e3c715cf8c 100644
--- a/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy
+++ b/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy
@@ -17,6 +17,7 @@
 
 suite("broadcastJoin") {
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
     String database = context.config.getDbNameByFile(context.file)
     sql "drop database if exists ${database}"
     sql "create database ${database}"
diff --git a/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy b/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy
index cf50bf0cfb3..bf98e0bc821 100644
--- a/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy
+++ b/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy
@@ -22,6 +22,7 @@ suite("query20") {
     sql 'set enable_nereids_planner=true'
     sql 'set enable_fallback_to_original_planner=false'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     sql 'set topn_opt_limit_threshold = 1024'
     def ckBench = """SELECT UserID FROM hits WHERE UserID = 435090932899640449"""
diff --git a/regression-test/suites/nereids_p0/hint/multi_leading.groovy b/regression-test/suites/nereids_p0/hint/multi_leading.groovy
index bcefcfef141..9ea97cc024e 100644
--- a/regression-test/suites/nereids_p0/hint/multi_leading.groovy
+++ b/regression-test/suites/nereids_p0/hint/multi_leading.groovy
@@ -21,6 +21,7 @@ suite("multi_leading") {
     sql 'DROP DATABASE IF EXISTS test_multi_leading'
     sql 'CREATE DATABASE IF NOT EXISTS test_multi_leading'
     sql 'use test_multi_leading'
+    sql "set enable_parallel_result_sink=false;"
 
     // setting planner to nereids
     sql 'set exec_mem_limit=21G'
diff --git a/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy b/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy
index 81492dd7d48..d2cfae6eeb0 100644
--- a/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy
+++ b/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy
@@ -21,6 +21,7 @@ suite("eliminate_outer_join") {
     sql "SET enable_fallback_to_original_planner=false"
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     sql 'set be_number_for_test=3'
+    sql "set enable_parallel_result_sink=false;"
 
     sql """
         DROP TABLE IF EXISTS t
diff --git a/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy b/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy
index 021d6056951..c54fd49f47f 100644
--- a/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy
+++ b/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy
@@ -23,6 +23,7 @@ suite("push_down_alias_through_join") {
     sql "use regression_test_nereids_rules_p0"
     sql "set disable_join_reorder=true"
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     // Push alias through inner join where condition not use alias
     qt_pushdown_inner_join"""
diff --git a/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy b/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy
index e653b5eccab..930c6578a81 100644
--- a/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy
+++ b/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy
@@ -20,6 +20,7 @@ suite("test_infer_set_operator_distinct") {
     sql "set runtime_filter_mode=OFF"
     sql "SET enable_fallback_to_original_planner=false"
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
 
     sql """
diff --git a/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy b/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
index 4a20cf4d68b..a824475c570 100644
--- a/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
+++ b/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy
@@ -18,6 +18,7 @@ suite("merge_aggregate") {
     sql "SET enable_nereids_planner=true"
     sql "SET enable_fallback_to_original_planner=false"
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     sql """
           DROP TABLE IF EXISTS mal_test1
diff --git a/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy b/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy
index 3e247b2a78f..10c76049e8e 100644
--- a/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy
+++ b/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy
@@ -18,6 +18,7 @@
 suite("simplify_window_expression") {
     sql "SET enable_nereids_planner=true"
     sql "SET enable_fallback_to_original_planner=false"
+    sql "set enable_parallel_result_sink=false;"
     sql """
           DROP TABLE IF EXISTS mal_test_simplify_window
          """
diff --git a/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy b/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy
index 3f685edbfb9..a74f05c54c6 100644
--- a/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy
+++ b/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy
@@ -25,6 +25,7 @@ suite("q1.1") {
     sql 'set exec_mem_limit=21G' 
     sql 'SET enable_pipeline_engine = true'
     sql 'set parallel_pipeline_task_num=8'
+    sql "set enable_parallel_result_sink=false;"
     
 sql 'set be_number_for_test=3'
 sql 'set enable_runtime_filter_prune=false'
diff --git a/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy b/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy
index 162e71074fd..739517182f8 100644
--- a/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy
+++ b/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy
@@ -18,6 +18,7 @@
 suite("push_filter_through_ptopn") {
     sql """set enable_nereids_planner=true"""
     sql """set enable_partition_topn=true"""
+    sql "set enable_parallel_result_sink=false;"
     sql """
         DROP TABLE IF EXISTS push_filter_through_ptopn_tbl
     """
diff --git a/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy b/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy
index 8c10c045391..c43f094bdb8 100644
--- a/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy
+++ b/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy
@@ -17,6 +17,7 @@
 
 suite("push_filter_through_window") {
     sql """set enable_nereids_planner=true"""
+    sql "set enable_parallel_result_sink=false;"
     sql """
         DROP TABLE IF EXISTS push_filter_through_window_tbl
     """
diff --git a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy
index 220cb0682a9..f950a4f4776 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy
@@ -32,6 +32,7 @@ suite("query9") {
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     def ds = """select case when (select count(*) 
                   from store_sales 
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy
index e3fd19dd569..18b92e42c5f 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy
@@ -33,6 +33,7 @@ sql 'set enable_runtime_filter_prune=true'
     sql 'set enable_nereids_timeout = false'
     sql 'SET enable_pipeline_engine = true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     qt_ds_shape_9 '''
     explain shape plan
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy
index ac3262a5f1a..cedb73f4d68 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy
@@ -33,6 +33,7 @@ sql 'set enable_runtime_filter_prune=false'
     sql 'set enable_nereids_timeout = false'
     sql 'SET enable_pipeline_engine = true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     qt_ds_shape_9 '''
     explain shape plan
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy
index f2fc8a59402..ed0cf228f02 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy
@@ -31,7 +31,7 @@ suite("query9") {
     sql 'set runtime_filter_type=8'
     sql 'set enable_runtime_filter_prune=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
-
+    sql "set enable_parallel_result_sink=false;"
 
     def ds = """select case when (select count(*) 
                   from store_sales 
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy
index 60beccf62ce..c49925dda42 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy
@@ -31,6 +31,7 @@ suite("query9") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     def ds = """select case when (select count(*) 
                   from store_sales 
diff --git a/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy b/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy
index f0dffcf7d4f..02e5e7b4ff4 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy
@@ -22,6 +22,7 @@ suite("push_filter_window_eqset") {
     sql "use ${db}"
     sql 'set enable_nereids_planner=true'
     sql 'set enable_fallback_to_original_planner=false';
+    sql "set enable_parallel_result_sink=false;"
 
     /**
     check the filter is pushed through window

