This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch auto-pick-59295-branch-3.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3d5d0edfc63044d271a0cb11656b854b94b9858f Author: 924060929 <[email protected]> AuthorDate: Wed Dec 24 18:52:33 2025 +0800 [fix](coordinator) fix legacy coordinator use 1 instance for shuffle fragment when children has multiple instances (#59295) Fix the legacy coordinator using only 1 instance for a shuffle fragment when its children have multiple instances; this regression was introduced by #39999 (cherry picked from commit ae2ba6f6403d8832bd2fb7c32273db4e09b45876) --- .../main/java/org/apache/doris/qe/Coordinator.java | 12 ++- .../org/apache/doris/qe/OldCoordinatorTest.java | 102 +++++++++++++++++++++ 2 files changed, 109 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 2eb4f115b4c..aef568b3c90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1968,15 +1968,17 @@ public class Coordinator implements CoordInterface { private int findMaxParallelFragmentIndex(PlanFragment fragment) { Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment has no children"); - // exclude broadcast join right side's child fragments - List<PlanFragment> childFragmentCandidates = fragment.getChildren().stream() - .filter(e -> e.getOutputPartition() != DataPartition.UNPARTITIONED) - .collect(Collectors.toList()); + List<PlanFragment> childFragmentCandidates = fragment.getChildren(); int maxParallelism = 0; int maxParaIndex = 0; for (int i = 0; i < childFragmentCandidates.size(); i++) { - PlanFragmentId childFragmentId = childFragmentCandidates.get(i).getFragmentId(); + PlanFragment planFragment = childFragmentCandidates.get(i); + // exclude broadcast join right side's child fragments + if (planFragment.getOutputPartition() == DataPartition.UNPARTITIONED) { + continue; + } + PlanFragmentId childFragmentId = planFragment.getFragmentId(); int currentChildFragmentParallelism = 
fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size(); if (currentChildFragmentParallelism > maxParallelism) { maxParallelism = currentChildFragmentParallelism; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java new file mode 100644 index 00000000000..40e628cd71c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class OldCoordinatorTest extends TestWithFeService { + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + createTable("create table test.tbl(id int, value int) distributed by hash(id) buckets 1 properties('replication_num'='1')"); + createTable("create table test.tbl2(id int, value int) distributed by hash(id) buckets 1 properties('replication_num'='1')"); + createTable("create table test.tbl3(id int, value int) distributed by hash(id) buckets 10 properties('replication_num'='1')"); + } + + @Test + public void test() throws Exception { + connectContext.getSessionVariable().setDisableNereidsRules(RuleType.PRUNE_EMPTY_PARTITION.name()); + connectContext.getSessionVariable().setDisableJoinReorder(true); + connectContext.getSessionVariable().parallelPipelineTaskNum = 2; + connectContext.getSessionVariable().setEnableLocalShuffle(false); + StmtExecutor stmtExecutor = getSqlStmtExecutor( + "select *\n" + + "from (\n" // left most has exchange trigger shuffle instance logic + + " select a.value\n" + + " from (select value from test.tbl group by value)a\n" + + " join[broadcast] test.tbl2\n" + + " on a.value=tbl2.id\n" + + ")b\n" + + "join[shuffle] test.tbl3 c\n" // 2 instances, should use as the shuffle instances + + "on b.value = c.id\n" + + "join[shuffle] (\n" // 1 instance, skip + + " select a.value\n" + + " from (select value from test.tbl group by value)a\n" + + " join[broadcast] test.tbl2\n" + + " on a.value=tbl2.id\n" + + 
")d\n" + + "on c.id = d.value\n" + + "join[shuffle] test.tbl e\n" // 1 instance, skip + + "on d.value=e.id"); + + AtomicBoolean shuffleFragmentHasMultiInstances = new AtomicBoolean(false); + new Coordinator(connectContext, null, stmtExecutor.planner()) { + public void test() throws Exception { + super.processFragmentAssignmentAndParams(); + + Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = getFragmentExecParamsMap(); + PlanFragmentId scanTbl3FragmentId = null; + for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) { + PlanNode planRoot = fragmentExecParams.fragment.getPlanRoot(); + if (planRoot instanceof OlapScanNode && ((OlapScanNode) planRoot).getOlapTable().getName() + .equals("tbl3")) { + scanTbl3FragmentId = fragmentExecParams.fragment.getId(); + break; + } + } + + for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) { + List<FInstanceExecParam> instances = fragmentExecParams.instanceExecParams; + boolean childScanTbl3 = false; + for (PlanFragment child : fragmentExecParams.fragment.getChildren()) { + if (child.getFragmentId().equals(scanTbl3FragmentId)) { + childScanTbl3 = true; + break; + } + } + if (childScanTbl3 && instances.size() >= 2) { + shuffleFragmentHasMultiInstances.set(true); + } + } + } + }.test(); + Assertions.assertTrue(shuffleFragmentHasMultiInstances.get()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
