This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 06cd000ed3c [cherry-pick](agg)support push down min/max on unique table (#29242) (#29507) 06cd000ed3c is described below commit 06cd000ed3c246ab2ae9f156b788e481f17195fb Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Thu Jan 4 18:12:13 2024 +0800 [cherry-pick](agg)support push down min/max on unique table (#29242) (#29507) * [improve](agg)support push down min/max on unique table (#29242) * update --- .../org/apache/doris/nereids/rules/RuleType.java | 2 + .../rules/implementation/AggregateStrategies.java | 166 ++++++++++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 15 ++ .../nereids_p0/explain/test_pushdown_explain.out | 117 ++++++++++++++ .../explain/test_pushdown_explain.groovy | 169 +++++++++++++++++++++ 5 files changed, 469 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index b7710982d41..3bdcf18fd44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -352,6 +352,8 @@ public enum RuleType { STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION), STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION), STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION), + STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE(RuleTypeClass.IMPLEMENTATION), + STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION), COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION), COUNT_ON_INDEX_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION), ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 89373cc95c3..b4fce67beb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -40,12 +40,15 @@ import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.IsNull; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.expressions.functions.agg.Count; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupConcat; +import org.apache.doris.nereids.trees.expressions.functions.agg.Max; +import org.apache.doris.nereids.trees.expressions.functions.agg.Min; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum; import org.apache.doris.nereids.trees.expressions.functions.agg.Sum; @@ -140,6 +143,72 @@ public class AggregateStrategies implements ImplementationRuleFactory { return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext); }) ), + RuleType.STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT.build( + logicalAggregate( + logicalFilter( + logicalOlapScan().when(this::isUniqueKeyTable)) + .when(filter -> { + if (filter.getConjuncts().size() != 1) { + return false; + } + Expression childExpr = filter.getConjuncts().iterator().next().children().get(0); + 
if (childExpr instanceof SlotReference) { + Optional<Column> column = ((SlotReference) childExpr).getColumn(); + return column.isPresent() ? column.get().isDeleteSignColumn() : false; + } + return false; + }) + ) + .when(agg -> enablePushDownMinMaxOnUnique()) + .when(agg -> agg.getGroupByExpressions().isEmpty()) + .when(agg -> { + Set<AggregateFunction> funcs = agg.getAggregateFunctions(); + return !funcs.isEmpty() && funcs.stream() + .allMatch(f -> (f instanceof Min) || (f instanceof Max)); + }) + .thenApply(ctx -> { + LogicalAggregate<LogicalFilter<LogicalOlapScan>> agg = ctx.root; + LogicalFilter<LogicalOlapScan> filter = agg.child(); + LogicalOlapScan olapScan = filter.child(); + return pushdownMinMaxOnUniqueTable(agg, null, filter, olapScan, + ctx.cascadesContext); + }) + ), + RuleType.STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE.build( + logicalAggregate( + logicalProject( + logicalFilter( + logicalOlapScan().when(this::isUniqueKeyTable)) + .when(filter -> { + if (filter.getConjuncts().size() != 1) { + return false; + } + Expression childExpr = filter.getConjuncts().iterator().next() + .children().get(0); + if (childExpr instanceof SlotReference) { + Optional<Column> column = ((SlotReference) childExpr).getColumn(); + return column.isPresent() ? 
column.get().isDeleteSignColumn() + : false; + } + return false; + })) + ) + .when(agg -> enablePushDownMinMaxOnUnique()) + .when(agg -> agg.getGroupByExpressions().isEmpty()) + .when(agg -> { + Set<AggregateFunction> funcs = agg.getAggregateFunctions(); + return !funcs.isEmpty() + && funcs.stream().allMatch(f -> (f instanceof Min) || (f instanceof Max)); + }) + .thenApply(ctx -> { + LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root; + LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child(); + LogicalFilter<LogicalOlapScan> filter = project.child(); + LogicalOlapScan olapScan = filter.child(); + return pushdownMinMaxOnUniqueTable(agg, project, filter, olapScan, + ctx.cascadesContext); + }) + ), RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build( logicalAggregate( logicalOlapScan() @@ -238,6 +307,19 @@ public class AggregateStrategies implements ImplementationRuleFactory { ); } + private boolean enablePushDownMinMaxOnUnique() { + ConnectContext connectContext = ConnectContext.get(); + return connectContext != null && connectContext.getSessionVariable().isEnablePushDownMinMaxOnUnique(); + } + + private boolean isUniqueKeyTable(LogicalOlapScan logicalScan) { + if (logicalScan != null) { + KeysType keysType = logicalScan.getTable().getKeysType(); + return keysType == KeysType.UNIQUE_KEYS; + } + return false; + } + private boolean enablePushDownCountOnIndex() { ConnectContext connectContext = ConnectContext.get(); return connectContext != null && connectContext.getSessionVariable().isEnablePushDownCountOnIndex(); @@ -314,6 +396,90 @@ public class AggregateStrategies implements ImplementationRuleFactory { } } + //select /*+SET_VAR(enable_pushdown_minmax_on_unique=true) */min(user_id) from table_unique; + //push pushAggOp=MINMAX to scan node + private LogicalAggregate<? extends Plan> pushdownMinMaxOnUniqueTable( + LogicalAggregate<? extends Plan> aggregate, + @Nullable LogicalProject<? extends Plan> project, + LogicalFilter<? 
extends Plan> filter, + LogicalOlapScan olapScan, + CascadesContext cascadesContext) { + final LogicalAggregate<? extends Plan> canNotPush = aggregate; + Set<AggregateFunction> aggregateFunctions = aggregate.getAggregateFunctions(); + if (checkWhetherPushDownMinMax(aggregateFunctions, project, olapScan.getOutput())) { + PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan() + .build() + .transform(olapScan, cascadesContext) + .get(0); + if (project != null) { + return aggregate.withChildren(ImmutableList.of( + project.withChildren(ImmutableList.of( + filter.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate( + physicalOlapScan, + PushDownAggOp.MIN_MAX))))))); + } else { + return aggregate.withChildren(ImmutableList.of( + filter.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate( + physicalOlapScan, + PushDownAggOp.MIN_MAX))))); + } + } else { + return canNotPush; + } + } + + private boolean checkWhetherPushDownMinMax(Set<AggregateFunction> aggregateFunctions, + @Nullable LogicalProject<? extends Plan> project, List<Slot> outPutSlots) { + boolean onlyContainsSlotOrNumericCastSlot = aggregateFunctions.stream() + .map(ExpressionTrait::getArguments) + .flatMap(List::stream) + .allMatch(argument -> { + if (argument instanceof SlotReference) { + return true; + } + return false; + }); + if (!onlyContainsSlotOrNumericCastSlot) { + return false; + } + List<Expression> argumentsOfAggregateFunction = aggregateFunctions.stream() + .flatMap(aggregateFunction -> aggregateFunction.getArguments().stream()) + .collect(ImmutableList.toImmutableList()); + + if (project != null) { + argumentsOfAggregateFunction = Project.findProject( + argumentsOfAggregateFunction, project.getProjects()) + .stream() + .map(p -> p instanceof Alias ? 
p.child(0) : p) + .collect(ImmutableList.toImmutableList()); + } + onlyContainsSlotOrNumericCastSlot = argumentsOfAggregateFunction + .stream() + .allMatch(argument -> { + if (argument instanceof SlotReference) { + return true; + } + return false; + }); + if (!onlyContainsSlotOrNumericCastSlot) { + return false; + } + Set<SlotReference> aggUsedSlots = ExpressionUtils.collect(argumentsOfAggregateFunction, + SlotReference.class::isInstance); + List<SlotReference> usedSlotInTable = (List<SlotReference>) Project.findProject(aggUsedSlots, + outPutSlots); + for (SlotReference slot : usedSlotInTable) { + Column column = slot.getColumn().get(); + PrimitiveType colType = column.getType().getPrimitiveType(); + if (colType.isComplexType() || colType.isHllType() || colType.isBitmapType()) { + return false; + } + } + return true; + } + /** * sql: select count(*) from tbl * <p> diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5383f04eb7b..4dcb05d1bb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -436,6 +436,8 @@ public class SessionVariable implements Serializable, Writable { public static final String TABLE_STATS_HEALTH_THRESHOLD = "table_stats_health_threshold"; + public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -1062,6 +1064,11 @@ public class SessionVariable implements Serializable, Writable { "是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index."}) public boolean enablePushDownCountOnIndex = true; + // Whether enable pushdown minmax to scan node of unique table. 
+ @VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE, needForward = true, description = { + "是否启用pushdown minmax on unique table。", "Set whether to pushdown minmax on unique table."}) + public boolean enablePushDownMinMaxOnUnique = false; + // Whether drop table when create table as select insert data appear error. @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -2184,6 +2191,14 @@ public class SessionVariable implements Serializable, Writable { this.disableJoinReorder = disableJoinReorder; } + public boolean isEnablePushDownMinMaxOnUnique() { + return enablePushDownMinMaxOnUnique; + } + + public void setEnablePushDownMinMaxOnUnique(boolean enablePushDownMinMaxOnUnique) { + this.enablePushDownMinMaxOnUnique = enablePushDownMinMaxOnUnique; + } + /** * Nereids only support vectorized engine. * diff --git a/regression-test/data/nereids_p0/explain/test_pushdown_explain.out b/regression-test/data/nereids_p0/explain/test_pushdown_explain.out index 72d126351a3..2f364bf9bec 100644 --- a/regression-test/data/nereids_p0/explain/test_pushdown_explain.out +++ b/regression-test/data/nereids_p0/explain/test_pushdown_explain.out @@ -2,3 +2,120 @@ -- !select -- 1 +-- !select_table_unique0 -- +1 c + +-- !select_table_unique0_min -- +a + +-- !select_table_unique1 -- +1 c +2 e + +-- !select_table_unique1_max -- +g + +-- !select_table_unique2 -- +1 c +2 e + +-- !select_table_unique2_max -- +h + +-- !select_table_unique3 -- +1 c +2 e +4 k + +-- !select_table_unique3_max -- +l + +-- !select_0 -- +1 asd cc +2 qwe vvx +3 ffsd mnm +4 qdf ll +5 cvfv vff + +-- !select_1 -- +1 + +-- !select_2 -- +5 + +-- !select_3 -- +asd + +-- !select_4 -- +qwe + +-- !select_5 -- +cc + +-- !select_6 -- +vvx + +-- !select_00 -- +1 asd zzz +2 qwe vvx +3 ffsd mnm +4 qdf ll +5 cvfv vff + +-- !select_7 -- +1 + +-- !select_8 -- +5 + +-- !select_9 -- +asd + +-- !select_10 -- +qwe + +-- !select_11 -- +cc + +-- !select_12 -- +zzz + 
+-- !select_000 -- +1 asd zzz +3 ffsd mnm +4 qdf ll +5 cvfv vff + +-- !select_13 -- +1 + +-- !select_14 -- +5 + +-- !select_15 -- +asd + +-- !select_16 -- +qdf + +-- !select_18 -- +zzz + +-- !select_19 -- +1 + +-- !select_20 -- +5 + +-- !select_21 -- +asd + +-- !select_22 -- +qwe + +-- !select_23 -- +cc + +-- !select_24 -- +vvx + diff --git a/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy b/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy index 091ae98bbb3..ec8d9b5d8ae 100644 --- a/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy +++ b/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy @@ -58,4 +58,173 @@ suite("test_pushdown_explain") { sql("select count(cast(lo_orderkey as bigint)) from test_lineorder;") contains "pushAggOp=COUNT" } + + sql "DROP TABLE IF EXISTS table_unique0" + sql """ + CREATE TABLE `table_unique0` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"' + ) ENGINE=OLAP + UNIQUE KEY(`user_id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + // set session variables + sql "set enable_pushdown_minmax_on_unique = true;" + + sql """ insert into table_unique0 values(1,"a"); """ + sql """ insert into table_unique0 values(1,"b"); """ + sql """ insert into table_unique0 values(1,"c"); """ + qt_select_table_unique0 "select * from table_unique0 order by user_id;" // 1, c + qt_select_table_unique0_min "select min(username) from table_unique0;" // a is read from zone map + + sql """ insert into table_unique0 values(2,"g"); """ + sql """ insert into table_unique0 values(2,"f"); """ + sql """ insert into table_unique0 values(2,"e"); """ + qt_select_table_unique1 "select * from table_unique0 order by user_id;" // 2, e + qt_select_table_unique1_max "select max(username) from table_unique0;" // 
g is read from zone map + + sql """ insert into table_unique0 values(3,"h"); """ + sql """ insert into table_unique0(user_id,username,__DORIS_DELETE_SIGN__) values(3,'h',1); """ // delete id = 3 + qt_select_table_unique2 "select * from table_unique0 order by user_id;" // no user_id = 3 + qt_select_table_unique2_max "select max(username) from table_unique0;" // h is read from zone map + + sql """ insert into table_unique0 values(4,"l"); """ + sql """ update table_unique0 set username = "k" where user_id = 4; """ + qt_select_table_unique3 "select * from table_unique0 order by user_id;" // 4 ,k + qt_select_table_unique3_max "select max(username) from table_unique0;" // l is read from zone map + + sql "DROP TABLE IF EXISTS table_unique" + sql """ + CREATE TABLE `table_unique` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"', + `val` VARCHAR(50) NULL + ) ENGINE=OLAP + UNIQUE KEY(`user_id`, `username`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + sql """ + insert into table_unique values(1,"asd","cc"),(2,"qwe","vvx"),(3,"ffsd","mnm"),(4,"qdf","ll"),(5,"cvfv","vff"); + """ + + sql "set enable_pushdown_minmax_on_unique = false;" + explain { + sql("select min(user_id) from table_unique;") + contains "pushAggOp=NONE" + } + explain { + sql("select max(user_id) from table_unique;") + contains "pushAggOp=NONE" + } + explain { + sql("select min(username) from table_unique;") + contains "pushAggOp=NONE" + } + explain { + sql("select max(username) from table_unique;") + contains "pushAggOp=NONE" + } + + + // set session variables + sql "set enable_pushdown_minmax_on_unique = true;" + explain { + sql("select min(user_id) from table_unique;") + contains "pushAggOp=MINMAX" + } + explain { + sql("select max(user_id) from table_unique;") + contains "pushAggOp=MINMAX" + } + explain { + sql("select 
min(username) from table_unique;") + contains "pushAggOp=MINMAX" + } + explain { + sql("select max(username) from table_unique;") + contains "pushAggOp=MINMAX" + } + qt_select_0 "select * from table_unique order by user_id;" + qt_select_1 "select min(user_id) from table_unique;" + qt_select_2 "select max(user_id) from table_unique;" + qt_select_3 "select min(username) from table_unique;" + qt_select_4 "select max(username) from table_unique;" + qt_select_5 "select min(val) from table_unique;" + qt_select_6 "select max(val) from table_unique;" + sql """ + update table_unique set val = "zzz" where user_id = 1; + """ + qt_select_00 "select * from table_unique order by user_id;" + qt_select_7 "select min(user_id) from table_unique;" + qt_select_8 "select max(user_id) from table_unique;" + qt_select_9 "select min(username) from table_unique;" + qt_select_10 "select max(username) from table_unique;" + qt_select_11 "select min(val) from table_unique;" + qt_select_12 "select max(val) from table_unique;" + + sql """ + delete from table_unique where user_id = 2; + """ + qt_select_000 "select * from table_unique order by user_id;" + qt_select_13 "select min(user_id) from table_unique;" + qt_select_14 "select max(user_id) from table_unique;" + qt_select_15 "select min(username) from table_unique;" + qt_select_16 "select max(username) from table_unique;" + qt_select_18 "select max(val) from table_unique;" + + + sql "DROP TABLE IF EXISTS table_agg" + sql """ + CREATE TABLE `table_agg` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"', + `val` VARCHAR(50) max NULL + ) ENGINE=OLAP + AGGREGATE KEY(`user_id`, `username`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + sql """ + insert into table_agg values(1,"asd","cc"),(2,"qwe","vvx"),(3,"ffsd","mnm"),(4,"qdf","ll"),(5,"cvfv","vff"); + """ + + 
explain { + sql("select min(user_id) from table_agg;") + contains "pushAggOp=MINMAX" + } + explain { + sql("select max(user_id) from table_agg;") + contains "pushAggOp=MINMAX" + } + explain { + sql("select min(username) from table_agg;") + contains "pushAggOp=MINMAX" + } + explain { + sql("select max(username) from table_agg;") + contains "pushAggOp=MINMAX" + } + + qt_select_19 "select min(user_id) from table_agg;" + qt_select_20 "select max(user_id) from table_agg;" + qt_select_21 "select min(username) from table_agg;" + qt_select_22 "select max(username) from table_agg;" + qt_select_23 "select min(val) from table_agg;" + qt_select_24 "select max(val) from table_agg;" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org