This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 18b5f70a7c0 [Bug](materialized-view) enable rewrite on select 
materialized index with aggregate mode (#24691)
18b5f70a7c0 is described below

commit 18b5f70a7c0ee565df5208182d80b7a8d6d6ece9
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Sep 27 11:30:36 2023 +0800

    [Bug](materialized-view) enable rewrite on select materialized index with 
aggregate mode (#24691)
    
    enable rewrite on select materialized index with aggregate mode
---
 .../mv/SelectMaterializedIndexWithAggregate.java   | 157 ++++++++-------------
 .../test_o2.out}                                   |   7 +-
 .../testCountDistinctToBitmap.out                  |   8 ++
 .../suites/mv_p0/test_o2/test_o2.groovy            |  60 ++++++++
 .../testCountDistinctToBitmap.groovy               |  30 ++++
 5 files changed, 156 insertions(+), 106 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
index 398f11b3765..abc85eb3f16 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
+import 
org.apache.doris.nereids.rules.rewrite.mv.AbstractSelectMaterializedIndexRule.SlotContext;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.ExprId;
@@ -65,6 +66,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.types.BigIntType;
+import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.types.VarcharType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.planner.PlanNode;
@@ -657,12 +659,8 @@ public class SelectMaterializedIndexWithAggregate extends 
AbstractSelectMaterial
      * 2. filter indexes that have all the required columns.
      * 3. select best index from all the candidate indexes that could use.
      */
-    private SelectResult select(
-            LogicalOlapScan scan,
-            Set<Slot> requiredScanOutput,
-            Set<Expression> predicates,
-            List<AggregateFunction> aggregateFunctions,
-            List<Expression> groupingExprs,
+    private SelectResult select(LogicalOlapScan scan, Set<Slot> 
requiredScanOutput, Set<Expression> predicates,
+            List<AggregateFunction> aggregateFunctions, List<Expression> 
groupingExprs,
             Set<? extends Expression> requiredExpr) {
         // remove virtual slot for grouping sets.
         Set<Slot> nonVirtualRequiredScanOutput = requiredScanOutput.stream()
@@ -677,105 +675,57 @@ public class SelectMaterializedIndexWithAggregate 
extends AbstractSelectMaterial
         }
 
         OlapTable table = scan.getTable();
-        switch (scan.getTable().getKeysType()) {
-            case AGG_KEYS:
-            case UNIQUE_KEYS:
-            case DUP_KEYS:
-                break;
-            default:
-                throw new RuntimeException("Not supported keys type: " + 
scan.getTable().getKeysType());
-        }
 
         Map<Boolean, List<MaterializedIndex>> indexesGroupByIsBaseOrNot = 
table.getVisibleIndex()
                 .stream()
                 .collect(Collectors.groupingBy(index -> index.getId() == 
table.getBaseIndexId()));
-        if (table.isDupKeysOrMergeOnWrite()) {
-            // Duplicate-keys table could use base index and indexes that 
pre-aggregation status is on.
-            Set<MaterializedIndex> candidatesWithoutRewriting =
-                    indexesGroupByIsBaseOrNot.getOrDefault(false, 
ImmutableList.of())
-                            .stream()
-                            .filter(index -> checkPreAggStatus(scan, 
index.getId(), predicates,
-                                    aggregateFunctions, groupingExprs).isOn())
-                            .collect(Collectors.toSet());
-
-            // try to rewrite bitmap, hll by materialized index columns.
-            List<AggRewriteResult> candidatesWithRewriting = 
indexesGroupByIsBaseOrNot.getOrDefault(false,
-                            ImmutableList.of())
-                    .stream()
-                    .filter(index -> 
!candidatesWithoutRewriting.contains(index))
-                    .map(index -> rewriteAgg(index, scan, 
nonVirtualRequiredScanOutput, predicates,
-                            aggregateFunctions,
-                            groupingExprs))
-                    .filter(aggRewriteResult -> checkPreAggStatus(scan, 
aggRewriteResult.index.getId(),
-                            predicates,
-                            // check pre-agg status of aggregate function that 
couldn't rewrite.
-                            aggFuncsDiff(aggregateFunctions, aggRewriteResult),
-                            groupingExprs).isOn())
-                    .filter(result -> result.success)
-                    .collect(Collectors.toList());
-
-            List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
-                    candidatesWithoutRewriting.stream()
-                            .filter(index -> containAllRequiredColumns(index, 
scan, nonVirtualRequiredScanOutput,
-                                    requiredExpr, predicates)),
-                    candidatesWithRewriting.stream()
-                            .filter(aggRewriteResult -> 
containAllRequiredColumns(aggRewriteResult.index, scan,
-                                    aggRewriteResult.requiredScanOutput,
-                                    requiredExpr.stream().map(e -> 
aggRewriteResult.exprRewriteMap.replaceAgg(e))
-                                            .collect(Collectors.toSet()),
-                                    predicates))
-                            .map(aggRewriteResult -> aggRewriteResult.index))
-                    .collect(Collectors.toList());
-
-            long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, 
predicates);
-            Optional<AggRewriteResult> rewriteResultOpt = 
candidatesWithRewriting.stream()
-                    .filter(aggRewriteResult -> aggRewriteResult.index.getId() 
== selectIndexId)
-                    .findAny();
-            // Pre-aggregation is set to `on` by default for duplicate-keys 
table.
-            return new SelectResult(PreAggStatus.on(), selectIndexId,
-                    rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new 
ExprRewriteMap()));
-        } else {
-            if (scan.getPreAggStatus().isOff()) {
-                return new SelectResult(scan.getPreAggStatus(),
-                        scan.getTable().getBaseIndexId(), new 
ExprRewriteMap());
-            }
-
-            Set<MaterializedIndex> candidatesWithoutRewriting = new 
HashSet<>();
 
-            for (MaterializedIndex index : 
indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())) {
-                final PreAggStatus preAggStatus;
-                if (preAggEnabledByHint(scan)) {
-                    preAggStatus = PreAggStatus.on();
-                } else {
-                    preAggStatus = checkPreAggStatus(scan, index.getId(), 
predicates,
-                        aggregateFunctions, groupingExprs);
-                }
+        Set<MaterializedIndex> candidatesWithoutRewriting = 
indexesGroupByIsBaseOrNot
+                .getOrDefault(false, ImmutableList.of()).stream()
+                .filter(index -> preAggEnabledByHint(scan)
+                        || checkPreAggStatus(scan, index.getId(), predicates, 
aggregateFunctions, groupingExprs).isOn())
+                .collect(Collectors.toSet());
+
+        // try to rewrite bitmap, hll by materialized index columns.
+        List<AggRewriteResult> candidatesWithRewriting = 
indexesGroupByIsBaseOrNot
+                .getOrDefault(false, ImmutableList.of()).stream()
+                .filter(index -> !candidatesWithoutRewriting.contains(index))
+                .map(index -> rewriteAgg(index, scan, 
nonVirtualRequiredScanOutput, predicates, aggregateFunctions,
+                        groupingExprs))
+                .filter(aggRewriteResult -> checkPreAggStatus(scan, 
aggRewriteResult.index.getId(), predicates,
+                        // check pre-agg status of aggregate function that 
couldn't rewrite.
+                        aggFuncsDiff(aggregateFunctions, aggRewriteResult), 
groupingExprs).isOn())
+                .filter(result -> result.success).collect(Collectors.toList());
+
+        List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
+                candidatesWithoutRewriting.stream()
+                        .filter(index -> containAllRequiredColumns(index, 
scan, nonVirtualRequiredScanOutput,
+                                requiredExpr, predicates)),
+                candidatesWithRewriting.stream()
+                        .filter(aggRewriteResult -> 
containAllRequiredColumns(aggRewriteResult.index, scan,
+                                aggRewriteResult.requiredScanOutput,
+                                requiredExpr.stream().map(e -> 
aggRewriteResult.exprRewriteMap.replaceAgg(e))
+                                        .collect(Collectors.toSet()),
+                                predicates))
+                        .map(aggRewriteResult -> aggRewriteResult.index))
+                .collect(Collectors.toList());
 
-                if (preAggStatus.isOn()) {
-                    candidatesWithoutRewriting.add(index);
-                }
-            }
-            SelectResult baseIndexSelectResult = new SelectResult(
-                    checkPreAggStatus(scan, scan.getTable().getBaseIndexId(),
-                        predicates, aggregateFunctions, groupingExprs),
-                    scan.getTable().getBaseIndexId(), new ExprRewriteMap());
-            if (candidatesWithoutRewriting.isEmpty()) {
-                // return early if pre agg status if off.
-                return baseIndexSelectResult;
-            } else {
-                List<MaterializedIndex> rollupsWithAllRequiredCols =
-                        Stream.concat(candidatesWithoutRewriting.stream(), 
indexesGroupByIsBaseOrNot.get(true).stream())
-                                .filter(index -> 
containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput,
-                                        requiredExpr, predicates))
-                                .collect(Collectors.toList());
-
-                long selectedIndex = 
selectBestIndex(rollupsWithAllRequiredCols, scan, predicates);
-                if (selectedIndex == scan.getTable().getBaseIndexId()) {
-                    return baseIndexSelectResult;
-                }
-                return new SelectResult(PreAggStatus.on(), selectedIndex, new 
ExprRewriteMap());
+        long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, 
predicates);
+        // Pre-aggregation is set to `on` by default for duplicate-keys table.
+        // In other cases where mv is not hit, preagg may turn off from on.
+        if (!table.isDupKeysOrMergeOnWrite() && (new CheckContext(scan, 
selectIndexId)).isBaseIndex()) {
+            PreAggStatus preagg = scan.getPreAggStatus();
+            if (preagg.isOn()) {
+                preagg = checkPreAggStatus(scan, 
scan.getTable().getBaseIndexId(), predicates, aggregateFunctions,
+                        groupingExprs);
             }
+            return new SelectResult(preagg, selectIndexId, new 
ExprRewriteMap());
         }
+
+        Optional<AggRewriteResult> rewriteResultOpt = 
candidatesWithRewriting.stream()
+                .filter(aggRewriteResult -> aggRewriteResult.index.getId() == 
selectIndexId).findAny();
+        return new SelectResult(PreAggStatus.on(), selectIndexId,
+                rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new 
ExprRewriteMap()));
     }
 
     private List<AggregateFunction> aggFuncsDiff(List<AggregateFunction> 
aggregateFunctions,
@@ -1191,6 +1141,13 @@ public class SelectMaterializedIndexWithAggregate 
extends AbstractSelectMaterial
         }
     }
 
+    private static Expression castIfNeed(Expression expr, DataType targetType) 
{
+        if (expr.getDataType().equals(targetType)) {
+            return expr;
+        }
+        return new Cast(expr, targetType);
+    }
+
     private static class AggFuncRewriter extends 
DefaultExpressionRewriter<RewriteContext> {
         public static final AggFuncRewriter INSTANCE = new AggFuncRewriter();
 
@@ -1212,7 +1169,7 @@ public class SelectMaterializedIndexWithAggregate extends 
AbstractSelectMaterial
                 // count(distinct col) -> 
bitmap_union_count(mv_bitmap_union_col)
                 Optional<Slot> slotOpt = 
ExpressionUtils.extractSlotOrCastOnSlot(count.child(0));
 
-                Expression expr = new ToBitmapWithCheck(new 
Cast(count.child(0), BigIntType.INSTANCE));
+                Expression expr = new 
ToBitmapWithCheck(castIfNeed(count.child(0), BigIntType.INSTANCE));
                 // count distinct a value column.
                 if (slotOpt.isPresent() && 
!context.checkContext.keyNameToColumn.containsKey(
                         normalizeName(expr.toSql()))) {
@@ -1425,7 +1382,7 @@ public class SelectMaterializedIndexWithAggregate extends 
AbstractSelectMaterial
             // ndv on a value column.
             if (slotOpt.isPresent() && 
!context.checkContext.keyNameToColumn.containsKey(
                     normalizeName(slotOpt.get().toSql()))) {
-                Expression expr = new Cast(ndv.child(), 
VarcharType.SYSTEM_DEFAULT);
+                Expression expr = castIfNeed(ndv.child(), 
VarcharType.SYSTEM_DEFAULT);
                 String hllUnionColumn = normalizeName(
                         
CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.HLL_UNION,
                                 CreateMaterializedViewStmt.mvColumnBuilder(new 
HllHash(expr).toSql())));
@@ -1459,7 +1416,7 @@ public class SelectMaterializedIndexWithAggregate extends 
AbstractSelectMaterial
             Optional<Slot> slotOpt = 
ExpressionUtils.extractSlotOrCastOnSlot(sum.child(0));
             if (!sum.isDistinct() && slotOpt.isPresent()
                     && 
!context.checkContext.keyNameToColumn.containsKey(normalizeName(slotOpt.get().toSql())))
 {
-                Expression expr = new Cast(sum.child(), BigIntType.INSTANCE);
+                Expression expr = castIfNeed(sum.child(), BigIntType.INSTANCE);
                 String sumColumn = 
normalizeName(CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.SUM,
                         
CreateMaterializedViewStmt.mvColumnBuilder(expr.toSql())));
                 Column mvColumn = context.checkContext.getColumn(sumColumn);
diff --git 
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
 b/regression-test/data/mv_p0/test_o2/test_o2.out
similarity index 60%
copy from 
regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
copy to regression-test/data/mv_p0/test_o2/test_o2.out
index 88913c5b65d..af029e6857d 100644
--- 
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
+++ b/regression-test/data/mv_p0/test_o2/test_o2.out
@@ -1,9 +1,4 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
--- !select_star --
-2020-01-01     1       a       1
-2020-01-01     1       a       2
-2020-01-02     2       b       2
-
 -- !select_mv --
-1      2
+2023-08-16T22:27       ax      asd     2
 
diff --git 
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
 
b/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
index 88913c5b65d..e5696983e12 100644
--- 
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
+++ 
b/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
@@ -7,3 +7,11 @@
 -- !select_mv --
 1      2
 
+-- !select_star --
+2020-01-01     1       a       1
+2020-01-01     1       a       2
+2020-01-02     2       b       2
+
+-- !select_mv --
+1      2
+
diff --git a/regression-test/suites/mv_p0/test_o2/test_o2.groovy 
b/regression-test/suites/mv_p0/test_o2/test_o2.groovy
new file mode 100644
index 00000000000..c9990253584
--- /dev/null
+++ b/regression-test/suites/mv_p0/test_o2/test_o2.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite ("test_o2") {
+    sql """set enable_nereids_planner=true"""
+    sql """SET enable_fallback_to_original_planner=false"""
+    sql """ DROP TABLE IF EXISTS o2_order_events; """
+
+    sql """
+        CREATE TABLE `o2_order_events` (
+        `ts` datetime NULL,
+        `metric_name` varchar(20) NULL,
+        `city_id` int(11) NULL,
+        `platform` varchar(20) NULL,
+        `vendor_id` int(11) NULL,
+        `pos_id` int(11) NULL,
+        `is_instant_restaurant` boolean NULL,
+        `country_id` int(11) NULL,
+        `logistics_partner_id` int(11) NULL,
+        `rpf_order` int(11) NULL,
+        `rejected_message_id` int(11) NULL,
+        `count_value` int(11) SUM NULL DEFAULT "0"
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`ts`, `metric_name`, `city_id`, `platform`, `vendor_id`, 
`pos_id`, `is_instant_restaurant`, `country_id`, `logistics_partner_id`, 
`rpf_order`, `rejected_message_id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`metric_name`, `platform`) BUCKETS 2
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+        """
+
+    sql """insert into o2_order_events values ("2023-08-16 22:27:00 
","ax",1,"asd",2,1,1,1,1,1,1,1);"""
+
+    createMV ("""
+            create materialized view o2_order_events_mv as select 
ts,metric_name,platform,sum(count_value) from o2_order_events group by 
ts,metric_name,platform;;""")
+
+    sql """insert into o2_order_events values ("2023-08-16 22:27:00 
","ax",1,"asd",2,1,1,1,1,1,1,1);"""
+
+    explain {
+        sql("select ts,metric_name,platform,sum(count_value) from 
o2_order_events group by ts,metric_name,platform;")
+        contains "(o2_order_events_mv)"
+    }
+    qt_select_mv "select ts,metric_name,platform,sum(count_value) from 
o2_order_events group by ts,metric_name,platform;"
+}
diff --git 
a/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
 
b/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
index 3957b26caba..0d046aa657d 100644
--- 
a/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
+++ 
b/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
@@ -47,4 +47,34 @@ suite ("testCountDistinctToBitmap") {
         contains "(user_tags_mv)"
     }
     qt_select_mv "select user_id, count(distinct tag_id) a from user_tags 
group by user_id having a>1 order by a;"
+
+
+    sql """ DROP TABLE IF EXISTS user_tags2; """
+
+    sql """ create table user_tags2 (
+                time_col date, 
+                user_id bigint, 
+                user_name varchar(20), 
+                tag_id bigint) 
+            partition by range (time_col) (partition p1 values less than 
MAXVALUE) distributed by hash(time_col) buckets 3 properties('replication_num' 
= '1');
+        """
+
+    sql """insert into user_tags2 values("2020-01-01",1,"a",1);"""
+    sql """insert into user_tags2 values("2020-01-02",2,"b",2);"""
+
+    createMV("create materialized view user_tags_mv as select user_id, 
bitmap_union(to_bitmap(tag_id)) from user_tags2 group by user_id;")
+
+    sql """insert into user_tags2 values("2020-01-01",1,"a",2);"""
+
+    explain {
+        sql("select * from user_tags2 order by time_col;")
+        contains "(user_tags2)"
+    }
+    qt_select_star "select * from user_tags2 order by time_col,tag_id;"
+
+    explain {
+        sql("select user_id, count(distinct tag_id) a from user_tags2 group by 
user_id having a>1 order by a;")
+        contains "(user_tags_mv)"
+    }
+    qt_select_mv "select user_id, count(distinct tag_id) a from user_tags2 
group by user_id having a>1 order by a;"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to