924060929 commented on code in PR #45209:
URL: https://github.com/apache/doris/pull/45209#discussion_r1896497689


##########
regression-test/suites/nereids_rules_p0/mv/dimension/dimension_2_4.groovy:
##########
@@ -577,7 +577,7 @@ suite("partition_mv_rewrite_dimension_2_4") {
             count(distinct case when O_SHIPPRIORITY > 2 and o_orderkey IN (2) 
then o_custkey else null end) as cnt_2 
             from orders_2_4  
             where o_orderkey > (-3) + 5 """
-    mv_rewrite_success(sql_stmt_13, mv_name_13)
+    mv_rewrite_fail(sql_stmt_13, mv_name_13)

Review Comment:
   Does this case hit a rollback, or is the failure expected?



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctSplitTest.java:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.util.MatchingUtils;
+import org.apache.doris.nereids.util.MemoPatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Test;
+
+public class DistinctSplitTest extends TestWithFeService implements 
MemoPatternMatchSupported {
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        createTable("create table test.test_distinct_multi(a int, b int, c 
int, d varchar(10), e date)"
+                + "distributed by hash(a) properties('replication_num'='1');");
+        connectContext.setDatabase("test");
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+    }
+
+    @Test
+    void multiCountWithoutGby() {
+        String sql = "select count(distinct b), count(distinct a) from 
test_distinct_multi";
+        PlanChecker.from(connectContext).checkExplain(sql, planner -> {
+            Plan plan = planner.getOptimizedPlan();
+            MatchingUtils.assertMatches(plan, 
physicalCTEAnchor(physicalCTEProducer(any()), 
physicalResultSink(physicalProject(physicalNestedLoopJoin(

Review Comment:
   Use line breaks and indentation to format the code so that the shape of the plan is visible.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import 
org.apache.doris.nereids.rules.rewrite.DistinctSplit.DistinctSplitContext;
+import org.apache.doris.nereids.trees.copier.DeepCopierContext;
+import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * LogicalAggregate(output:count(distinct a) as c1, count(distinct b) as c2)
+ *   +--Plan
+ * ->
+ * LogicalCTEAnchor
+ *   +--LogicalCTEProducer
+ *     +--Plan
+ *   +--LogicalProject(c1, c2)
+ *     +--LogicalJoin
+ *       +--LogicalAggregate(output:count(distinct a))
+ *         +--LogicalCTEConsumer
+ *       +--LogicalAggregate(output:count(distinct b))
+ *         +--LogicalCTEConsumer
+ * */
+public class DistinctSplit extends DefaultPlanRewriter<DistinctSplitContext> 
implements CustomRewriter {
+    public static DistinctSplit INSTANCE = new DistinctSplit();
+
+    /**DistinctSplitContext*/
+    public static class DistinctSplitContext {
+        List<LogicalCTEProducer<? extends Plan>> cteProducerList;
+        StatementContext statementContext;
+        CascadesContext cascadesContext;
+
+        public DistinctSplitContext(StatementContext statementContext, 
CascadesContext cascadesContext) {
+            this.statementContext = statementContext;
+            this.cteProducerList = new ArrayList<>();
+            this.cascadesContext = cascadesContext;
+        }
+    }
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        DistinctSplitContext ctx = new DistinctSplitContext(
+                jobContext.getCascadesContext().getStatementContext(), 
jobContext.getCascadesContext());
+        plan = plan.accept(this, ctx);
+        for (int i = ctx.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
ctx.cteProducerList.get(i);
+            plan = new LogicalCTEAnchor<>(producer.getCteId(), producer, plan);
+        }
+        return plan;
+    }
+
+    @Override
+    public Plan visitLogicalCTEAnchor(
+            LogicalCTEAnchor<? extends Plan, ? extends Plan> anchor, 
DistinctSplitContext ctx) {
+        Plan child1 = anchor.child(0).accept(this, ctx);
+        DistinctSplitContext consumerContext =
+                new DistinctSplitContext(ctx.statementContext, 
ctx.cascadesContext);
+        Plan child2 = anchor.child(1).accept(this, consumerContext);
+        for (int i = consumerContext.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
consumerContext.cteProducerList.get(i);
+            child2 = new LogicalCTEAnchor<>(producer.getCteId(), producer, 
child2);
+        }
+        return anchor.withChildren(ImmutableList.of(child1, child2));
+    }
+
+    @Override
+    public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, 
DistinctSplitContext ctx) {
+        List<Alias> distinctFuncWithAlias = new ArrayList<>();
+        List<Alias> otherAggFuncs = new ArrayList<>();
+        if (!needTransform((LogicalAggregate<Plan>) agg, 
distinctFuncWithAlias, otherAggFuncs)) {
+            return agg;
+        }
+
+        LogicalAggregate<Plan> cloneAgg = (LogicalAggregate<Plan>) 
LogicalPlanDeepCopier.INSTANCE
+                .deepCopy(agg, new DeepCopierContext());
+        LogicalCTEProducer<Plan> producer = new 
LogicalCTEProducer<>(ctx.statementContext.getNextCTEId(),
+                cloneAgg.child());
+        ctx.cteProducerList.add(producer);
+        Map<Slot, Slot> originToProducerSlot = new HashMap<>();
+        for (int i = 0; i < agg.child().getOutput().size(); ++i) {
+            Slot originSlot = agg.child().getOutput().get(i);
+            Slot cloneSlot = cloneAgg.child().getOutput().get(i);
+            originToProducerSlot.put(originSlot, cloneSlot);
+        }
+        distinctFuncWithAlias = ExpressionUtils.replace((List) 
distinctFuncWithAlias, originToProducerSlot);
+        otherAggFuncs = ExpressionUtils.replace((List) otherAggFuncs, 
originToProducerSlot);
+        // construct cte consumer and aggregate
+        List<LogicalAggregate<Plan>> newAggs = new ArrayList<>();
+        // All otherAggFuncs are placed in the first one
+        Map<Alias, Alias> newToOriginDistinctFuncAlias = new HashMap<>();
+        List<Expression> outputJoinGroupBys = new ArrayList<>();
+        for (int i = 0; i < distinctFuncWithAlias.size(); ++i) {
+            Expression distinctAggFunc = distinctFuncWithAlias.get(i).child(0);
+            LogicalCTEConsumer consumer = new 
LogicalCTEConsumer(ctx.statementContext.getNextRelationId(),
+                    producer.getCteId(), "", producer);
+            ctx.cascadesContext.putCTEIdToConsumer(consumer);
+            Map<Slot, Slot> producerToConsumerSlotMap = new HashMap<>();
+            for (Map.Entry<Slot, Slot> entry : 
consumer.getConsumerToProducerOutputMap().entrySet()) {
+                producerToConsumerSlotMap.put(entry.getValue(), 
entry.getKey());
+            }
+            List<Expression> replacedGroupBy = 
ExpressionUtils.replace(cloneAgg.getGroupByExpressions(),
+                    producerToConsumerSlotMap);
+            Expression newDistinctAggFunc = 
ExpressionUtils.replace(distinctAggFunc, producerToConsumerSlotMap);
+            List<NamedExpression> outputExpressions = replacedGroupBy.stream()
+                    .map(Slot.class::cast).collect(Collectors.toList());
+            Alias alias = new Alias(newDistinctAggFunc);
+            outputExpressions.add(alias);
+            if (i == 0) {
+                // save replacedGroupBy
+                outputJoinGroupBys.addAll(replacedGroupBy);
+            }
+            LogicalAggregate<Plan> newAgg = new 
LogicalAggregate<>(replacedGroupBy, outputExpressions, consumer);
+            newAggs.add(newAgg);
+            newToOriginDistinctFuncAlias.put(alias, 
distinctFuncWithAlias.get(i));

Review Comment:
   It seems that some duplicate code exists between the non-distinct-aggregate and 
distinct-aggregate handling; we should reuse it.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import 
org.apache.doris.nereids.rules.rewrite.DistinctSplit.DistinctSplitContext;
+import org.apache.doris.nereids.trees.copier.DeepCopierContext;
+import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * LogicalAggregate(output:count(distinct a) as c1, count(distinct b) as c2)
+ *   +--Plan
+ * ->
+ * LogicalCTEAnchor
+ *   +--LogicalCTEProducer
+ *     +--Plan
+ *   +--LogicalProject(c1, c2)
+ *     +--LogicalJoin
+ *       +--LogicalAggregate(output:count(distinct a))
+ *         +--LogicalCTEConsumer
+ *       +--LogicalAggregate(output:count(distinct b))
+ *         +--LogicalCTEConsumer
+ * */
+public class DistinctSplit extends DefaultPlanRewriter<DistinctSplitContext> 
implements CustomRewriter {

Review Comment:
   `SplitMultiDistinct` would be a more readable name.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java:
##########
@@ -550,6 +552,9 @@ private static List<RewriteJob> getWholeTreeRewriteJobs(
                     rewriteJobs.addAll(jobs(topic("or expansion",
                             custom(RuleType.OR_EXPANSION, () -> 
OrExpansion.INSTANCE))));
                 }
+                rewriteJobs.addAll(jobs(topic("distinct split",
+                        custom(RuleType.DISTINCT_SPLIT, () -> 
DistinctSplit.INSTANCE))));

Review Comment:
   Change this to `RuleType.SPLIT_MULTI_DISTINCT`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java:
##########
@@ -444,6 +445,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
                                 new CollectCteConsumerOutput()
                         )
                 ),
+                // topic("distinct split", topDown(new DistinctSplit())),

Review Comment:
   Remove this commented-out line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to