This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 03f1cbde7ae [enhancement](Nereids) support 4 phases distinct aggregate with full distribution (#35871) 03f1cbde7ae is described below commit 03f1cbde7aea4aca9a1a2ac5b3ef43e250d2ca92 Author: 924060929 <924060...@qq.com> AuthorDate: Thu Jun 6 11:27:34 2024 +0800 [enhancement](Nereids) support 4 phases distinct aggregate with full distribution (#35871) The original implementation of the 4-phase distinct aggregate only supports the pattern that does not contain `group by` and has only one distinct aggregate function, for example: ```sql select count(distinct sex), sum(age) from student ``` This PR complements the 4-phase distinct aggregate with full distribution, to avoid data skew caused by the `group by` keys. For example: ```sql select sex, sum(distinct age) from student group by sex; ``` The `sex` column contains only two distinct values, `male` and `female`, while the table stores millions of rows. Shuffling by `sex` alone causes data skew, and lots of instances end up processing no rows at all. 
The 4-phase aggregate shuffles by `sex, age` first to deduplicate rows, so more instances can perform the distinct in parallel. The plan shape will look like this: ``` PhysicalAggregate(groupBy=[sex], output=[sex, sum(partial_sum(age))], mode=BUFFER_TO_RESULT) | PhysicalDistribute(columns=[sex]) | PhysicalAggregate(groupBy=[sex], output=[sex, partial_sum(age)], mode=INPUT_TO_BUFFER) | PhysicalAggregate(groupBy=[sex, age], output=[sex, age], mode=BUFFER_TO_BUFFER) | PhysicalDistribute(columns=[sex, age]) # shuffle by more columns to avoid data skew | PhysicalAggregate(groupBy=[sex, age], output=[sex, age], mode=INPUT_TO_BUFFER) | PhysicalOlapScan(name=student) ``` --- .../properties/ChildrenPropertiesRegulator.java | 10 -- .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../rules/implementation/AggregateStrategies.java | 119 ++++++++++++++++----- .../nereids/trees/plans/algebra/Aggregate.java | 22 ++++ .../data/nereids_p0/aggregate/aggregate.out | 4 + .../suites/nereids_p0/aggregate/aggregate.groovy | 22 ++++ .../suites/nereids_syntax_p0/agg_4_phase.groovy | 4 +- 7 files changed, 144 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 038e2646a6d..3beed014aac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -113,16 +113,6 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { // this means one stage gather agg, usually bad pattern return false; } - // forbid three or four stage distinct agg inter by distribute - if (agg.getAggMode() == AggMode.BUFFER_TO_BUFFER && children.get(0).getPlan() instanceof PhysicalDistribute) { - // if distinct without group by key, we prefer three or four stage distinct agg - // because the 
second phase of multi-distinct only have one instance, and it is slow generally. - if (agg.getGroupByExpressions().size() == 1 - && agg.getOutputExpressions().size() == 1) { - return true; - } - return false; - } // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle // TODO: this is forbid good plan after cte reuse by mistake diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 39acb6e8319..1b78d5bbb33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -448,6 +448,7 @@ public enum RuleType { TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT(RuleTypeClass.IMPLEMENTATION), THREE_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION), FOUR_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION), + FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE(RuleTypeClass.IMPLEMENTATION), LOGICAL_UNION_TO_PHYSICAL_UNION(RuleTypeClass.IMPLEMENTATION), LOGICAL_EXCEPT_TO_PHYSICAL_EXCEPT(RuleTypeClass.IMPLEMENTATION), LOGICAL_INTERSECT_TO_PHYSICAL_INTERSECT(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 937515ad8b5..eae17b1a99e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -75,6 +75,7 @@ import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; @@ -303,15 +304,89 @@ public class AggregateStrategies implements ImplementationRuleFactory { // .thenApplyMulti(ctx -> twoPhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) // ), RuleType.THREE_PHASE_AGGREGATE_WITH_DISTINCT.build( - basePattern - .when(agg -> agg.getDistinctArguments().size() == 1) - .thenApplyMulti(ctx -> threePhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) + basePattern + .when(agg -> agg.getDistinctArguments().size() == 1) + .thenApplyMulti(ctx -> threePhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) ), + /* + * sql: + * select count(distinct name), sum(age) from student; + * <p> + * 4 phase plan + * DISTINCT_GLOBAL(BUFFER_TO_RESULT, groupBy(), + * output[count(partial_count(name)), sum(partial_sum(partial_sum(age)))], + * GATHER) + * +--DISTINCT_LOCAL(INPUT_TO_BUFFER, groupBy(), + * output(partial_count(name), partial_sum(partial_sum(age))), + * hash distribute by name) + * +--GLOBAL(BUFFER_TO_BUFFER, groupBy(name), + * output(name, partial_sum(age)), + * hash_distribute by name) + * +--LOCAL(INPUT_TO_BUFFER, groupBy(name), output(name, partial_sum(age))) + * +--scan(name, age) + */ RuleType.FOUR_PHASE_AGGREGATE_WITH_DISTINCT.build( - basePattern - .when(agg -> agg.getDistinctArguments().size() == 1) - .when(agg -> agg.getGroupByExpressions().isEmpty()) - .thenApplyMulti(ctx -> fourPhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) + basePattern + .when(agg -> agg.getDistinctArguments().size() == 1) + .when(agg -> agg.getGroupByExpressions().isEmpty()) + .thenApplyMulti(ctx -> { + Function<List<Expression>, RequireProperties> secondPhaseRequireDistinctHash = + groupByAndDistinct -> RequireProperties.of( + PhysicalProperties.createHash( + ctx.root.getDistinctArguments(), ShuffleType.REQUIRE + ) + ); + Function<LogicalAggregate<? 
extends Plan>, RequireProperties> fourPhaseRequireGather = + agg -> RequireProperties.of(PhysicalProperties.GATHER); + return fourPhaseAggregateWithDistinct( + ctx.root, ctx.connectContext, + secondPhaseRequireDistinctHash, fourPhaseRequireGather + ); + }) + ), + /* + * sql: + * select age, count(distinct name) from student group by age; + * <p> + * 4 phase plan + * DISTINCT_GLOBAL(BUFFER_TO_RESULT, groupBy(age), + * output[age, sum(partial_count(name))], + * hash distribute by name) + * +--DISTINCT_LOCAL(INPUT_TO_BUFFER, groupBy(age), + * output(age, partial_count(name)), + * hash distribute by age, name) + * +--GLOBAL(BUFFER_TO_BUFFER, groupBy(age, name), + * output(age, name), + * hash_distribute by age, name) + * +--LOCAL(INPUT_TO_BUFFER, groupBy(age, name), output(age, name)) + * +--scan(age, name) + */ + RuleType.FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE.build( + basePattern + .when(agg -> agg.everyDistinctArgumentNumIsOne() && !agg.getGroupByExpressions().isEmpty()) + .when(agg -> + ImmutableSet.builder() + .addAll(agg.getGroupByExpressions()) + .addAll(agg.getDistinctArguments()) + .build().size() > agg.getGroupByExpressions().size() + ) + .thenApplyMulti(ctx -> { + Function<List<Expression>, RequireProperties> secondPhaseRequireGroupByAndDistinctHash = + groupByAndDistinct -> RequireProperties.of( + PhysicalProperties.createHash(groupByAndDistinct, ShuffleType.REQUIRE) + ); + + Function<LogicalAggregate<? 
extends Plan>, RequireProperties> fourPhaseRequireGroupByHash = + agg -> RequireProperties.of( + PhysicalProperties.createHash( + agg.getGroupByExpressions(), ShuffleType.REQUIRE + ) + ); + return fourPhaseAggregateWithDistinct( + ctx.root, ctx.connectContext, + secondPhaseRequireGroupByAndDistinctHash, fourPhaseRequireGroupByHash + ); + }) ) ); } @@ -1690,19 +1765,10 @@ public class AggregateStrategies implements ImplementationRuleFactory { return connectContext == null || connectContext.getSessionVariable().enablePushDownNoGroupAgg(); } - /** - * sql: - * select count(distinct name), sum(age) from student; - * <p> - * 4 phase plan - * DISTINCT_GLOBAL, BUFFER_TO_RESULT groupBy(), output[count(name), sum(age#5)], [GATHER] - * +--DISTINCT_LOCAL, INPUT_TO_BUFFER, groupBy()), output(count(name), partial_sum(age)), hash distribute by name - * +--GLOBAL, BUFFER_TO_BUFFER, groupBy(name), output(name, partial_sum(age)), hash_distribute by name - * +--LOCAL, INPUT_TO_BUFFER, groupBy(name), output(name, partial_sum(age)) - * +--scan(name, age) - */ private List<PhysicalHashAggregate<? extends Plan>> fourPhaseAggregateWithDistinct( - LogicalAggregate<? extends Plan> logicalAgg, ConnectContext connectContext) { + LogicalAggregate<? extends Plan> logicalAgg, ConnectContext connectContext, + Function<List<Expression>, RequireProperties> secondPhaseRequireSupplier, + Function<LogicalAggregate<? 
extends Plan>, RequireProperties> fourPhaseRequireSupplier) { boolean couldBanned = couldConvertToMulti(logicalAgg); Set<AggregateFunction> aggregateFunctions = logicalAgg.getAggregateFunctions(); @@ -1775,16 +1841,13 @@ public class AggregateStrategies implements ImplementationRuleFactory { globalAggOutput = ImmutableList.of(new Alias(new NullLiteral(TinyIntType.INSTANCE))); } - RequireProperties requireGather = RequireProperties.of(PhysicalProperties.GATHER); - - RequireProperties requireDistinctHash = RequireProperties.of( - PhysicalProperties.createHash(logicalAgg.getDistinctArguments(), ShuffleType.REQUIRE)); + RequireProperties secondPhaseRequire = secondPhaseRequireSupplier.apply(localAggGroupBy); //phase 2 PhysicalHashAggregate<? extends Plan> anyLocalHashGlobalAgg = new PhysicalHashAggregate<>( localAggGroupBy, globalAggOutput, Optional.of(ImmutableList.copyOf(logicalAgg.getDistinctArguments())), bufferToBufferParam, false, logicalAgg.getLogicalProperties(), - requireDistinctHash, anyLocalAgg); + secondPhaseRequire, anyLocalAgg); // phase 3 AggregateParam distinctLocalParam = new AggregateParam( @@ -1828,7 +1891,7 @@ public class AggregateStrategies implements ImplementationRuleFactory { PhysicalHashAggregate<? 
extends Plan> distinctLocal = new PhysicalHashAggregate<>( logicalAgg.getGroupByExpressions(), localDistinctOutput, Optional.empty(), distinctLocalParam, false, logicalAgg.getLogicalProperties(), - requireDistinctHash, anyLocalHashGlobalAgg); + secondPhaseRequire, anyLocalHashGlobalAgg); //phase 4 AggregateParam distinctGlobalParam = new AggregateParam( @@ -1842,7 +1905,7 @@ public class AggregateStrategies implements ImplementationRuleFactory { if (aggregateFunction.isDistinct()) { Set<Expression> aggChild = Sets.newLinkedHashSet(aggregateFunction.children()); Preconditions.checkArgument(aggChild.size() == 1 - || aggregateFunction.getDistinctArguments().size() == 1, + || aggregateFunction.getDistinctArguments().size() == 1, "cannot process more than one child in aggregate distinct function: " + aggregateFunction); AggregateFunction nonDistinct = aggregateFunction @@ -1862,10 +1925,12 @@ public class AggregateStrategies implements ImplementationRuleFactory { }); globalDistinctOutput.add(outputExprPhase4); } + + RequireProperties fourPhaseRequire = fourPhaseRequireSupplier.apply(logicalAgg); PhysicalHashAggregate<? extends Plan> distinctGlobal = new PhysicalHashAggregate<>( logicalAgg.getGroupByExpressions(), globalDistinctOutput, Optional.empty(), distinctGlobalParam, false, logicalAgg.getLogicalProperties(), - requireGather, distinctLocal); + fourPhaseRequire, distinctLocal); return ImmutableList.<PhysicalHashAggregate<? 
extends Plan>>builder() .add(distinctGlobal) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java index e7d09b8cf8b..acce8eef309 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * Common interface for logical/physical Aggregate. @@ -68,4 +69,25 @@ public interface Aggregate<CHILD_TYPE extends Plan> extends UnaryPlan<CHILD_TYPE } return distinctArguments.build(); } + + /** everyDistinctArgumentNumIsOne */ + default boolean everyDistinctArgumentNumIsOne() { + AtomicBoolean hasDistinctArguments = new AtomicBoolean(false); + for (NamedExpression outputExpression : getOutputExpressions()) { + boolean distinctArgumentSizeNotOne = outputExpression.anyMatch(expr -> { + if (expr instanceof AggregateFunction) { + AggregateFunction aggFun = (AggregateFunction) expr; + if (aggFun.isDistinct()) { + hasDistinctArguments.set(true); + return aggFun.getDistinctArguments().size() != 1; + } + } + return false; + }); + if (distinctArgumentSizeNotOne) { + return false; + } + } + return hasDistinctArguments.get(); + } } diff --git a/regression-test/data/nereids_p0/aggregate/aggregate.out b/regression-test/data/nereids_p0/aggregate/aggregate.out index 9578f6bff09..9264ba994e8 100644 --- a/regression-test/data/nereids_p0/aggregate/aggregate.out +++ b/regression-test/data/nereids_p0/aggregate/aggregate.out @@ -685,3 +685,7 @@ TESTING AGAIN -- !having_with_limit -- 7 -32767.0 + +-- !four_phase_full_distribute -- +hello 1 1 +world 1 1 diff --git a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy 
b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy index 179146650a4..0558c00c181 100644 --- a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy +++ b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy @@ -347,4 +347,26 @@ suite("aggregate") { sql "insert into table_10_undef_partitions2_keys3_properties4_distributed_by5(pk,col_bigint_undef_signed,col_varchar_10__undef_signed,col_varchar_64__undef_signed) values (0,111,'from','t'),(1,null,'h','out'),(2,3814,'get','q'),(3,5166561111626303305,'s','right'),(4,2688963514917402600,'b','hey'),(5,-5065987944147755706,'p','mean'),(6,31061,'v','d'),(7,122,'the','t'),(8,-2882446,'going','a'),(9,-43,'y','a');" sql "SELECT MIN( `pk` ) FROM table_10_undef_partitions2_keys3_properties4_distributed_by5 WHERE ( col_varchar_64__undef_signed LIKE CONCAT ('come' , '%' ) OR col_varchar_10__undef_signed IN ( 'could' , 'was' , 'that' ) ) OR ( `pk` IS NULL OR ( `pk` <> 186 ) ) AND ( `pk` IS NOT NULL OR `pk` BETWEEN 255 AND -99 + 8 ) AND ( ( `pk` != 6 ) OR `pk` IS NULL );" + + sql "drop table if exists test_four_phase_full_distribute" + sql """CREATE TABLE `test_four_phase_full_distribute` ( + `id` INT NULL, + `age` INT NULL, + `name` VARCHAR(65533) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql "insert into test_four_phase_full_distribute values(1, 21, 'hello'), (2, 22, 'world')" + sql " sync " + order_qt_four_phase_full_distribute """select + /*+SET_VAR(disable_nereids_rules='TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT,THREE_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI,THREE_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT')*/ + name, count(distinct name), count(distinct age) + from test_four_phase_full_distribute + group by name + """ } diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy 
b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy index 19cac99c153..d3b418660ab 100644 --- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy +++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy @@ -58,4 +58,6 @@ suite("agg_4_phase") { qt_4phase (test_sql) sql """select GROUP_CONCAT(distinct name, " ") from agg_4_phase_tbl;""" -} \ No newline at end of file + + sql """select /*+SET_VAR(disable_nereids_rules='TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,THREE_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT')*/ GROUP_CONCAT(distinct name, " ") from agg_4_phase_tbl group by gender;""" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org