This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03f1cbde7ae [enhancement](Nereids) support 4 phases distinct aggregate 
with full distribution (#35871)
03f1cbde7ae is described below

commit 03f1cbde7aea4aca9a1a2ac5b3ef43e250d2ca92
Author: 924060929 <924060...@qq.com>
AuthorDate: Thu Jun 6 11:27:34 2024 +0800

    [enhancement](Nereids) support 4 phases distinct aggregate with full 
distribution (#35871)
    
    The original implementation of the 4-phase distinct aggregate only
    supports patterns that do not contain `group by` and that have only one
    distinct aggregate function.
    
    for example:
    ```sql
    select count(distinct sex), sum(age)
    from student
    ```
    
    This PR complements the 4-phase distinct aggregate with full distribution
    to avoid data skew in the `group by`.
    
    for example
    ```sql
    select sex, sum(distinct age)
    from student
    group by sex;
    ```
    The `sex` column contains only two distinct values, `male` and `female`,
    while the table stores millions of rows.
    Shuffling by `sex` alone causes data skew, and lots of instances end up
    processing empty input.
    
    The 4-phase aggregate first shuffles by `sex, age` to deduplicate rows, so
    more instances can perform the distinct step in parallel. The plan shape
    looks like this:
    ```
    
    PhysicalAggregate(groupBy=[sex], output=[sex, sum(partial_sum(age))], 
mode=BUFFER_TO_RESULT)
                                            |
                             PhysicalDistribute(columns=[sex])
                                            |
    PhysicalAggregate(groupBy=[sex], output=[sex, partial_sum(age)], 
mode=INPUT_TO_BUFFER)
                                            |
        PhysicalAggregate(groupBy=[sex, age], output=[sex, age], 
mode=BUFFER_TO_BUFFER)
                                            |
                             PhysicalDistribute(columns=[sex, age])   # shuffle 
by more columns to avoid data skew
                                            |
    PhysicalAggregate(groupBy=[sex, age], output=[sex, age], 
mode=INPUT_TO_BUFFER)
                                            |
                              PhysicalOlapScan(name=student)
    ```
---
 .../properties/ChildrenPropertiesRegulator.java    |  10 --
 .../org/apache/doris/nereids/rules/RuleType.java   |   1 +
 .../rules/implementation/AggregateStrategies.java  | 119 ++++++++++++++++-----
 .../nereids/trees/plans/algebra/Aggregate.java     |  22 ++++
 .../data/nereids_p0/aggregate/aggregate.out        |   4 +
 .../suites/nereids_p0/aggregate/aggregate.groovy   |  22 ++++
 .../suites/nereids_syntax_p0/agg_4_phase.groovy    |   4 +-
 7 files changed, 144 insertions(+), 38 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 038e2646a6d..3beed014aac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -113,16 +113,6 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
             // this means one stage gather agg, usually bad pattern
             return false;
         }
-        // forbid three or four stage distinct agg inter by distribute
-        if (agg.getAggMode() == AggMode.BUFFER_TO_BUFFER && 
children.get(0).getPlan() instanceof PhysicalDistribute) {
-            // if distinct without group by key, we prefer three or four stage 
distinct agg
-            // because the second phase of multi-distinct only have one 
instance, and it is slow generally.
-            if (agg.getGroupByExpressions().size() == 1
-                    && agg.getOutputExpressions().size() == 1) {
-                return true;
-            }
-            return false;
-        }
 
         // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
         // TODO: this is forbid good plan after cte reuse by mistake
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 39acb6e8319..1b78d5bbb33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -448,6 +448,7 @@ public enum RuleType {
     TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT(RuleTypeClass.IMPLEMENTATION),
     THREE_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
     FOUR_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
+    
FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_UNION_TO_PHYSICAL_UNION(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_EXCEPT_TO_PHYSICAL_EXCEPT(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_INTERSECT_TO_PHYSICAL_INTERSECT(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 937515ad8b5..eae17b1a99e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -75,6 +75,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -303,15 +304,89 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
             //         .thenApplyMulti(ctx -> 
twoPhaseAggregateWithDistinct(ctx.root, ctx.connectContext))
             // ),
             RuleType.THREE_PHASE_AGGREGATE_WITH_DISTINCT.build(
-                    basePattern
-                            .when(agg -> agg.getDistinctArguments().size() == 
1)
-                            .thenApplyMulti(ctx -> 
threePhaseAggregateWithDistinct(ctx.root, ctx.connectContext))
+                basePattern
+                    .when(agg -> agg.getDistinctArguments().size() == 1)
+                    .thenApplyMulti(ctx -> 
threePhaseAggregateWithDistinct(ctx.root, ctx.connectContext))
             ),
+            /*
+             * sql:
+             * select count(distinct name), sum(age) from student;
+             * <p>
+             * 4 phase plan
+             * DISTINCT_GLOBAL(BUFFER_TO_RESULT, groupBy(),
+             *          output[count(partial_count(name)), 
sum(partial_sum(partial_sum(age)))],
+             *          GATHER)
+             * +--DISTINCT_LOCAL(INPUT_TO_BUFFER, groupBy(),
+             *          output(partial_count(name), 
partial_sum(partial_sum(age))),
+             *          hash distribute by name)
+             *    +--GLOBAL(BUFFER_TO_BUFFER, groupBy(name),
+             *          output(name, partial_sum(age)),
+             *          hash_distribute by name)
+             *       +--LOCAL(INPUT_TO_BUFFER, groupBy(name), output(name, 
partial_sum(age)))
+             *          +--scan(name, age)
+             */
             RuleType.FOUR_PHASE_AGGREGATE_WITH_DISTINCT.build(
-                    basePattern
-                            .when(agg -> agg.getDistinctArguments().size() == 
1)
-                            .when(agg -> agg.getGroupByExpressions().isEmpty())
-                            .thenApplyMulti(ctx -> 
fourPhaseAggregateWithDistinct(ctx.root, ctx.connectContext))
+                basePattern
+                    .when(agg -> agg.getDistinctArguments().size() == 1)
+                    .when(agg -> agg.getGroupByExpressions().isEmpty())
+                    .thenApplyMulti(ctx -> {
+                        Function<List<Expression>, RequireProperties> 
secondPhaseRequireDistinctHash =
+                                groupByAndDistinct -> RequireProperties.of(
+                                        PhysicalProperties.createHash(
+                                                
ctx.root.getDistinctArguments(), ShuffleType.REQUIRE
+                                        )
+                                );
+                        Function<LogicalAggregate<? extends Plan>, 
RequireProperties> fourPhaseRequireGather =
+                                agg -> 
RequireProperties.of(PhysicalProperties.GATHER);
+                        return fourPhaseAggregateWithDistinct(
+                                ctx.root, ctx.connectContext,
+                                secondPhaseRequireDistinctHash, 
fourPhaseRequireGather
+                        );
+                    })
+            ),
+            /*
+             * sql:
+             * select age, count(distinct name) from student group by age;
+             * <p>
+             * 4 phase plan
+             * DISTINCT_GLOBAL(BUFFER_TO_RESULT, groupBy(age),
+             *          output[age, sum(partial_count(name))],
+             *          hash distribute by age)
+             * +--DISTINCT_LOCAL(INPUT_TO_BUFFER, groupBy(age),
+             *          output(age, partial_count(name)),
+             *          hash distribute by age, name)
+             *    +--GLOBAL(BUFFER_TO_BUFFER, groupBy(age, name),
+             *          output(age, name),
+             *          hash_distribute by age, name)
+             *       +--LOCAL(INPUT_TO_BUFFER, groupBy(age, name), output(age, 
name))
+             *          +--scan(age, name)
+             */
+            
RuleType.FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE.build(
+                basePattern
+                    .when(agg -> agg.everyDistinctArgumentNumIsOne() && 
!agg.getGroupByExpressions().isEmpty())
+                    .when(agg ->
+                        ImmutableSet.builder()
+                            .addAll(agg.getGroupByExpressions())
+                            .addAll(agg.getDistinctArguments())
+                            .build().size() > 
agg.getGroupByExpressions().size()
+                    )
+                    .thenApplyMulti(ctx -> {
+                        Function<List<Expression>, RequireProperties> 
secondPhaseRequireGroupByAndDistinctHash =
+                                groupByAndDistinct -> RequireProperties.of(
+                                        
PhysicalProperties.createHash(groupByAndDistinct, ShuffleType.REQUIRE)
+                                );
+
+                        Function<LogicalAggregate<? extends Plan>, 
RequireProperties> fourPhaseRequireGroupByHash =
+                                agg -> RequireProperties.of(
+                                        PhysicalProperties.createHash(
+                                                agg.getGroupByExpressions(), 
ShuffleType.REQUIRE
+                                        )
+                                );
+                        return fourPhaseAggregateWithDistinct(
+                                ctx.root, ctx.connectContext,
+                                secondPhaseRequireGroupByAndDistinctHash, 
fourPhaseRequireGroupByHash
+                        );
+                    })
             )
         );
     }
@@ -1690,19 +1765,10 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         return connectContext == null || 
connectContext.getSessionVariable().enablePushDownNoGroupAgg();
     }
 
-    /**
-     * sql:
-     * select count(distinct name), sum(age) from student;
-     * <p>
-     * 4 phase plan
-     * DISTINCT_GLOBAL, BUFFER_TO_RESULT groupBy(), output[count(name), 
sum(age#5)], [GATHER]
-     * +--DISTINCT_LOCAL, INPUT_TO_BUFFER, groupBy()), output(count(name), 
partial_sum(age)), hash distribute by name
-     *    +--GLOBAL, BUFFER_TO_BUFFER, groupBy(name), output(name, 
partial_sum(age)), hash_distribute by name
-     *       +--LOCAL, INPUT_TO_BUFFER, groupBy(name), output(name, 
partial_sum(age))
-     *          +--scan(name, age)
-     */
     private List<PhysicalHashAggregate<? extends Plan>> 
fourPhaseAggregateWithDistinct(
-            LogicalAggregate<? extends Plan> logicalAgg, ConnectContext 
connectContext) {
+            LogicalAggregate<? extends Plan> logicalAgg, ConnectContext 
connectContext,
+            Function<List<Expression>, RequireProperties> 
secondPhaseRequireSupplier,
+            Function<LogicalAggregate<? extends Plan>, RequireProperties> 
fourPhaseRequireSupplier) {
         boolean couldBanned = couldConvertToMulti(logicalAgg);
 
         Set<AggregateFunction> aggregateFunctions = 
logicalAgg.getAggregateFunctions();
@@ -1775,16 +1841,13 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
             globalAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
         }
 
-        RequireProperties requireGather = 
RequireProperties.of(PhysicalProperties.GATHER);
-
-        RequireProperties requireDistinctHash = RequireProperties.of(
-                
PhysicalProperties.createHash(logicalAgg.getDistinctArguments(), 
ShuffleType.REQUIRE));
+        RequireProperties secondPhaseRequire = 
secondPhaseRequireSupplier.apply(localAggGroupBy);
 
         //phase 2
         PhysicalHashAggregate<? extends Plan> anyLocalHashGlobalAgg = new 
PhysicalHashAggregate<>(
                 localAggGroupBy, globalAggOutput, 
Optional.of(ImmutableList.copyOf(logicalAgg.getDistinctArguments())),
                 bufferToBufferParam, false, logicalAgg.getLogicalProperties(),
-                requireDistinctHash, anyLocalAgg);
+                secondPhaseRequire, anyLocalAgg);
 
         // phase 3
         AggregateParam distinctLocalParam = new AggregateParam(
@@ -1828,7 +1891,7 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         PhysicalHashAggregate<? extends Plan> distinctLocal = new 
PhysicalHashAggregate<>(
                 logicalAgg.getGroupByExpressions(), localDistinctOutput, 
Optional.empty(),
                 distinctLocalParam, false, logicalAgg.getLogicalProperties(),
-                requireDistinctHash, anyLocalHashGlobalAgg);
+                secondPhaseRequire, anyLocalHashGlobalAgg);
 
         //phase 4
         AggregateParam distinctGlobalParam = new AggregateParam(
@@ -1842,7 +1905,7 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                     if (aggregateFunction.isDistinct()) {
                         Set<Expression> aggChild = 
Sets.newLinkedHashSet(aggregateFunction.children());
                         Preconditions.checkArgument(aggChild.size() == 1
-                                || 
aggregateFunction.getDistinctArguments().size() == 1,
+                                        || 
aggregateFunction.getDistinctArguments().size() == 1,
                                 "cannot process more than one child in 
aggregate distinct function: "
                                         + aggregateFunction);
                         AggregateFunction nonDistinct = aggregateFunction
@@ -1862,10 +1925,12 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
             });
             globalDistinctOutput.add(outputExprPhase4);
         }
+
+        RequireProperties fourPhaseRequire = 
fourPhaseRequireSupplier.apply(logicalAgg);
         PhysicalHashAggregate<? extends Plan> distinctGlobal = new 
PhysicalHashAggregate<>(
                 logicalAgg.getGroupByExpressions(), globalDistinctOutput, 
Optional.empty(),
                 distinctGlobalParam, false, logicalAgg.getLogicalProperties(),
-                requireGather, distinctLocal);
+                fourPhaseRequire, distinctLocal);
 
         return ImmutableList.<PhysicalHashAggregate<? extends Plan>>builder()
                 .add(distinctGlobal)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
index e7d09b8cf8b..acce8eef309 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Common interface for logical/physical Aggregate.
@@ -68,4 +69,25 @@ public interface Aggregate<CHILD_TYPE extends Plan> extends 
UnaryPlan<CHILD_TYPE
         }
         return distinctArguments.build();
     }
+
+    /** Whether distinct aggregate functions exist and each has exactly one distinct argument. */
+    default boolean everyDistinctArgumentNumIsOne() {
+        AtomicBoolean hasDistinctArguments = new AtomicBoolean(false);
+        for (NamedExpression outputExpression : getOutputExpressions()) {
+            boolean distinctArgumentSizeNotOne = 
outputExpression.anyMatch(expr -> {
+                if (expr instanceof AggregateFunction) {
+                    AggregateFunction aggFun = (AggregateFunction) expr;
+                    if (aggFun.isDistinct()) {
+                        hasDistinctArguments.set(true);
+                        return aggFun.getDistinctArguments().size() != 1;
+                    }
+                }
+                return false;
+            });
+            if (distinctArgumentSizeNotOne) {
+                return false;
+            }
+        }
+        return hasDistinctArguments.get();
+    }
 }
diff --git a/regression-test/data/nereids_p0/aggregate/aggregate.out 
b/regression-test/data/nereids_p0/aggregate/aggregate.out
index 9578f6bff09..9264ba994e8 100644
--- a/regression-test/data/nereids_p0/aggregate/aggregate.out
+++ b/regression-test/data/nereids_p0/aggregate/aggregate.out
@@ -685,3 +685,7 @@ TESTING     AGAIN
 
 -- !having_with_limit --
 7      -32767.0
+
+-- !four_phase_full_distribute --
+hello  1       1
+world  1       1
diff --git a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy 
b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy
index 179146650a4..0558c00c181 100644
--- a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy
+++ b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy
@@ -347,4 +347,26 @@ suite("aggregate") {
     sql "insert into 
table_10_undef_partitions2_keys3_properties4_distributed_by5(pk,col_bigint_undef_signed,col_varchar_10__undef_signed,col_varchar_64__undef_signed)
 values 
(0,111,'from','t'),(1,null,'h','out'),(2,3814,'get','q'),(3,5166561111626303305,'s','right'),(4,2688963514917402600,'b','hey'),(5,-5065987944147755706,'p','mean'),(6,31061,'v','d'),(7,122,'the','t'),(8,-2882446,'going','a'),(9,-43,'y','a');"
 
     sql "SELECT MIN( `pk` ) FROM 
table_10_undef_partitions2_keys3_properties4_distributed_by5  WHERE ( 
col_varchar_64__undef_signed  LIKE CONCAT ('come' , '%' ) OR 
col_varchar_10__undef_signed  IN ( 'could' , 'was' , 'that' ) ) OR ( `pk` IS  
NULL OR  ( `pk` <> 186 ) ) AND ( `pk` IS NOT NULL OR `pk`  BETWEEN 255 AND -99 
+ 8 ) AND (  ( `pk` != 6 ) OR `pk` IS  NULL );"
+
+    sql "drop table if exists test_four_phase_full_distribute"
+    sql """CREATE TABLE `test_four_phase_full_distribute` (
+          `id` INT NULL,
+          `age` INT NULL,
+          `name` VARCHAR(65533) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );"""
+
+    sql "insert into test_four_phase_full_distribute values(1, 21, 'hello'), 
(2, 22, 'world')"
+    sql " sync "
+    order_qt_four_phase_full_distribute """select
+        
/*+SET_VAR(disable_nereids_rules='TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT,THREE_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI,THREE_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT')*/
+        name, count(distinct name), count(distinct age)
+        from test_four_phase_full_distribute
+        group by name
+        """
 }
diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy 
b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
index 19cac99c153..d3b418660ab 100644
--- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
+++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
@@ -58,4 +58,6 @@ suite("agg_4_phase") {
     qt_4phase (test_sql)
 
     sql """select GROUP_CONCAT(distinct name, " ") from agg_4_phase_tbl;"""
-}
\ No newline at end of file
+
+    sql """select 
/*+SET_VAR(disable_nereids_rules='TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,THREE_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT')*/
 GROUP_CONCAT(distinct name, " ") from agg_4_phase_tbl group by gender;"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to