This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 09ae37dcc51535ee71b15e4ce3ad7036046637c6
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Jan 26 10:10:51 2024 +0800

    [pipelineX](localexchange) Adjust local exchange plan rule (#30393)
---
 .../main/java/org/apache/doris/planner/ScanNode.java   |  5 +++--
 .../src/main/java/org/apache/doris/qe/Coordinator.java | 18 ++++++++++--------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 9881fe6f6e3..3633f2d1fd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -720,12 +720,13 @@ public abstract class ScanNode extends PlanNode {
                 || getShouldColoScan();
     }
 
-    public boolean ignoreStorageDataDistribution(ConnectContext context) {
+    public boolean ignoreStorageDataDistribution(ConnectContext context, int numBackends) {
         return context != null
                 && context.getSessionVariable().isIgnoreStorageDataDistribution()
                 && context.getSessionVariable().getEnablePipelineXEngine()
                 && !fragment.isHasNullAwareLeftAntiJoin()
-                && getScanRangeNum() < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+                && getScanRangeNum()
+                < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends;
     }
 
     public int getScanRangeNum() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 222fc37a429..099aedd7e17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2036,15 +2036,16 @@ public class Coordinator implements CoordInterface {
 
                         /**
                          * Ignore storage data distribution iff:
-                         * 1. Current fragment is not forced to use data distribution.
-                         * 2. `parallelExecInstanceNum` is larger than scan ranges.
-                         * 3. Use Nereids planner.
+                         * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
+                         * 2. Use Nereids planner.
                          */
                         boolean sharedScan = true;
                         int expectedInstanceNum = Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
                         boolean ignoreStorageDataDistribution = scanNodes.stream()
-                                .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids;
+                                .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
+                                        fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
+                                                .scanRangeAssignment.size())) && useNereids;
                         if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
                                 || ignoreStorageDataDistribution)) {
                             expectedInstanceNum = Math.max(expectedInstanceNum, 1);
@@ -2913,12 +2914,13 @@ public class Coordinator implements CoordInterface {
 
         /**
          * Ignore storage data distribution iff:
-         * 1. Current fragment is not forced to use data distribution.
-         * 2. `parallelExecInstanceNum` is larger than scan ranges.
-         * 3. Use Nereids planner.
+         * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
+         * 2. Use Nereids planner.
          */
         boolean ignoreStorageDataDistribution = scanNodes.stream()
-                .allMatch(node -> node.ignoreStorageDataDistribution(context))
+                .allMatch(node -> node.ignoreStorageDataDistribution(context,
+                        fragmentExecParamsMap.get(node.getFragment().getFragmentId())
+                                .scanRangeAssignment.size()))
                 && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                     return addressScanRange.getValue().size() < parallelExecInstanceNum;
                 }) && useNereids;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to