morrySnow commented on code in PR #45209:
URL: https://github.com/apache/doris/pull/45209#discussion_r1893703596


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java:
##########
@@ -1810,8 +1809,7 @@ private List<Expression> 
getHashAggregatePartitionExpressions(
 
     private AggregateFunction tryConvertToMultiDistinct(AggregateFunction 
function) {
         if (function instanceof Count && function.isDistinct()) {

Review Comment:
   ```java
   if (function instanceOf SupportMultiDistinct && function.isDistinct()) {
       return function.convertToMultiDistinct();
   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SupportMultiDistinct.java:
##########
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.agg;
+
+/** MultiDistinctTrait*/

Review Comment:
   could u add more detail comment to explain what is multi distinct aggregate 
function?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import 
org.apache.doris.nereids.rules.rewrite.DistinctSplit.DistinctSplitContext;
+import org.apache.doris.nereids.trees.copier.DeepCopierContext;
+import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * LogicalAggregate(output:count(distinct a) as c1, count(distinct b) as c2)
+ *   +--Plan
+ * ->
+ * LogicalCTEAnchor
+ *   +--LogicalCTEProducer
+ *     +--Plan
+ *   +--LogicalProject(c1, c2)
+ *     +--LogicalJoin
+ *       +--LogicalAggregate(output:count(distinct a))
+ *         +--LogicalCTEConsumer
+ *       +--LogicalAggregate(output:count(distinct b))
+ *         +--LogicalCTEConsumer
+ * */
+public class DistinctSplit extends DefaultPlanRewriter<DistinctSplitContext> 
implements CustomRewriter {
+    public static DistinctSplit INSTANCE = new DistinctSplit();
+
+    /**DistinctSplitContext*/
+    public static class DistinctSplitContext {
+        List<LogicalCTEProducer<? extends Plan>> cteProducerList;
+        StatementContext statementContext;
+        CascadesContext cascadesContext;
+
+        public DistinctSplitContext(StatementContext statementContext, 
CascadesContext cascadesContext) {
+            this.statementContext = statementContext;
+            this.cteProducerList = new ArrayList<>();
+            this.cascadesContext = cascadesContext;
+        }
+    }
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        DistinctSplitContext ctx = new DistinctSplitContext(
+                jobContext.getCascadesContext().getStatementContext(), 
jobContext.getCascadesContext());
+        plan = plan.accept(this, ctx);
+        for (int i = ctx.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
ctx.cteProducerList.get(i);
+            plan = new LogicalCTEAnchor<>(producer.getCteId(), producer, plan);
+        }
+        return plan;
+    }
+
+    @Override
+    public Plan visitLogicalCTEAnchor(
+            LogicalCTEAnchor<? extends Plan, ? extends Plan> anchor, 
DistinctSplitContext ctx) {
+        Plan child1 = anchor.child(0).accept(this, ctx);
+        DistinctSplitContext consumerContext =
+                new DistinctSplitContext(ctx.statementContext, 
ctx.cascadesContext);
+        Plan child2 = anchor.child(1).accept(this, consumerContext);
+        for (int i = consumerContext.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
consumerContext.cteProducerList.get(i);
+            child2 = new LogicalCTEAnchor<>(producer.getCteId(), producer, 
child2);
+        }
+        return anchor.withChildren(ImmutableList.of(child1, child2));
+    }
+
+    @Override
+    public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, 
DistinctSplitContext ctx) {
+        List<Alias> distinctFuncWithAlias = new ArrayList<>();
+        List<Alias> otherAggFuncs = new ArrayList<>();
+        if (!needTransform((LogicalAggregate<Plan>) agg, 
distinctFuncWithAlias, otherAggFuncs)) {
+            return agg;
+        }
+
+        LogicalAggregate<Plan> cloneAgg = (LogicalAggregate<Plan>) 
LogicalPlanDeepCopier.INSTANCE
+                .deepCopy(agg, new DeepCopierContext());
+        LogicalCTEProducer<Plan> producer = new 
LogicalCTEProducer<>(ctx.statementContext.getNextCTEId(),
+                cloneAgg.child());
+        ctx.cteProducerList.add(producer);
+        Map<Slot, Slot> originToProducerSlot = new HashMap<>();
+        for (int i = 0; i < agg.child().getOutput().size(); ++i) {
+            Slot originSlot = agg.child().getOutput().get(i);
+            Slot cloneSlot = cloneAgg.child().getOutput().get(i);
+            originToProducerSlot.put(originSlot, cloneSlot);
+        }
+        distinctFuncWithAlias = ExpressionUtils.replace((List) 
distinctFuncWithAlias, originToProducerSlot);
+        otherAggFuncs = ExpressionUtils.replace((List) otherAggFuncs, 
originToProducerSlot);
+        // construct cte consumer and aggregate
+        List<LogicalAggregate<Plan>> newAggs = new ArrayList<>();
+        // All otherAggFuncs are placed in the first one
+        Map<Alias, Alias> newToOriginDistinctFuncAlias = new HashMap<>();
+        List<Expression> outputJoinGroupBys = new ArrayList<>();
+        for (int i = 0; i < distinctFuncWithAlias.size(); ++i) {
+            Expression distinctAggFunc = distinctFuncWithAlias.get(i).child(0);
+            LogicalCTEConsumer consumer = new 
LogicalCTEConsumer(ctx.statementContext.getNextRelationId(),
+                    producer.getCteId(), "", producer);
+            ctx.cascadesContext.putCTEIdToConsumer(consumer);
+            Map<Slot, Slot> producerToConsumerSlotMap = new HashMap<>();
+            for (Map.Entry<Slot, Slot> entry : 
consumer.getConsumerToProducerOutputMap().entrySet()) {
+                producerToConsumerSlotMap.put(entry.getValue(), 
entry.getKey());
+            }
+            List<Expression> replacedGroupBy = 
ExpressionUtils.replace(cloneAgg.getGroupByExpressions(),
+                    producerToConsumerSlotMap);
+            Expression newDistinctAggFunc = 
ExpressionUtils.replace(distinctAggFunc, producerToConsumerSlotMap);
+            List<NamedExpression> outputExpressions = replacedGroupBy.stream()
+                    .map(Slot.class::cast).collect(Collectors.toList());
+            Alias alias = new Alias(newDistinctAggFunc);
+            outputExpressions.add(alias);
+            if (i == 0) {
+                // save replacedGroupBy
+                outputJoinGroupBys.addAll(replacedGroupBy);
+            }
+            LogicalAggregate<Plan> newAgg = new 
LogicalAggregate<>(replacedGroupBy, outputExpressions, consumer);
+            newAggs.add(newAgg);
+            newToOriginDistinctFuncAlias.put(alias, 
distinctFuncWithAlias.get(i));
+        }
+        buildOtherAggFuncAggregate(otherAggFuncs, producer, ctx, cloneAgg, 
newToOriginDistinctFuncAlias, newAggs);
+        List<Expression> groupBy = agg.getGroupByExpressions();
+        LogicalJoin<Plan, Plan> join = constructJoin(newAggs, groupBy);
+        return constructProject(groupBy, newToOriginDistinctFuncAlias, 
outputJoinGroupBys, join);
+    }
+
+    private static void buildOtherAggFuncAggregate(List<Alias> otherAggFuncs, 
LogicalCTEProducer<Plan> producer,
+            DistinctSplitContext ctx, LogicalAggregate<Plan> cloneAgg, 
Map<Alias, Alias> newToOriginDistinctFuncAlias,
+            List<LogicalAggregate<Plan>> newAggs) {
+        if (otherAggFuncs.isEmpty()) {
+            return;
+        }
+        LogicalCTEConsumer consumer = new 
LogicalCTEConsumer(ctx.statementContext.getNextRelationId(),
+                producer.getCteId(), "", producer);
+        ctx.cascadesContext.putCTEIdToConsumer(consumer);
+        Map<Slot, Slot> producerToConsumerSlotMap = new HashMap<>();
+        for (Map.Entry<Slot, Slot> entry : 
consumer.getConsumerToProducerOutputMap().entrySet()) {
+            producerToConsumerSlotMap.put(entry.getValue(), entry.getKey());
+        }
+        List<Expression> replacedGroupBy = 
ExpressionUtils.replace(cloneAgg.getGroupByExpressions(),
+                producerToConsumerSlotMap);
+        List<NamedExpression> outputExpressions = replacedGroupBy.stream()
+                .map(Slot.class::cast).collect(Collectors.toList());
+        List<Expression> otherAggFuncAliases = otherAggFuncs.stream()
+                .map(e -> ExpressionUtils.replace(e, 
producerToConsumerSlotMap)).collect(Collectors.toList());
+        for (Expression otherAggFuncAlias : otherAggFuncAliases) {
+            // otherAggFunc is instance of Alias
+            Alias outputOtherFunc = new Alias(otherAggFuncAlias.child(0));
+            outputExpressions.add(outputOtherFunc);
+            newToOriginDistinctFuncAlias.put(outputOtherFunc, (Alias) 
otherAggFuncAlias);
+        }
+        LogicalAggregate<Plan> newAgg = new 
LogicalAggregate<>(replacedGroupBy, outputExpressions, consumer);
+        newAggs.add(newAgg);
+    }
+
+    private static boolean isDistinctMultiColumns(AggregateFunction func) {
+        if (func.arity() <= 1) {
+            return false;
+        }
+        for (int i = 1; i < func.arity(); ++i) {
+            // think about group_concat(distinct col_1, ',')
+            if (!(func.child(i) instanceof OrderExpression) && 
!func.child(i).getInputSlots().isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean needTransform(LogicalAggregate<Plan> agg, 
List<Alias> aliases, List<Alias> otherAggFuncs) {
+        // TODO with source repeat aggregate need to be supported in future
+        if (agg.getSourceRepeat().isPresent()) {
+            return false;
+        }
+        Set<Expression> distinctFunc = new HashSet<>();
+        boolean distinctMultiColumns = false;
+        for (NamedExpression namedExpression : agg.getOutputExpressions()) {
+            if (!(namedExpression instanceof Alias) || 
!(namedExpression.child(0) instanceof AggregateFunction)) {
+                continue;
+            }
+            AggregateFunction aggFunc = (AggregateFunction) 
namedExpression.child(0);
+            if (aggFunc instanceof SupportMultiDistinct && 
aggFunc.isDistinct()) {
+                aliases.add((Alias) namedExpression);
+                distinctFunc.add(aggFunc);
+                distinctMultiColumns = distinctMultiColumns || 
isDistinctMultiColumns(aggFunc);
+            } else {
+                otherAggFuncs.add((Alias) namedExpression);
+            }
+        }
+        if (distinctFunc.size() <= 1) {
+            return false;
+        }
+        if (!distinctMultiColumns && !agg.getGroupByExpressions().isEmpty()) {
+            return false;
+        }

Review Comment:
   add comment to explain this if statement



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import 
org.apache.doris.nereids.rules.rewrite.DistinctSplit.DistinctSplitContext;
+import org.apache.doris.nereids.trees.copier.DeepCopierContext;
+import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * LogicalAggregate(output:count(distinct a) as c1, count(distinct b) as c2)
+ *   +--Plan
+ * ->
+ * LogicalCTEAnchor
+ *   +--LogicalCTEProducer
+ *     +--Plan
+ *   +--LogicalProject(c1, c2)
+ *     +--LogicalJoin
+ *       +--LogicalAggregate(output:count(distinct a))
+ *         +--LogicalCTEConsumer
+ *       +--LogicalAggregate(output:count(distinct b))
+ *         +--LogicalCTEConsumer
+ * */
+public class DistinctSplit extends DefaultPlanRewriter<DistinctSplitContext> 
implements CustomRewriter {
+    public static DistinctSplit INSTANCE = new DistinctSplit();
+
+    /**DistinctSplitContext*/
+    public static class DistinctSplitContext {
+        List<LogicalCTEProducer<? extends Plan>> cteProducerList;
+        StatementContext statementContext;
+        CascadesContext cascadesContext;
+
+        public DistinctSplitContext(StatementContext statementContext, 
CascadesContext cascadesContext) {
+            this.statementContext = statementContext;
+            this.cteProducerList = new ArrayList<>();
+            this.cascadesContext = cascadesContext;
+        }
+    }
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        DistinctSplitContext ctx = new DistinctSplitContext(
+                jobContext.getCascadesContext().getStatementContext(), 
jobContext.getCascadesContext());
+        plan = plan.accept(this, ctx);
+        for (int i = ctx.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
ctx.cteProducerList.get(i);
+            plan = new LogicalCTEAnchor<>(producer.getCteId(), producer, plan);
+        }
+        return plan;
+    }
+
+    @Override
+    public Plan visitLogicalCTEAnchor(
+            LogicalCTEAnchor<? extends Plan, ? extends Plan> anchor, 
DistinctSplitContext ctx) {
+        Plan child1 = anchor.child(0).accept(this, ctx);
+        DistinctSplitContext consumerContext =
+                new DistinctSplitContext(ctx.statementContext, 
ctx.cascadesContext);
+        Plan child2 = anchor.child(1).accept(this, consumerContext);
+        for (int i = consumerContext.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
consumerContext.cteProducerList.get(i);
+            child2 = new LogicalCTEAnchor<>(producer.getCteId(), producer, 
child2);
+        }
+        return anchor.withChildren(ImmutableList.of(child1, child2));
+    }
+
+    @Override
+    public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, 
DistinctSplitContext ctx) {
+        List<Alias> distinctFuncWithAlias = new ArrayList<>();
+        List<Alias> otherAggFuncs = new ArrayList<>();
+        if (!needTransform((LogicalAggregate<Plan>) agg, 
distinctFuncWithAlias, otherAggFuncs)) {
+            return agg;
+        }
+
+        LogicalAggregate<Plan> cloneAgg = (LogicalAggregate<Plan>) 
LogicalPlanDeepCopier.INSTANCE
+                .deepCopy(agg, new DeepCopierContext());
+        LogicalCTEProducer<Plan> producer = new 
LogicalCTEProducer<>(ctx.statementContext.getNextCTEId(),
+                cloneAgg.child());
+        ctx.cteProducerList.add(producer);
+        Map<Slot, Slot> originToProducerSlot = new HashMap<>();
+        for (int i = 0; i < agg.child().getOutput().size(); ++i) {
+            Slot originSlot = agg.child().getOutput().get(i);
+            Slot cloneSlot = cloneAgg.child().getOutput().get(i);
+            originToProducerSlot.put(originSlot, cloneSlot);
+        }
+        distinctFuncWithAlias = ExpressionUtils.replace((List) 
distinctFuncWithAlias, originToProducerSlot);
+        otherAggFuncs = ExpressionUtils.replace((List) otherAggFuncs, 
originToProducerSlot);
+        // construct cte consumer and aggregate
+        List<LogicalAggregate<Plan>> newAggs = new ArrayList<>();
+        // All otherAggFuncs are placed in the first one
+        Map<Alias, Alias> newToOriginDistinctFuncAlias = new HashMap<>();
+        List<Expression> outputJoinGroupBys = new ArrayList<>();
+        for (int i = 0; i < distinctFuncWithAlias.size(); ++i) {
+            Expression distinctAggFunc = distinctFuncWithAlias.get(i).child(0);
+            LogicalCTEConsumer consumer = new 
LogicalCTEConsumer(ctx.statementContext.getNextRelationId(),
+                    producer.getCteId(), "", producer);
+            ctx.cascadesContext.putCTEIdToConsumer(consumer);
+            Map<Slot, Slot> producerToConsumerSlotMap = new HashMap<>();
+            for (Map.Entry<Slot, Slot> entry : 
consumer.getConsumerToProducerOutputMap().entrySet()) {
+                producerToConsumerSlotMap.put(entry.getValue(), 
entry.getKey());
+            }
+            List<Expression> replacedGroupBy = 
ExpressionUtils.replace(cloneAgg.getGroupByExpressions(),
+                    producerToConsumerSlotMap);
+            Expression newDistinctAggFunc = 
ExpressionUtils.replace(distinctAggFunc, producerToConsumerSlotMap);
+            List<NamedExpression> outputExpressions = replacedGroupBy.stream()
+                    .map(Slot.class::cast).collect(Collectors.toList());
+            Alias alias = new Alias(newDistinctAggFunc);
+            outputExpressions.add(alias);
+            if (i == 0) {
+                // save replacedGroupBy
+                outputJoinGroupBys.addAll(replacedGroupBy);
+            }
+            LogicalAggregate<Plan> newAgg = new 
LogicalAggregate<>(replacedGroupBy, outputExpressions, consumer);
+            newAggs.add(newAgg);
+            newToOriginDistinctFuncAlias.put(alias, 
distinctFuncWithAlias.get(i));
+        }
+        buildOtherAggFuncAggregate(otherAggFuncs, producer, ctx, cloneAgg, 
newToOriginDistinctFuncAlias, newAggs);
+        List<Expression> groupBy = agg.getGroupByExpressions();
+        LogicalJoin<Plan, Plan> join = constructJoin(newAggs, groupBy);
+        return constructProject(groupBy, newToOriginDistinctFuncAlias, 
outputJoinGroupBys, join);
+    }
+
+    private static void buildOtherAggFuncAggregate(List<Alias> otherAggFuncs, 
LogicalCTEProducer<Plan> producer,
+            DistinctSplitContext ctx, LogicalAggregate<Plan> cloneAgg, 
Map<Alias, Alias> newToOriginDistinctFuncAlias,
+            List<LogicalAggregate<Plan>> newAggs) {
+        if (otherAggFuncs.isEmpty()) {
+            return;
+        }
+        LogicalCTEConsumer consumer = new 
LogicalCTEConsumer(ctx.statementContext.getNextRelationId(),
+                producer.getCteId(), "", producer);
+        ctx.cascadesContext.putCTEIdToConsumer(consumer);
+        Map<Slot, Slot> producerToConsumerSlotMap = new HashMap<>();
+        for (Map.Entry<Slot, Slot> entry : 
consumer.getConsumerToProducerOutputMap().entrySet()) {
+            producerToConsumerSlotMap.put(entry.getValue(), entry.getKey());
+        }
+        List<Expression> replacedGroupBy = 
ExpressionUtils.replace(cloneAgg.getGroupByExpressions(),
+                producerToConsumerSlotMap);
+        List<NamedExpression> outputExpressions = replacedGroupBy.stream()
+                .map(Slot.class::cast).collect(Collectors.toList());
+        List<Expression> otherAggFuncAliases = otherAggFuncs.stream()
+                .map(e -> ExpressionUtils.replace(e, 
producerToConsumerSlotMap)).collect(Collectors.toList());
+        for (Expression otherAggFuncAlias : otherAggFuncAliases) {
+            // otherAggFunc is instance of Alias
+            Alias outputOtherFunc = new Alias(otherAggFuncAlias.child(0));
+            outputExpressions.add(outputOtherFunc);
+            newToOriginDistinctFuncAlias.put(outputOtherFunc, (Alias) 
otherAggFuncAlias);
+        }
+        LogicalAggregate<Plan> newAgg = new 
LogicalAggregate<>(replacedGroupBy, outputExpressions, consumer);
+        newAggs.add(newAgg);
+    }
+
+    private static boolean isDistinctMultiColumns(AggregateFunction func) {
+        if (func.arity() <= 1) {
+            return false;
+        }
+        for (int i = 1; i < func.arity(); ++i) {
+            // think about group_concat(distinct col_1, ',')
+            if (!(func.child(i) instanceof OrderExpression) && 
!func.child(i).getInputSlots().isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean needTransform(LogicalAggregate<Plan> agg, 
List<Alias> aliases, List<Alias> otherAggFuncs) {
+        // TODO with source repeat aggregate need to be supported in future
+        if (agg.getSourceRepeat().isPresent()) {
+            return false;

Review Comment:
   should check multiple count distinct with multi parameters and throw 
exception here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to