This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d0c209206bb [pipelineX](bug) Fix correctness problem using multiple BE (#29765) d0c209206bb is described below commit d0c209206bb359a57c98bf80f3aaa217bf87c0bb Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jan 10 14:49:46 2024 +0800 [pipelineX](bug) Fix correctness problem using multiple BE (#29765) --- .../src/main/java/org/apache/doris/planner/ScanNode.java | 4 +++- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index d21fab7ac9a..885bb335e49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -724,7 +724,9 @@ public abstract class ScanNode extends PlanNode { return !isKeySearch() && context != null && context.getSessionVariable().isIgnoreStorageDataDistribution() && context.getSessionVariable().getEnablePipelineXEngine() - && !fragment.isHasNullAwareLeftAntiJoin(); + && !fragment.isHasNullAwareLeftAntiJoin() + && ((this instanceof OlapScanNode) && ((OlapScanNode) this).getScanTabletIds().size() + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum()); } public boolean haveLimitAndConjunts() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9367316b201..efee918dfde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2044,9 +2044,10 @@ public class Coordinator implements CoordInterface { boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); + boolean ignoreStorageDataDistribution = scanNodes.stream() + .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids; if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) - || (node.get().ignoreStorageDataDistribution(context) - && expectedInstanceNum > perNodeScanRanges.size() && useNereids))) { + || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and conjunts, only need 1 instance to save cpu and // mem resource @@ -2906,9 +2907,8 @@ public class Coordinator implements CoordInterface { * 2. `parallelExecInstanceNum` is larger than scan ranges. * 3. Use Nereids planner. */ - boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> { - return scanNodeIds.contains(scanNode.getId().asInt()); - }).allMatch(node -> node.ignoreStorageDataDistribution(context)) + boolean ignoreStorageDataDistribution = scanNodes.stream() + .allMatch(node -> node.ignoreStorageDataDistribution(context)) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; }) && useNereids; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org