This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6bb784abbcc branch-3.0: [fix](coordinator) fix cte with local shuffle throw exception #52870 (#52875)
6bb784abbcc is described below
commit 6bb784abbcca1d2f1745c98eb9f9cbf6859b475e
Author: 924060929 <[email protected]>
AuthorDate: Wed Jul 9 09:49:58 2025 +0800
    branch-3.0: [fix](coordinator) fix cte with local shuffle throw exception #52870 (#52875)
cherry pick from #52870
---
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +-
.../java/org/apache/doris/qe/LocalShuffleTest.java | 221 +++++++++++++++++++++
2 files changed, 223 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d05fb4e3694..cc6dd424ecb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1573,7 +1573,7 @@ public class Coordinator implements CoordInterface {
}
                 // process bucket shuffle join on fragment without scan node
while (bucketSeq < bucketNum) {
-                    TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(),
+                    TPlanFragmentDestination dest = setDestination(destParams, destinations.size(),
bucketSeq);
bucketSeq++;
destinations.add(dest);
@@ -1611,7 +1611,7 @@ public class Coordinator implements CoordInterface {
                     dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                     dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                     dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
-                    destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
+                    destParams.instanceExecParams.get(j).recvrId = destinations.size();
destinations.add(dest);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalShuffleTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalShuffleTest.java
new file mode 100644
index 00000000000..597b7984cb0
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalShuffleTest.java
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.qe.Coordinator.FInstanceExecParam;
+import org.apache.doris.qe.Coordinator.FragmentExecParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.SetMultimap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class LocalShuffleTest extends TestWithFeService {
+ @Override
+ protected int backendNum() {
+ return 2;
+ }
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ useDatabase("test");
+ createTable("CREATE TABLE `oooo_oneid` (\n"
+ + " `identity_code` varchar(128) NULL COMMENT \"\",\n"
+ + " `identity_value` varchar(128) NULL COMMENT \"\",\n"
+ + " `oneid` bigint NULL COMMENT \"\",\n"
+ + " INDEX idx_oneid (`oneid`) USING INVERTED COMMENT ''\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(`identity_code`, `identity_value`)\n"
+ + "COMMENT 'oooo'\n"
+ + "DISTRIBUTED BY HASH(`identity_code`, `identity_value`)
BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 2\"\n"
+ + ")");
+
+ createTable("CREATE TABLE `aaaa_target` (\n"
+ + " `base_id` varchar(128) NULL COMMENT \"\",\n"
+ + " `hash_partition_id` bigint NOT NULL COMMENT \"\",\n"
+ + " `j_bbbb` text NULL COMMENT \"j_bbbb\"\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(`base_id`, `hash_partition_id`)\n"
+ + "COMMENT 'aaaa'\n"
+ + "PARTITION BY LIST (`hash_partition_id`)\n"
+ + "(PARTITION p0 VALUES IN (\"0\"),\n"
+ + "PARTITION p1 VALUES IN (\"1\"),\n"
+ + "PARTITION p2 VALUES IN (\"2\"),\n"
+ + "PARTITION p3 VALUES IN (\"3\"),\n"
+ + "PARTITION p4 VALUES IN (\"4\"),\n"
+ + "PARTITION p5 VALUES IN (\"5\"),\n"
+ + "PARTITION p6 VALUES IN (\"6\"),\n"
+ + "PARTITION p7 VALUES IN (\"7\"),\n"
+ + "PARTITION p8 VALUES IN (\"8\"),\n"
+ + "PARTITION p9 VALUES IN (\"9\"),\n"
+ + "PARTITION p10 VALUES IN (\"10\"),\n"
+ + "PARTITION p11 VALUES IN (\"11\"),\n"
+ + "PARTITION p12 VALUES IN (\"12\"),\n"
+ + "PARTITION p13 VALUES IN (\"13\"),\n"
+ + "PARTITION p14 VALUES IN (\"14\"),\n"
+ + "PARTITION p15 VALUES IN (\"15\"),\n"
+ + "PARTITION p16 VALUES IN (\"16\"),\n"
+ + "PARTITION p17 VALUES IN (\"17\"),\n"
+ + "PARTITION p18 VALUES IN (\"18\"),\n"
+ + "PARTITION p19 VALUES IN (\"19\"))\n"
+ + "DISTRIBUTED BY HASH(`base_id`, `hash_partition_id`) BUCKETS
1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 2\"\n"
+ + ");");
+
+ createTable("CREATE TABLE `bbbb_target` (\n"
+ + " `base_id` varchar(128) NULL COMMENT \"\",\n"
+ + " `hash_partition_id` bigint NOT NULL COMMENT \"\"\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(`base_id`, `hash_partition_id`)\n"
+ + "COMMENT 'bbbb'\n"
+ + "PARTITION BY LIST (`hash_partition_id`)\n"
+ + "(PARTITION p0 VALUES IN (\"0\"),\n"
+ + "PARTITION p1 VALUES IN (\"1\"),\n"
+ + "PARTITION p2 VALUES IN (\"2\"),\n"
+ + "PARTITION p3 VALUES IN (\"3\"),\n"
+ + "PARTITION p4 VALUES IN (\"4\"),\n"
+ + "PARTITION p5 VALUES IN (\"5\"),\n"
+ + "PARTITION p6 VALUES IN (\"6\"),\n"
+ + "PARTITION p7 VALUES IN (\"7\"),\n"
+ + "PARTITION p8 VALUES IN (\"8\"),\n"
+ + "PARTITION p9 VALUES IN (\"9\"),\n"
+ + "PARTITION p10 VALUES IN (\"10\"),\n"
+ + "PARTITION p11 VALUES IN (\"11\"),\n"
+ + "PARTITION p12 VALUES IN (\"12\"),\n"
+ + "PARTITION p13 VALUES IN (\"13\"),\n"
+ + "PARTITION p14 VALUES IN (\"14\"),\n"
+ + "PARTITION p15 VALUES IN (\"15\"),\n"
+ + "PARTITION p16 VALUES IN (\"16\"),\n"
+ + "PARTITION p17 VALUES IN (\"17\"),\n"
+ + "PARTITION p18 VALUES IN (\"18\"),\n"
+ + "PARTITION p19 VALUES IN (\"19\"))\n"
+ + "DISTRIBUTED BY HASH(`base_id`, `hash_partition_id`) BUCKETS
1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 2\"\n"
+ + ");");
+
+ createTable("CREATE TABLE `cccc_target` (\n"
+ + " `base_id` varchar(128) NULL COMMENT \"\",\n"
+ + " `hash_partition_id` bigint NOT NULL COMMENT \"\"\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(`base_id`, `hash_partition_id`)\n"
+ + "COMMENT 'bbbb'\n"
+ + "PARTITION BY LIST (`hash_partition_id`)\n"
+ + "(PARTITION p0 VALUES IN (\"0\"),\n"
+ + "PARTITION p1 VALUES IN (\"1\"),\n"
+ + "PARTITION p2 VALUES IN (\"2\"),\n"
+ + "PARTITION p3 VALUES IN (\"3\"),\n"
+ + "PARTITION p4 VALUES IN (\"4\"),\n"
+ + "PARTITION p5 VALUES IN (\"5\"),\n"
+ + "PARTITION p6 VALUES IN (\"6\"),\n"
+ + "PARTITION p7 VALUES IN (\"7\"),\n"
+ + "PARTITION p8 VALUES IN (\"8\"),\n"
+ + "PARTITION p9 VALUES IN (\"9\"),\n"
+ + "PARTITION p10 VALUES IN (\"10\"),\n"
+ + "PARTITION p11 VALUES IN (\"11\"),\n"
+ + "PARTITION p12 VALUES IN (\"12\"),\n"
+ + "PARTITION p13 VALUES IN (\"13\"),\n"
+ + "PARTITION p14 VALUES IN (\"14\"),\n"
+ + "PARTITION p15 VALUES IN (\"15\"),\n"
+ + "PARTITION p16 VALUES IN (\"16\"),\n"
+ + "PARTITION p17 VALUES IN (\"17\"),\n"
+ + "PARTITION p18 VALUES IN (\"18\"),\n"
+ + "PARTITION p19 VALUES IN (\"19\"))\n"
+ + "DISTRIBUTED BY HASH(`base_id`, `hash_partition_id`) BUCKETS
1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 2\"\n"
+ + ");");
+ }
+
+ @Test
+ public void testCteWithLocalShuffle() throws Exception {
+ connectContext.getState().reset();
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2;
+        connectContext.getSessionVariable().setDisableNereidsRules(RuleType.PRUNE_EMPTY_PARTITION.name());
+ connectContext.getSessionVariable().setQueryTimeoutS(10);
+ connectContext.getSessionVariable().setDisableJoinReorder(true);
+
+        String sql = "with oooo_oneid as (select identity_value from test.oooo_oneid),\n"
+                + " aaaa_frm_$oooo as (select j_bbbb,base_id from test.aaaa_target),\n"
+                + " bbbb_frm_$aaaa_target as (select * from bbbb_target where base_id in (select 1 from aaaa_frm_$oooo)),\n"
+                + " bbbb_cdn_1602527468_lnkb_bbbb_target$aaaa_target as (select base_id from bbbb_frm_$aaaa_target),\n"
+                + " bbbb_frm_$oooo as (select *\n"
+                + "                    from bbbb_target\n"
+                + "                    where base_id in (select identity_value from oooo_oneid)\n"
+                + "                ),\n"
+                + " cccc_frm_$bbbb_target as (select * from cccc_target where base_id in (select 1 from bbbb_frm_$oooo))\n"
+                + "select\n"
+                + "    j_bbbb in (select base_id from bbbb_cdn_1602527468_lnkb_bbbb_target$aaaa_target),\n"
+                + "    j_bbbb in (select base_id from bbbb_cdn_1602527468_lnkb_bbbb_target$aaaa_target)\n"
+                + "from (select j_bbbb\n"
+                + "      from oooo_oneid\n"
+                + "      left outer join aaaa_frm_$oooo on (oooo_oneid.identity_value = aaaa_frm_$oooo.base_id)\n"
+                + "      left outer join bbbb_frm_$oooo on (oooo_oneid.identity_value = bbbb_frm_$oooo.base_id)\n"
+                + ")final_plain_table";
+ StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
+ try {
+ stmtExecutor.execute();
+ } catch (Throwable t) {
+ // ignore
+ }
+ Coordinator coord = stmtExecutor.getCoord();
+        Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = coord.getFragmentExecParamsMap();
+
+        for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) {
+ // skip check root fragment
+ if (fragmentExecParams.fragment.getChildren().isEmpty()) {
+ continue;
+ }
+            SetMultimap<TNetworkAddress, Integer> receiverIds = LinkedHashMultimap.create();
+            ArrayListMultimap<TNetworkAddress, FInstanceExecParam> hostToInstances = ArrayListMultimap.create();
+ boolean setRecvrId = false;
+            for (FInstanceExecParam instanceExecParam : fragmentExecParams.instanceExecParams) {
+ if (instanceExecParam.recvrId != -1) {
+ setRecvrId = true;
+ }
+                receiverIds.put(instanceExecParam.host, instanceExecParam.recvrId);
+ hostToInstances.put(instanceExecParam.host, instanceExecParam);
+ }
+ if (!setRecvrId) {
+ // skip check when share broadcast hash table
+ continue;
+ }
+            for (Entry<TNetworkAddress, Collection<Integer>> hostToReceiverIds : receiverIds.asMap()
+                    .entrySet()) {
+ // if this host has 2 instances, it should contain 2 receiverId
+ Assertions.assertEquals(
+ hostToInstances.get(hostToReceiverIds.getKey()).size(),
+ hostToReceiverIds.getValue().size()
+ );
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]