This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9af64d848f89914a84ee240d62e63321c9c8209d Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Wed Mar 6 09:26:52 2024 +0800 [fix](pipelineX) fix error distribution in DistinctStreamingAggOperatorX (#31804) --- .../exec/distinct_streaming_aggregation_operator.h | 2 +- .../test_distinct_streaming_agg_local_shuffle.out | 25 +++++ ...est_distinct_streaming_agg_local_shuffle.groovy | 102 +++++++++++++++++++++ 3 files changed, 128 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 41f8a74b651..17abe3d7ff0 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -94,7 +94,7 @@ public: bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { - if (_needs_finalize) { + if (_needs_finalize || !_probe_expr_ctxs.empty()) { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); diff --git a/regression-test/data/correctness_p0/test_distinct_streaming_agg_local_shuffle.out b/regression-test/data/correctness_p0/test_distinct_streaming_agg_local_shuffle.out new file mode 100644 index 00000000000..0ca13082f08 --- /dev/null +++ b/regression-test/data/correctness_p0/test_distinct_streaming_agg_local_shuffle.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +0 1 0 +1 1 1 +2 1 2 +3 1 3 +4 1 4 +5 1 5 +6 1 6 +7 1 7 +8 1 8 +9 1 9 + +-- !select -- +0 1 0 +1 1 1 +2 1 2 +3 1 3 +4 1 4 +5 1 5 +6 1 6 +7 1 7 +8 1 8 +9 1 9 + diff --git a/regression-test/suites/correctness_p0/test_distinct_streaming_agg_local_shuffle.groovy b/regression-test/suites/correctness_p0/test_distinct_streaming_agg_local_shuffle.groovy new file mode 100644 index 00000000000..5a6b469cb1c --- /dev/null +++ b/regression-test/suites/correctness_p0/test_distinct_streaming_agg_local_shuffle.groovy @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate +// and modified by Doris. + +suite("test_distinct_streaming_agg_local_shuffle") { + + sql """drop table if exists table_10_undef_partitions2_keys3_properties4_distributed_by5;""" + sql """ + create table table_10_undef_partitions2_keys3_properties4_distributed_by5 ( + col_bigint_undef_signed bigint/*agg_type_placeholder*/ , + col_varchar_10__undef_signed varchar(10)/*agg_type_placeholder*/ , + col_varchar_64__undef_signed varchar(64)/*agg_type_placeholder*/ , + pk int/*agg_type_placeholder*/ + ) engine=olap + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); """ + + sql """ + insert into table_10_undef_partitions2_keys3_properties4_distributed_by5(pk,col_bigint_undef_signed,col_varchar_10__undef_signed,col_varchar_64__undef_signed) values (0,-94,'had','y'),(1,672609,'k','h'),(2,-3766684,'a','p'),(3,5070261,'on','x'),(4,null,'u','at'),(5,-86,'v','c'),(6,21910,'how','m'),(7,-63,'that''s','go'),(8,-8276281,'s','a'),(9,-101,'w','y'); + """ + + sql """ + drop table if exists table_10_undef_partitions2_keys3_properties4_distributed_by52 + """ + + sql """ + create table table_10_undef_partitions2_keys3_properties4_distributed_by52 ( + pk int, + col_bigint_undef_signed bigint , + col_varchar_10__undef_signed varchar(10) , + col_varchar_64__undef_signed varchar(64) + ) engine=olap + DUPLICATE KEY(pk, col_bigint_undef_signed, col_varchar_10__undef_signed) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + insert into table_10_undef_partitions2_keys3_properties4_distributed_by52(pk,col_bigint_undef_signed,col_varchar_10__undef_signed,col_varchar_64__undef_signed) values (0,null,'g','i'),(1,-6138328,'z','do'),(2,-23217,'g','about'),(3,104,'you''re','z'),(4,null,'oh','i'),(5,-54,'want','to'),(6,null,'x','c'),(7,null,'you''re','come'),(8,3447,'really','from'),(9,-5459,'i','will'); + """ + + sql """ + drop table if exists table_10_undef_partitions2_keys3_properties4_distributed_by53 + """ + + sql """ + create table table_10_undef_partitions2_keys3_properties4_distributed_by53 ( + pk int, + col_varchar_10__undef_signed varchar(10) , + col_bigint_undef_signed bigint , + col_varchar_64__undef_signed varchar(64) + ) engine=olap + DUPLICATE KEY(pk, col_varchar_10__undef_signed) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + + sql """ + insert into table_10_undef_partitions2_keys3_properties4_distributed_by53(pk,col_bigint_undef_signed,col_varchar_10__undef_signed,col_varchar_64__undef_signed) values (0,null,'right','g'),(1,-486256,'on','on'),(2,-1,'I''ll','at'),(3,29263,'h','don''t'),(4,5453,'a','s'),(5,-119,'j','can''t'),(6,89,'one','n'),(7,-7227,'s','u'),(8,94,'time','b'),(9,1816630,'yes','yes'); + """ + + sql """set experimental_enable_pipeline_x_engine=true""" + sql """set enable_local_shuffle = true""" + + qt_select """ + + SELECT table1 . `pk` AS field1 , COUNT( DISTINCT table1 . `pk` ) AS field2 , table1 . `pk` AS field3 FROM table_10_undef_partitions2_keys3_properties4_distributed_by52 AS table1 LEFT OUTER JOIN table_10_undef_partitions2_keys3_properties4_distributed_by52 AS table2 ON table2 . `pk` +<= table2 . `pk` LEFT JOIN table_10_undef_partitions2_keys3_properties4_distributed_by5 AS table3 ON table2 . col_varchar_10__undef_signed = table2 . col_varchar_10__undef_signed WHERE table1 . col_varchar_64__undef_signed > 'look' AND table1 . col_varchar_64__undef_signed <= 'zzzz' OR table1 . col_varchar_10__undef_signed = table1 . col_varchar_10__undef_signed OR table1 . col_varchar_10__undef_signed > 'wmXlKwiRcZ' AND table1 . col_varchar_10__undef_signed <= 'z' AND table1 . `p [...] + + """ + + + sql """set experimental_enable_pipeline_x_engine=false""" + sql """set enable_local_shuffle = false""" + + + qt_select """ + + SELECT table1 . `pk` AS field1 , COUNT( DISTINCT table1 . `pk` ) AS field2 , table1 . `pk` AS field3 FROM table_10_undef_partitions2_keys3_properties4_distributed_by52 AS table1 LEFT OUTER JOIN table_10_undef_partitions2_keys3_properties4_distributed_by52 AS table2 ON table2 . `pk` +<= table2 . `pk` LEFT JOIN table_10_undef_partitions2_keys3_properties4_distributed_by5 AS table3 ON table2 . col_varchar_10__undef_signed = table2 . col_varchar_10__undef_signed WHERE table1 . col_varchar_64__undef_signed > 'look' AND table1 . col_varchar_64__undef_signed <= 'zzzz' OR table1 . col_varchar_10__undef_signed = table1 . col_varchar_10__undef_signed OR table1 . col_varchar_10__undef_signed > 'wmXlKwiRcZ' AND table1 . col_varchar_10__undef_signed <= 'z' AND table1 . `p [...] + + """ + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org