This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 09ae37dcc51535ee71b15e4ce3ad7036046637c6
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Jan 26 10:10:51 2024 +0800

    [pipelineX](localexchange) Adjust local exchange plan rule (#30393)
---
 .../main/java/org/apache/doris/planner/ScanNode.java   |  5 +++--
 .../src/main/java/org/apache/doris/qe/Coordinator.java | 18 ++++++++++--------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 9881fe6f6e3..3633f2d1fd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -720,12 +720,13 @@ public abstract class ScanNode extends PlanNode {
                 || getShouldColoScan();
     }
 
-    public boolean ignoreStorageDataDistribution(ConnectContext context) {
+    public boolean ignoreStorageDataDistribution(ConnectContext context, int numBackends) {
         return context != null
                 && context.getSessionVariable().isIgnoreStorageDataDistribution()
                 && context.getSessionVariable().getEnablePipelineXEngine()
                 && !fragment.isHasNullAwareLeftAntiJoin()
-                && getScanRangeNum() < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+                && getScanRangeNum()
+                < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends;
     }
 
     public int getScanRangeNum() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 222fc37a429..099aedd7e17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2036,15 +2036,16 @@ public class Coordinator implements CoordInterface {
 
                 /**
                  * Ignore storage data distribution iff:
-                 * 1. Current fragment is not forced to use data distribution.
-                 * 2. `parallelExecInstanceNum` is larger than scan ranges.
-                 * 3. Use Nereids planner.
+                 * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
+                 * 2. Use Nereids planner.
                  */
                 boolean sharedScan = true;
                 int expectedInstanceNum = Math.min(parallelExecInstanceNum,
                         leftMostNode.getNumInstances());
                 boolean ignoreStorageDataDistribution = scanNodes.stream()
-                        .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids;
+                        .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
+                                fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
+                                        .scanRangeAssignment.size())) && useNereids;
                 if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
                         || ignoreStorageDataDistribution)) {
                     expectedInstanceNum = Math.max(expectedInstanceNum, 1);
@@ -2913,12 +2914,13 @@ public class Coordinator implements CoordInterface {
 
                 /**
                  * Ignore storage data distribution iff:
-                 * 1. Current fragment is not forced to use data distribution.
-                 * 2. `parallelExecInstanceNum` is larger than scan ranges.
-                 * 3. Use Nereids planner.
+                 * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
+                 * 2. Use Nereids planner.
                  */
                 boolean ignoreStorageDataDistribution = scanNodes.stream()
-                        .allMatch(node -> node.ignoreStorageDataDistribution(context))
+                        .allMatch(node -> node.ignoreStorageDataDistribution(context,
+                                fragmentExecParamsMap.get(node.getFragment().getFragmentId())
+                                        .scanRangeAssignment.size()))
                         && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                             return addressScanRange.getValue().size() < parallelExecInstanceNum;
                         }) && useNereids;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org