This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 704faaed84 [feature](Nereids) add rule split limit into two phase (#16797) 704faaed84 is described below commit 704faaed84464e6c2f3e7fe2b07602ff3ddd283f Author: 谢健 <jianx...@gmail.com> AuthorDate: Tue Mar 7 15:34:12 2023 +0800 [feature](Nereids) add rule split limit into two phase (#16797) 1. Add a rule split limit, like Limit(Origin) ==> Limit(Global) -> Gather -> Limit(Local) 2. Add a rule: limit-> sort ==> topN 3. fix a bug about topN 4. make the type of limit,offset long in topN And because this rule is always beneficial, we add a rule in the rewrite phase --- .../glue/translator/PhysicalPlanTranslator.java | 62 ++++++++----------- .../doris/nereids/jobs/batch/NereidsRewriter.java | 2 + .../doris/nereids/parser/LogicalPlanBuilder.java | 3 +- .../properties/ChildOutputPropertyDeriver.java | 12 +--- .../nereids/properties/RequestPropertyDeriver.java | 15 ++++- .../org/apache/doris/nereids/rules/RuleType.java | 4 +- .../LogicalLimitToPhysicalLimit.java | 1 + .../implementation/LogicalTopNToPhysicalTopN.java | 12 +++- .../rules/rewrite/logical/ExistsApplyToJoin.java | 5 +- .../nereids/rules/rewrite/logical/MergeLimits.java | 19 +++--- .../rules/rewrite/logical/PushdownLimit.java | 12 ++++ .../logical/PushdownProjectThroughLimit.java | 4 +- .../logical/{MergeLimits.java => SplitLimit.java} | 42 ++++++------- .../plans/{algebra/TopN.java => LimitPhase.java} | 20 +++++-- .../doris/nereids/trees/plans/algebra/TopN.java | 4 +- .../nereids/trees/plans/logical/LogicalLimit.java | 26 +++++--- .../nereids/trees/plans/logical/LogicalTopN.java | 12 ++-- .../trees/plans/physical/PhysicalLimit.java | 33 +++++++---- .../nereids/trees/plans/physical/PhysicalTopN.java | 14 ++--- .../nereids/trees/plans/visitor/PlanVisitor.java | 2 +- .../org/apache/doris/nereids/memo/MemoTest.java | 69 +++++++++++----------- .../properties/ChildOutputPropertyDeriverTest.java | 15 ++++- .../rules/implementation/ImplementationTest.java | 3 +- .../rules/rewrite/logical/EliminateLimitTest.java | 21 ++++++- .../rules/rewrite/logical/MergeLimitsTest.java | 7 ++- .../rules/rewrite/logical/PushdownLimitTest.java | 20 +++++-- ...EliminateLimitTest.java => SplitLimitTest.java} | 30 ++++------ .../doris/nereids/stats/StatsCalculatorTest.java | 6 +- .../nereids/trees/plans/PlanToStringTest.java | 2 +- .../doris/nereids/util/LogicalPlanBuilder.java | 3 +- 30 files changed, 283 insertions(+), 197 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index b28db7c0f8..500f0360a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -808,9 +808,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment inputFragment = topN.child(0).accept(this, context); PlanFragment currentFragment = inputFragment; - //1. generate new fragment for sort when the child is exchangeNode - if (inputFragment.getPlanRoot() instanceof ExchangeNode) { - Preconditions.checkArgument(!topN.getSortPhase().isLocal()); + //1. Generate new fragment for sort when the child is exchangeNode, If the child is + // mergingExchange, it means we have always generated a new fragment when processing mergeSort + if (inputFragment.getPlanRoot() instanceof ExchangeNode + && !((ExchangeNode) inputFragment.getPlanRoot()).isMergingExchange()) { + // Except LocalTopN->MergeTopN, we don't allow localTopN's child is Exchange Node + Preconditions.checkArgument(!topN.getSortPhase().isLocal(), + "local sort requires any property but child is" + inputFragment.getPlanRoot()); DataPartition outputPartition = DataPartition.UNPARTITIONED; ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot(); inputFragment.setOutputPartition(outputPartition); @@ -822,20 +826,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // 2. According to the type of sort, generate physical plan if (!topN.getSortPhase().isMerge()) { - // For localSort or Gather->Sort, we just need to add sortNode + // For localSort or Gather->Sort, we just need to add TopNNode SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context); + sortNode.setOffset(topN.getOffset()); + sortNode.setLimit(topN.getLimit()); currentFragment.addPlanRoot(sortNode); } else { // For mergeSort, we need to push sortInfo to exchangeNode if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) { // if there is no exchange node for mergeSort - // e.g., localSort -> mergeSort + // e.g., mergeTopN -> localTopN // It means the local has satisfied the Gather property. We can just ignore mergeSort + currentFragment.getPlanRoot().setOffset(topN.getOffset()); + currentFragment.getPlanRoot().setLimit(topN.getLimit()); return currentFragment; } - Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof SortNode); + Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof SortNode, + "mergeSort' child must be sortNode"); SortNode sortNode = (SortNode) inputFragment.getPlanRoot(); - ((ExchangeNode) currentFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo()); + ExchangeNode exchangeNode = (ExchangeNode) currentFragment.getPlanRoot(); + exchangeNode.setMergeInfo(sortNode.getSortInfo()); + exchangeNode.setLimit(topN.getLimit()); + exchangeNode.setOffset(topN.getOffset()); } return currentFragment; } @@ -1388,38 +1400,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla if (inputFragment == null) { return inputFragment; } - + // For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the LocalLimit has already gathered + // The globalLimit can overwrite the limit and offset, so it's still correct PlanNode child = inputFragment.getPlanRoot(); - - // physical plan: limit --> sort - // after translate, it could be: - // 1. limit->sort => set (limit and offset) on sort - // 2. limit->exchange->sort => set (limit and offset) on exchange, set sort.limit = limit+offset - if (child instanceof SortNode) { - SortNode sort = (SortNode) child; - sort.setLimit(physicalLimit.getLimit()); - sort.setOffset(physicalLimit.getOffset()); - return inputFragment; - } - if (child instanceof ExchangeNode) { - ExchangeNode exchangeNode = (ExchangeNode) child; - exchangeNode.setLimit(physicalLimit.getLimit()); - // we do not check if this is a merging exchange here, - // since this guaranteed by translating logic plan to physical plan - exchangeNode.setOffset(physicalLimit.getOffset()); - if (exchangeNode.getChild(0) instanceof SortNode) { - SortNode sort = (SortNode) exchangeNode.getChild(0); - sort.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset()); - sort.setOffset(0); - } - return inputFragment; - } - // for other PlanNode, just set limit as limit+offset - child.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset()); - PlanFragment planFragment = exchangeToMergeFragment(inputFragment, context); - planFragment.getPlanRoot().setLimit(physicalLimit.getLimit()); - planFragment.getPlanRoot().setOffSetDirectly(physicalLimit.getOffset()); - return planFragment; + child.setLimit(physicalLimit.getLimit()); + child.setOffset(physicalLimit.getOffset()); + return inputFragment; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java index cfe29964f1..1d51420a7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java @@ -61,6 +61,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet; import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin; import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit; import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin; +import org.apache.doris.nereids.rules.rewrite.logical.SplitLimit; import java.util.List; @@ -192,6 +193,7 @@ public class NereidsRewriter extends BatchRewriteJob { new EliminateAggregate(), new MergeSetOperations(), new PushdownLimit(), + new SplitLimit(), new BuildAggForUnion() )), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 185e74368b..3947e8139b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -194,6 +194,7 @@ import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; @@ -1346,7 +1347,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { if (offsetToken != null) { offset = Long.parseLong(offsetToken.getText()); } - return new LogicalLimit<>(limit, offset, input); + return new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, input); }); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 4de8c769fe..4623ba311e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; @@ -39,10 +40,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; -import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.JoinUtils; @@ -105,13 +104,8 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, } @Override - public PhysicalProperties visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanContext context) { - Preconditions.checkState(childrenOutputProperties.size() == 1); - return new PhysicalProperties(DistributionSpecGather.INSTANCE, new OrderSpec(topN.getOrderKeys())); - } - - @Override - public PhysicalProperties visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanContext context) { + public PhysicalProperties visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, + PlanContext context) { Preconditions.checkState(childrenOutputProperties.size() == 1); if (sort.getSortPhase().isLocal()) { return new PhysicalProperties( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 77d17f2701..2ca4fdc169 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -29,11 +29,12 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; -import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; @@ -94,7 +95,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> { } @Override - public Void visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanContext context) { + public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, PlanContext context) { if (!sort.getSortPhase().isLocal()) { addRequestPropertyToChildren(PhysicalProperties.GATHER); } else { @@ -103,6 +104,16 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> { return null; } + @Override + public Void visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, PlanContext context) { + if (limit.isGlobal()) { + addRequestPropertyToChildren(PhysicalProperties.GATHER); + } else { + addRequestPropertyToChildren(PhysicalProperties.ANY); + } + return null; + } + @Override public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, PlanContext context) { JoinHint hint = hashJoin.getHint(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index cf55dedddd..13c47df6a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -188,13 +188,15 @@ public enum RuleType { INNER_TO_CROSS_JOIN(RuleTypeClass.REWRITE), REWRITE_SENTINEL(RuleTypeClass.REWRITE), + // split limit + SPLIT_LIMIT(RuleTypeClass.REWRITE), // limit push down PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE), PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE), PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE), PUSH_LIMIT_THROUGH_ONE_ROW_RELATION(RuleTypeClass.REWRITE), PUSH_LIMIT_THROUGH_EMPTY_RELATION(RuleTypeClass.REWRITE), - + PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE), // adjust nullable ADJUST_NULLABLE_ON_AGGREGATE(RuleTypeClass.REWRITE), ADJUST_NULLABLE_ON_ASSERT_NUM_ROWS(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java index 836d0b0344..5742cee12e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java @@ -30,6 +30,7 @@ public class LogicalLimitToPhysicalLimit extends OneImplementationRuleFactory { return logicalLimit().then(limit -> new PhysicalLimit<>( limit.getLimit(), limit.getOffset(), + limit.getPhase(), limit.getLogicalProperties(), limit.child()) ).toRule(RuleType.LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java index ac90ff8acc..bf675fe264 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java @@ -38,10 +38,16 @@ public class LogicalTopNToPhysicalTopN extends OneImplementationRuleFactory { .toRule(RuleType.LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE); } + /** + * before: logicalTopN(off, limit) + * after: + * gatherTopN(limit, off, require gather) + * mergeTopN(limit, off, require gather) -> localTopN(off+limit, 0, require any) + */ private List<PhysicalTopN<? extends Plan>> twoPhaseSort(LogicalTopN logicalTopN) { - PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(), logicalTopN.getLimit(), - logicalTopN.getOffset(), SortPhase.LOCAL_SORT, logicalTopN.getLogicalProperties(), logicalTopN.child(0) - ); + PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(), + logicalTopN.getLimit() + logicalTopN.getOffset(), 0, SortPhase.LOCAL_SORT, + logicalTopN.getLogicalProperties(), logicalTopN.child(0)); PhysicalTopN twoPhaseSort = new PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(), logicalTopN.getOffset(), SortPhase.MERGE_SORT, logicalTopN.getLogicalProperties(), localSort); PhysicalTopN onePhaseSort = new PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java index ca776b83b7..a7447f1b35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Count; import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalApply; @@ -117,7 +118,7 @@ public class ExistsApplyToJoin extends OneRewriteRuleFactory { } private Plan unCorrelatedNotExist(LogicalApply unapply) { - LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) unapply.right()); + LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, (LogicalPlan) unapply.right()); Alias alias = new Alias(new Count(), "count(*)"); LogicalAggregate newAgg = new LogicalAggregate<>(new ArrayList<>(), ImmutableList.of(alias), newLimit); @@ -128,7 +129,7 @@ public class ExistsApplyToJoin extends OneRewriteRuleFactory { } private Plan unCorrelatedExist(LogicalApply unapply) { - LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) unapply.right()); + LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, (LogicalPlan) unapply.right()); return new LogicalJoin<>(JoinType.CROSS_JOIN, (LogicalPlan) unapply.left(), newLimit); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java index 4b3ec22fb0..1d6b0ab48f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java @@ -42,13 +42,16 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; public class MergeLimits extends OneRewriteRuleFactory { @Override public Rule build() { - return logicalLimit(logicalLimit()).then(upperLimit -> { - LogicalLimit<? extends Plan> bottomLimit = upperLimit.child(); - return new LogicalLimit<>( - Math.min(upperLimit.getLimit(), Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)), - bottomLimit.getOffset() + upperLimit.getOffset(), - bottomLimit.child() - ); - }).toRule(RuleType.MERGE_LIMITS); + return logicalLimit(logicalLimit()) + .when(upperLimit -> upperLimit.getPhase().equals(upperLimit.child().getPhase())) + .then(upperLimit -> { + LogicalLimit<? extends Plan> bottomLimit = upperLimit.child(); + return new LogicalLimit<>( + Math.min(upperLimit.getLimit(), + Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)), + bottomLimit.getOffset() + upperLimit.getOffset(), + bottomLimit.getPhase(), bottomLimit.child() + ); + }).toRule(RuleType.MERGE_LIMITS); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java index 1b2b1a4426..b5b4614410 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java @@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalSort; +import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import com.google.common.collect.ImmutableList; @@ -82,6 +84,16 @@ public class PushdownLimit implements RewriteRuleFactory { return limit.withChildren(union.withChildren(newUnionChildren)); }) .toRule(RuleType.PUSH_LIMIT_THROUGH_UNION), + // limit -> sort ==> topN + logicalLimit(logicalSort()) + .then(limit -> { + LogicalSort sort = limit.child(); + LogicalTopN topN = new LogicalTopN(sort.getOrderKeys(), + limit.getLimit(), + limit.getOffset(), + sort.child(0)); + return topN; + }).toRule(RuleType.PUSH_LIMIT_INTO_SORT), logicalLimit(logicalOneRowRelation()) .then(limit -> limit.getLimit() > 0 ? limit.child() : new LogicalEmptyRelation(limit.child().getOutput())) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java index b9f0f70d2a..c1705250a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java @@ -51,8 +51,8 @@ public class PushdownProjectThroughLimit extends OneRewriteRuleFactory { return logicalProject(logicalLimit(any())).thenApply(ctx -> { LogicalProject<LogicalLimit<Plan>> logicalProject = ctx.root; LogicalLimit<Plan> logicalLimit = logicalProject.child(); - return new LogicalLimit<>(logicalLimit.getLimit(), - logicalLimit.getOffset(), new LogicalProject<>(logicalProject.getProjects(), + return new LogicalLimit<>(logicalLimit.getLimit(), logicalLimit.getOffset(), + logicalLimit.getPhase(), new LogicalProject<>(logicalProject.getProjects(), logicalLimit.child())); }).toRule(RuleType.PUSHDOWN_PROJECT_THROUGH_LIMIT); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java similarity index 55% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java index 4b3ec22fb0..abd7b0a49c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java @@ -20,35 +20,29 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; -import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; /** - * This rule aims to merge consecutive limits. - * <pre> - * input plan: - * LIMIT1(limit=10, offset=0) - * | - * LIMIT2(limit=3, offset=5) - * - * output plan: - * LIMIT(limit=3, offset=5) - * - * merged limit = min(LIMIT1.limit, LIMIT2.limit) - * merged offset = LIMIT2.offset - * </pre> - * Note that the top limit should not have valid offset info. + * Split limit into two phase + * before: + * Limit(origin) limit, offset + * after: + * Limit(global) limit, offset + * | + * Limit(local) limit + offset, 0 */ -public class MergeLimits extends OneRewriteRuleFactory { +public class SplitLimit extends OneRewriteRuleFactory { @Override public Rule build() { - return logicalLimit(logicalLimit()).then(upperLimit -> { - LogicalLimit<? extends Plan> bottomLimit = upperLimit.child(); - return new LogicalLimit<>( - Math.min(upperLimit.getLimit(), Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)), - bottomLimit.getOffset() + upperLimit.getOffset(), - bottomLimit.child() - ); - }).toRule(RuleType.MERGE_LIMITS); + return logicalLimit().when(limit -> !limit.isSplit()) + .then(limit -> { + long l = limit.getLimit(); + long o = limit.getOffset(); + return new LogicalLimit<>(l, o, + LimitPhase.GLOBAL, new LogicalLimit<>(l + o, 0, LimitPhase.LOCAL, limit.child()) + ); + }).toRule(RuleType.SPLIT_LIMIT); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java similarity index 63% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java index d79fe003ed..705c712ef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java @@ -15,14 +15,24 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.trees.plans.algebra; +package org.apache.doris.nereids.trees.plans; /** - * Common interface for logical/physical TopN. + * Limit phase for logical and physical limit, like + * LocalLimit -> Gather -> GlobalLimit + * Origin is used to mark the limit operator that has not been split into 2-phase */ -public interface TopN extends Sort { +public enum LimitPhase { + LOCAL("LOCAL"), + GLOBAL("GLOBAL"), + ORIGIN("ORIGIN"); + private final String name; - int getOffset(); + LimitPhase(String name) { + this.name = name; + } - int getLimit(); + public boolean isLocal() { + return this == LOCAL; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java index d79fe003ed..c214dffbbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java @@ -22,7 +22,7 @@ package org.apache.doris.nereids.trees.plans.algebra; */ public interface TopN extends Sort { - int getOffset(); + long getOffset(); - int getLimit(); + long getLimit(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java index 75ff5f1ea9..d632b959e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java @@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Limit; @@ -44,19 +45,28 @@ import java.util.Optional; * offset 100 */ public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements Limit { - + private final LimitPhase phase; private final long limit; private final long offset; - public LogicalLimit(long limit, long offset, CHILD_TYPE child) { - this(limit, offset, Optional.empty(), Optional.empty(), child); + public LogicalLimit(long limit, long offset, LimitPhase phase, CHILD_TYPE child) { + this(limit, offset, phase, Optional.empty(), Optional.empty(), child); } - public LogicalLimit(long limit, long offset, Optional<GroupExpression> groupExpression, + public LogicalLimit(long limit, long offset, LimitPhase phase, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_LIMIT, groupExpression, logicalProperties, child); this.limit = limit; this.offset = offset; + this.phase = phase; + } + + public LimitPhase getPhase() { + return phase; + } + + public boolean isSplit() { + return phase != LimitPhase.ORIGIN; } public long getLimit() { @@ -94,7 +104,7 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TY return false; } LogicalLimit that = (LogicalLimit) o; - return limit == that.limit && offset == that.offset; + return limit == that.limit && offset == that.offset && phase == that.phase; } @Override @@ -108,17 +118,17 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TY @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalLimit<>(limit, offset, groupExpression, Optional.of(getLogicalProperties()), child()); + return new LogicalLimit<>(limit, offset, phase, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withLogicalProperties(Optional<LogicalProperties> logicalProperties) { - return new LogicalLimit<>(limit, offset, Optional.empty(), logicalProperties, child()); + return new LogicalLimit<>(limit, offset, phase, Optional.empty(), logicalProperties, child()); } @Override public LogicalLimit<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalLimit<>(limit, offset, children.get(0)); + return new LogicalLimit<>(limit, offset, phase, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java index da78e27cef..cb07601ffa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java @@ -41,17 +41,17 @@ import java.util.Optional; public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements TopN { private final List<OrderKey> orderKeys; - private final int limit; - private final int offset; + private final long limit; + private final long offset; - public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, CHILD_TYPE child) { + public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset, CHILD_TYPE child) { this(orderKeys, limit, offset, Optional.empty(), Optional.empty(), child); } /** * Constructor for LogicalSort. */ - public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, Optional<GroupExpression> groupExpression, + public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, child); this.orderKeys = ImmutableList.copyOf(Objects.requireNonNull(orderKeys, "orderKeys can not be null")); @@ -68,11 +68,11 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP return orderKeys; } - public int getOffset() { + public long getOffset() { return offset; } - public int getLimit() { + public long getLimit() { return limit; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java index dfab80443c..8d803d0f88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java @@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Limit; @@ -39,14 +40,14 @@ import java.util.Optional; * Physical limit plan */ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> implements Limit { - + private final LimitPhase phase; private final long limit; private final long offset; public PhysicalLimit(long limit, long offset, - LogicalProperties logicalProperties, + LimitPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) { - this(limit, offset, Optional.empty(), logicalProperties, child); + this(limit, offset, phase, Optional.empty(), logicalProperties, child); } /** @@ -57,11 +58,12 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ * @param offset the number of tuples skipped. */ public PhysicalLimit(long limit, long offset, - Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + LimitPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties, child); this.limit = limit; this.offset = offset; + this.phase = phase; } /** @@ -70,14 +72,16 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ * * @param limit the number of tuples retrieved. * @param offset the number of tuples skipped. + * @param phase the phase of 2-phase limit. */ - public PhysicalLimit(long limit, long offset, Optional<GroupExpression> groupExpression, + public PhysicalLimit(long limit, long offset, LimitPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, StatsDeriveResult statsDeriveResult, CHILD_TYPE child) { super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties, physicalProperties, statsDeriveResult, child); this.limit = limit; this.offset = offset; + this.phase = phase; } public long getLimit() { @@ -88,10 +92,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ return offset; } + public LimitPhase getPhase() { + return phase; + } + + public boolean isGlobal() { + return phase == LimitPhase.GLOBAL; + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new PhysicalLimit<>(limit, offset, getLogicalProperties(), children.get(0)); + return new PhysicalLimit<>(limit, offset, phase, getLogicalProperties(), children.get(0)); } @Override @@ -101,18 +113,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ @Override public PhysicalLimit<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) { - return new PhysicalLimit<>(limit, offset, groupExpression, getLogicalProperties(), child()); + return new PhysicalLimit<>(limit, offset, phase, groupExpression, getLogicalProperties(), child()); } @Override public PhysicalLimit<CHILD_TYPE> withLogicalProperties(Optional<LogicalProperties> logicalProperties) { - return new PhysicalLimit<>(limit, offset, logicalProperties.get(), child()); + return new PhysicalLimit<>(limit, offset, phase, logicalProperties.get(), child()); } @Override public PhysicalLimit<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, StatsDeriveResult statsDeriveResult) { - return new PhysicalLimit<>(limit, offset, groupExpression, getLogicalProperties(), physicalProperties, + return new PhysicalLimit<>(limit, offset, phase, groupExpression, getLogicalProperties(), physicalProperties, statsDeriveResult, child()); } @@ -125,7 +137,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ return false; } PhysicalLimit that = (PhysicalLimit) o; - return offset == that.offset && limit == that.limit; + return offset == that.offset && limit == that.limit && phase == that.phase; } @Override @@ -143,6 +155,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ return Utils.toSqlString("PhysicalLimit", "limit", limit, "offset", offset, + "phase", phase, "stats", statsDeriveResult ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 13f52eb103..dc01b0332a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -40,10 +40,10 @@ import java.util.Optional; */ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN { - private final int limit; - private final int offset; + private final long limit; + private final long offset; - public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset, + public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset, SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) { this(orderKeys, limit, offset, phase, Optional.empty(), logicalProperties, child); } @@ -51,7 +51,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort< /** * Constructor of PhysicalHashJoinNode. */ - public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset, + public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset, SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, child); @@ -63,7 +63,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort< /** * Constructor of PhysicalHashJoinNode. */ - public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset, + public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset, SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, StatsDeriveResult statsDeriveResult, CHILD_TYPE child) { super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, physicalProperties, @@ -73,11 +73,11 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort< this.offset = offset; } - public int getLimit() { + public long getLimit() { return limit; } - public int getOffset() { + public long getOffset() { return offset; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index de6c98f2f7..3cda38d6fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -317,7 +317,7 @@ public abstract class PlanVisitor<R, C> { } public R visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, C context) { - return visit(topN, context); + return visitAbstractPhysicalSort(topN, context); } public R visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, C context) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java index e268f5a091..40d9d6289a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.FakePlan; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.LeafPlan; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; @@ -361,7 +362,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void a2bc() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, student); + LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"))) .applyBottomUp( @@ -396,7 +397,7 @@ class MemoTest implements MemoPatternMatchSupported { // invalid case Assertions.assertThrows(IllegalStateException.class, () -> { UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, student); + LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, student) .applyBottomUp( @@ -414,7 +415,7 @@ class MemoTest implements MemoPatternMatchSupported { UnboundRelation a = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); UnboundRelation a2 = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, a2); + LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, a2); PlanChecker.from(connectContext, a) .setMaxInvokeTimesPerRule(1000) .applyBottomUp( @@ -479,8 +480,8 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void a2bcd() { LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan); - LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, limit5); + LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, scan); + LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -507,7 +508,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2a() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -531,7 +532,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2NewA() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -555,7 +556,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2GroupB() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -577,7 +578,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2PlanB() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -599,7 +600,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2c() { UnboundRelation relation = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, relation); + LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, relation); LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); PlanChecker.from(connectContext, limit10) @@ -622,10 +623,10 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2cd() { UnboundRelation relation = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, relation); + LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, relation); LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, student); + LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -650,8 +651,8 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2cb() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); - LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); + LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -681,7 +682,7 @@ class MemoTest implements MemoPatternMatchSupported { Assertions.assertThrowsExactly(IllegalStateException.class, () -> { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit10) .setMaxInvokeTimesPerRule(1000) @@ -707,8 +708,8 @@ class MemoTest implements MemoPatternMatchSupported { Assertions.assertThrowsExactly(IllegalStateException.class, () -> { UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, student); - LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, limit5); + LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student); + LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -733,11 +734,11 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void ab2cde() { UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0, student); + LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0, LimitPhase.ORIGIN, student); LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan); - LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, limit5); + LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, scan); + LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5); PlanChecker.from(connectContext, limit3) .applyBottomUp( @@ -766,8 +767,8 @@ class MemoTest implements MemoPatternMatchSupported { public void abc2bac() { UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, student); - LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, limit5); + LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student); + LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -805,8 +806,8 @@ class MemoTest implements MemoPatternMatchSupported { public void abc2bc() { UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")); - LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, student); - LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, limit5); + LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student); + LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -829,7 +830,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testRewriteBottomPlanToOnePlan() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, student); + LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, student); LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); @@ -848,10 +849,10 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testRewriteBottomPlanToMultiPlan() { LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); - LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, score); + LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, score); PlanChecker.from(connectContext, limit10) .applyBottomUp( @@ -892,7 +893,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testRecomputeLogicalProperties() { UnboundRelation unboundTable = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("score")); - LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0, unboundTable); + LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, unboundTable); LogicalOlapScan boundTable = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); LogicalLimit<Plan> boundLimit = unboundLimit.withChildren(ImmutableList.of(boundTable)); @@ -924,7 +925,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testEliminateRootWithChildGroupInTwoLevels() { LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); - LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan); + LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, scan); PlanChecker.from(connectContext, limit) .applyBottomUp(logicalLimit().then(LogicalLimit::child)) @@ -936,7 +937,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testEliminateRootWithChildPlanInTwoLevels() { LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); - LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan); + LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, scan); PlanChecker.from(connectContext, limit) .applyBottomUp(logicalLimit(any()).then(LogicalLimit::child)) @@ -948,7 +949,7 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testEliminateTwoLevelsToOnePlan() { LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); - LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, score); + LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, score); LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); @@ -968,10 +969,10 @@ class MemoTest implements MemoPatternMatchSupported { @Test public void testEliminateTwoLevelsToTwoPlans() { LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); - LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, score); + LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, score); LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); - LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, student); + LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student); PlanChecker.from(connectContext, limit1) .applyBottomUp(logicalLimit(any()).when(limit1::equals).then(l -> limit10)) @@ -998,7 +999,7 @@ class MemoTest implements MemoPatternMatchSupported { public void test() { PlanChecker.from(MemoTestUtils.createConnectContext()) .analyze(new LogicalLimit<>(10, 0, - new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN, + LimitPhase.ORIGIN, new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN, ImmutableList.of(new EqualTo(new UnboundSlot("sid"), new UnboundSlot("id"))), new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score), new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student) diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java index 9cd2888b38..6d7b2413ff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; @@ -385,6 +386,7 @@ public class ChildOutputPropertyDeriverTest { public void testTopN() { SlotReference key = new SlotReference("col1", IntegerType.INSTANCE); List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true, true)); + // localSort require any PhysicalTopN<GroupPlan> sort = new PhysicalTopN<>(orderKeys, 10, 10, SortPhase.LOCAL_SORT, logicalProperties, groupPlan); GroupExpression groupExpression = new GroupExpression(sort); PhysicalProperties child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE, @@ -394,6 +396,17 @@ public class ChildOutputPropertyDeriverTest { ChildOutputPropertyDeriver deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child)); PhysicalProperties result = deriver.getOutputProperties(groupExpression); Assertions.assertEquals(orderKeys, result.getOrderSpec().getOrderKeys()); + Assertions.assertEquals(DistributionSpecReplicated.INSTANCE, result.getDistributionSpec()); + // merge/gather sort requires gather + sort = new PhysicalTopN<>(orderKeys, 10, 10, SortPhase.MERGE_SORT, logicalProperties, groupPlan); + groupExpression = new GroupExpression(sort); + child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE, + new OrderSpec(Lists.newArrayList( + new OrderKey(new SlotReference("ignored", IntegerType.INSTANCE), true, true)))); + + deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child)); + result = deriver.getOutputProperties(groupExpression); + Assertions.assertEquals(orderKeys, result.getOrderSpec().getOrderKeys()); Assertions.assertEquals(DistributionSpecGather.INSTANCE, result.getDistributionSpec()); } @@ -401,7 +414,7 @@ public class ChildOutputPropertyDeriverTest { public void testLimit() { SlotReference key = new SlotReference("col1", IntegerType.INSTANCE); List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true, true)); - PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10, logicalProperties, groupPlan); + PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10, LimitPhase.ORIGIN, logicalProperties, groupPlan); GroupExpression groupExpression = new GroupExpression(limit); PhysicalProperties child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE, new OrderSpec(orderKeys)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java index 1e7f72dd2e..10dfea7032 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java @@ -22,6 +22,7 @@ import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.GroupPlan; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; @@ -108,7 +109,7 @@ public class ImplementationTest { public void toPhysicalLimitTest() { int limit = 10; int offset = 100; - LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(limit, offset, groupPlan); + LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(limit, offset, LimitPhase.LOCAL, groupPlan); PhysicalPlan physicalPlan = executeImplementationRule(logicalLimit); Assertions.assertEquals(PlanType.PHYSICAL_LIMIT, physicalPlan.getType()); PhysicalLimit<GroupPlan> physicalLimit = (PhysicalLimit<GroupPlan>) physicalPlan; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java index 77cb6df00f..44a88ac0c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java @@ -18,12 +18,17 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalSort; +import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanConstructor; import com.google.common.collect.Lists; @@ -31,6 +36,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.stream.Collectors; /** * MergeConsecutiveFilter ut @@ -39,7 +45,7 @@ public class EliminateLimitTest { @Test public void testEliminateLimit() { LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0); - LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan); + LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, scan); CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(limit); List<Rule> rules = Lists.newArrayList(new EliminateLimit().build()); @@ -48,4 +54,17 @@ public class EliminateLimitTest { Plan actual = cascadesContext.getMemo().copyOut(); Assertions.assertTrue(actual instanceof LogicalEmptyRelation); } + + @Test + public void testLimitSort() { + LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + LogicalLimit limit = new LogicalLimit<>(1, 1, LimitPhase.ORIGIN, + new LogicalSort<>(scan.getOutput().stream().map(c -> new OrderKey(c, true, true)).collect(Collectors.toList()), + scan)); + + Plan actual = PlanChecker.from(MemoTestUtils.createConnectContext(), limit) + .rewrite() + .getPlan(); + Assertions.assertTrue(actual instanceof LogicalTopN); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java index fa7270def9..869dec982f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.RelationUtil; import org.apache.doris.nereids.util.MemoTestUtils; @@ -33,10 +34,10 @@ import java.util.List; public class MergeLimitsTest { @Test public void testMergeConsecutiveLimits() { - LogicalLimit limit3 = new LogicalLimit<>(3, 5, new UnboundRelation( + LogicalLimit limit3 = new LogicalLimit<>(3, 5, LimitPhase.ORIGIN, new UnboundRelation( RelationUtil.newRelationId(), Lists.newArrayList("db", "t"))); - LogicalLimit limit2 = new LogicalLimit<>(2, 0, limit3); - LogicalLimit limit1 = new LogicalLimit<>(10, 0, limit2); + LogicalLimit limit2 = new LogicalLimit<>(2, 0, LimitPhase.ORIGIN, limit3); + LogicalLimit limit1 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit2); CascadesContext context = MemoTestUtils.createCascadesContext(limit1); List<Rule> rules = Lists.newArrayList(new MergeLimits().build()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java index ea1fc58942..57e38b32d1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java @@ -22,6 +22,7 @@ import org.apache.doris.nereids.pattern.PatternDescriptor; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; @@ -204,7 +205,6 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup // plan among fragments has duplicate elements. (s1, s2) -> s1) ); - // limit is push down to left scan of `t1`. Assertions.assertEquals(2, nameToScan.size()); Assertions.assertEquals(5, nameToScan.get("t1").getLimit()); @@ -212,6 +212,14 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup ); } + @Test + public void testLimitPushSort() { + PlanChecker.from(connectContext) + .analyze("select k1 from t1 order by k1 limit 1") + .rewrite() + .matches(logicalTopN()); + } + @Test public void testLimitPushUnion() { PlanChecker.from(connectContext) @@ -229,8 +237,10 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup logicalOlapScan().when(scan -> "t2".equals(scan.getTable().getName())) ), logicalLimit( - logicalProject( - logicalOlapScan().when(scan -> "t3".equals(scan.getTable().getName())) + logicalLimit( + logicalProject( + logicalOlapScan().when(scan -> "t3".equals(scan.getTable().getName())) + ) ) ) ) @@ -261,12 +271,12 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup if (hasProject) { // return limit -> project -> join - return new LogicalLimit<>(10, 0, new LogicalProject<>( + return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, new LogicalProject<>( ImmutableList.of(new UnboundSlot("sid"), new UnboundSlot("id")), join)); } else { // return limit -> join - return new LogicalLimit<>(10, 0, join); + return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, join); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java similarity index 57% copy from fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java copy to fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java index 77cb6df00f..174f5a90b4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java @@ -17,35 +17,25 @@ package org.apache.doris.nereids.rules.rewrite.logical; -import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanConstructor; -import com.google.common.collect.Lists; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.List; +public class SplitLimitTest { + private final LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0); -/** - * MergeConsecutiveFilter ut - */ -public class EliminateLimitTest { @Test - public void testEliminateLimit() { - LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0); - LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan); - - CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(limit); - List<Rule> rules = Lists.newArrayList(new EliminateLimit().build()); - cascadesContext.topDownRewrite(rules); - - Plan actual = cascadesContext.getMemo().copyOut(); - Assertions.assertTrue(actual instanceof LogicalEmptyRelation); + void testSplitLimit() { + Plan plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, scan1); + plan = PlanChecker.from(MemoTestUtils.createConnectContext(), plan) + .rewrite() + .getPlan(); + plan.anyMatch(x -> x instanceof LogicalLimit && ((LogicalLimit<?>) x).isSplit()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java index e931957be2..94f6a07d33 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java @@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; import org.apache.doris.nereids.trees.plans.FakePlan; import org.apache.doris.nereids.trees.plans.GroupPlan; +import org.apache.doris.nereids.trees.plans.LimitPhase; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; @@ -279,7 +281,9 @@ public class StatsCalculatorTest { GroupPlan groupPlan = new GroupPlan(childGroup); childGroup.setStatistics(childStats); - LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(1, 2, groupPlan); + LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(1, 2, + LimitPhase.GLOBAL, new LogicalLimit<>(1, 2, LimitPhase.LOCAL, groupPlan) + ); GroupExpression groupExpression = new GroupExpression(logicalLimit, ImmutableList.of(childGroup)); Group ownerGroup = newGroup(); ownerGroup.addGroupExpression(groupExpression); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java index aed7bbbbd5..be62fc278b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java @@ -46,7 +46,7 @@ public class PlanToStringTest { @Test public void testLogicalLimit(@Mocked Plan child) { - LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, child); + LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, child); Assertions.assertEquals("LogicalLimit ( limit=0, offset=0 )", plan.toString()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java index c719302ac5..2bca62bab1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; @@ -122,7 +123,7 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder limit(long limit, long offset) { - LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit, offset, this.plan); + LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, this.plan); return from(limitPlan); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org