This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch auto-pick-59295-branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3d5d0edfc63044d271a0cb11656b854b94b9858f
Author: 924060929 <[email protected]>
AuthorDate: Wed Dec 24 18:52:33 2025 +0800

    [fix](coordinator) fix legacy coordinator use 1 instance for shuffle 
fragment when children has multiple instances (#59295)
    
    Fix the legacy coordinator using only 1 instance for a shuffle fragment
    when its children have multiple instances; regression introduced by #39999.
    
    (cherry picked from commit ae2ba6f6403d8832bd2fb7c32273db4e09b45876)
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  12 ++-
 .../org/apache/doris/qe/OldCoordinatorTest.java    | 102 +++++++++++++++++++++
 2 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2eb4f115b4c..aef568b3c90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1968,15 +1968,17 @@ public class Coordinator implements CoordInterface {
     private int findMaxParallelFragmentIndex(PlanFragment fragment) {
         Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment 
has no children");
 
-        // exclude broadcast join right side's child fragments
-        List<PlanFragment> childFragmentCandidates = 
fragment.getChildren().stream()
-                .filter(e -> e.getOutputPartition() != 
DataPartition.UNPARTITIONED)
-                .collect(Collectors.toList());
+        List<PlanFragment> childFragmentCandidates = fragment.getChildren();
 
         int maxParallelism = 0;
         int maxParaIndex = 0;
         for (int i = 0; i < childFragmentCandidates.size(); i++) {
-            PlanFragmentId childFragmentId = 
childFragmentCandidates.get(i).getFragmentId();
+            PlanFragment planFragment = childFragmentCandidates.get(i);
+            // exclude broadcast join right side's child fragments
+            if (planFragment.getOutputPartition() == 
DataPartition.UNPARTITIONED) {
+                continue;
+            }
+            PlanFragmentId childFragmentId = planFragment.getFragmentId();
             int currentChildFragmentParallelism = 
fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size();
             if (currentChildFragmentParallelism > maxParallelism) {
                 maxParallelism = currentChildFragmentParallelism;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
new file mode 100644
index 00000000000..40e628cd71c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class OldCoordinatorTest extends TestWithFeService {
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        createTable("create table test.tbl(id int, value int) distributed by 
hash(id) buckets 1 properties('replication_num'='1')");
+        createTable("create table test.tbl2(id int, value int) distributed by 
hash(id) buckets 1 properties('replication_num'='1')");
+        createTable("create table test.tbl3(id int, value int) distributed by 
hash(id) buckets 10 properties('replication_num'='1')");
+    }
+
+    @Test
+    public void test() throws Exception {
+        
connectContext.getSessionVariable().setDisableNereidsRules(RuleType.PRUNE_EMPTY_PARTITION.name());
+        connectContext.getSessionVariable().setDisableJoinReorder(true);
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
+        connectContext.getSessionVariable().setEnableLocalShuffle(false);
+        StmtExecutor stmtExecutor = getSqlStmtExecutor(
+                "select *\n"
+                        + "from (\n"   // left most has exchange trigger 
shuffle instance logic
+                        + "  select a.value\n"
+                        + "  from (select value from test.tbl group by 
value)a\n"
+                        + "  join[broadcast] test.tbl2\n"
+                        + "  on a.value=tbl2.id\n"
+                        + ")b\n"
+                        + "join[shuffle] test.tbl3 c\n"   // 2 instances, 
should use as the shuffle instances
+                        + "on b.value = c.id\n"
+                        + "join[shuffle] (\n"            // 1 instance, skip
+                        + "  select a.value\n"
+                        + "  from (select value from test.tbl group by 
value)a\n"
+                        + "  join[broadcast] test.tbl2\n"
+                        + "  on a.value=tbl2.id\n"
+                        + ")d\n"
+                        + "on c.id = d.value\n"
+                        + "join[shuffle] test.tbl e\n"   // 1 instance, skip
+                        + "on d.value=e.id");
+
+        AtomicBoolean shuffleFragmentHasMultiInstances = new 
AtomicBoolean(false);
+        new Coordinator(connectContext, null, stmtExecutor.planner()) {
+            public void test() throws Exception {
+                super.processFragmentAssignmentAndParams();
+
+                Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap 
= getFragmentExecParamsMap();
+                PlanFragmentId scanTbl3FragmentId = null;
+                for (FragmentExecParams fragmentExecParams : 
fragmentExecParamsMap.values()) {
+                    PlanNode planRoot = 
fragmentExecParams.fragment.getPlanRoot();
+                    if (planRoot instanceof OlapScanNode && ((OlapScanNode) 
planRoot).getOlapTable().getName()
+                            .equals("tbl3")) {
+                        scanTbl3FragmentId = 
fragmentExecParams.fragment.getId();
+                        break;
+                    }
+                }
+
+                for (FragmentExecParams fragmentExecParams : 
fragmentExecParamsMap.values()) {
+                    List<FInstanceExecParam> instances = 
fragmentExecParams.instanceExecParams;
+                    boolean childScanTbl3 = false;
+                    for (PlanFragment child : 
fragmentExecParams.fragment.getChildren()) {
+                        if (child.getFragmentId().equals(scanTbl3FragmentId)) {
+                            childScanTbl3 = true;
+                            break;
+                        }
+                    }
+                    if (childScanTbl3 && instances.size() >= 2) {
+                        shuffleFragmentHasMultiInstances.set(true);
+                    }
+                }
+            }
+        }.test();
+        Assertions.assertTrue(shuffleFragmentHasMultiInstances.get());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to