This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2cde7b0839d [fix](nereids) fix NereidsCoordinator compute wrong result when exists CTE (#44753) 2cde7b0839d is described below commit 2cde7b0839d1607ef6dd619b7d9db95bc4c211ee Author: 924060929 <lanhuaj...@selectdb.com> AuthorDate: Mon Dec 2 16:41:17 2024 +0800 [fix](nereids) fix NereidsCoordinator compute wrong result when exists CTE (#44753) fix NereidsCoordinator compute wrong result when exists CTE, introduced by #41730 --- .../doris/qe/runtime/ThriftPlansBuilder.java | 11 ++-- .../distribute/test_multicast_sink.groovy | 60 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index a02ee90e901..54bc0b24d3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -240,14 +240,19 @@ public class ThriftPlansBuilder { return senderNum; } - private static void setMultiCastDestinationThrift(PipelineDistributedPlan fragmentPlan) { + private static void setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) { MultiCastDataSink multiCastDataSink = (MultiCastDataSink) fragmentPlan.getFragmentJob().getFragment().getSink(); List<List<TPlanFragmentDestination>> destinationList = multiCastDataSink.getDestinations(); List<DataStreamSink> dataStreamSinks = multiCastDataSink.getDataStreamSinks(); for (int i = 0; i < dataStreamSinks.size(); i++) { - DataStreamSink realSink = dataStreamSinks.get(i); List<TPlanFragmentDestination> destinations = destinationList.get(i); + if (!destinations.isEmpty()) { + // we should only set destination only once, + // because all backends share the same MultiCastDataSink object + continue; + } + DataStreamSink realSink = dataStreamSinks.get(i); for (Entry<DataSink, List<AssignedJob>> kv : fragmentPlan.getDestinations().entrySet()) { DataSink sink = kv.getKey(); if (sink == realSink) { @@ -318,7 +323,7 @@ public class ThriftPlansBuilder { List<TPlanFragmentDestination> nonMultiCastDestinations; if (fragment.getSink() instanceof MultiCastDataSink) { nonMultiCastDestinations = Lists.newArrayList(); - setMultiCastDestinationThrift(fragmentPlan); + setMultiCastDestinationThriftIfNotSet(fragmentPlan); } else { nonMultiCastDestinations = nonMultiCastDestinationToThrift(fragmentPlan); } diff --git a/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink.groovy b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink.groovy new file mode 100644 index 00000000000..eeeaad06d5e --- /dev/null +++ b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_multicast_sink") { + multi_sql """ + drop table if exists table_1_undef_partitions2_keys3_properties4_distributed_by5; + CREATE TABLE `table_1_undef_partitions2_keys3_properties4_distributed_by5` ( + `col_int_undef_signed` int NULL, + `col_int_undef_signed_not_null` int NOT NULL, + `col_date_undef_signed` date NULL, + `col_date_undef_signed_not_null` date NOT NULL, + `col_varchar_10__undef_signed` varchar(10) NULL, + `col_varchar_10__undef_signed_not_null` varchar(10) NOT NULL, + `col_varchar_1024__undef_signed` varchar(1024) NULL, + `col_varchar_1024__undef_signed_not_null` varchar(1024) NOT NULL, + `pk` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`col_int_undef_signed`, `col_int_undef_signed_not_null`, `col_date_undef_signed`) + DISTRIBUTED BY HASH(`pk`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + insert into table_1_undef_partitions2_keys3_properties4_distributed_by5 values(3, 6, '2023-12-17', '2023-12-17', 'ok', 'v', 'want', 'z', 0); + set enable_nereids_distribute_planner=true; + set parallel_pipeline_task_num = 1; + """ + + for (def i in 0..<100) { + test { + sql """ + WITH cte1 AS( + SELECT t1.`pk` + FROM table_1_undef_partitions2_keys3_properties4_distributed_by5 AS t1 + ORDER BY t1.pk + ) + SELECT cte1.`pk` AS pk1 + FROM cte1 + LEFT OUTER JOIN cte1 AS alias1 + ON cte1 . `pk` = alias1 . `pk` + WHERE cte1.`pk` < 3 + LIMIT 66666666 + """ + result([[0]]) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org