This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 704faaed84 [feature](Nereids) add rule split limit into two phase 
(#16797)
704faaed84 is described below

commit 704faaed84464e6c2f3e7fe2b07602ff3ddd283f
Author: 谢健 <jianx...@gmail.com>
AuthorDate: Tue Mar 7 15:34:12 2023 +0800

    [feature](Nereids) add rule split limit into two phase (#16797)
    
    1. Add a rule split limit, like Limit(Origin) ==> Limit(Global) -> Gather 
-> Limit(Local)
    2. Add a rule: limit-> sort ==> topN
    3. fix a bug about topN
    4. make the type of limit,offset long in topN
    And because this rule is always beneficial, we add a rule in the rewrite 
phase
---
 .../glue/translator/PhysicalPlanTranslator.java    | 62 ++++++++-----------
 .../doris/nereids/jobs/batch/NereidsRewriter.java  |  2 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  3 +-
 .../properties/ChildOutputPropertyDeriver.java     | 12 +---
 .../nereids/properties/RequestPropertyDeriver.java | 15 ++++-
 .../org/apache/doris/nereids/rules/RuleType.java   |  4 +-
 .../LogicalLimitToPhysicalLimit.java               |  1 +
 .../implementation/LogicalTopNToPhysicalTopN.java  | 12 +++-
 .../rules/rewrite/logical/ExistsApplyToJoin.java   |  5 +-
 .../nereids/rules/rewrite/logical/MergeLimits.java | 19 +++---
 .../rules/rewrite/logical/PushdownLimit.java       | 12 ++++
 .../logical/PushdownProjectThroughLimit.java       |  4 +-
 .../logical/{MergeLimits.java => SplitLimit.java}  | 42 ++++++-------
 .../plans/{algebra/TopN.java => LimitPhase.java}   | 20 +++++--
 .../doris/nereids/trees/plans/algebra/TopN.java    |  4 +-
 .../nereids/trees/plans/logical/LogicalLimit.java  | 26 +++++---
 .../nereids/trees/plans/logical/LogicalTopN.java   | 12 ++--
 .../trees/plans/physical/PhysicalLimit.java        | 33 +++++++----
 .../nereids/trees/plans/physical/PhysicalTopN.java | 14 ++---
 .../nereids/trees/plans/visitor/PlanVisitor.java   |  2 +-
 .../org/apache/doris/nereids/memo/MemoTest.java    | 69 +++++++++++-----------
 .../properties/ChildOutputPropertyDeriverTest.java | 15 ++++-
 .../rules/implementation/ImplementationTest.java   |  3 +-
 .../rules/rewrite/logical/EliminateLimitTest.java  | 21 ++++++-
 .../rules/rewrite/logical/MergeLimitsTest.java     |  7 ++-
 .../rules/rewrite/logical/PushdownLimitTest.java   | 20 +++++--
 ...EliminateLimitTest.java => SplitLimitTest.java} | 30 ++++------
 .../doris/nereids/stats/StatsCalculatorTest.java   |  6 +-
 .../nereids/trees/plans/PlanToStringTest.java      |  2 +-
 .../doris/nereids/util/LogicalPlanBuilder.java     |  3 +-
 30 files changed, 283 insertions(+), 197 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b28db7c0f8..500f0360a5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -808,9 +808,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment inputFragment = topN.child(0).accept(this, context);
         PlanFragment currentFragment = inputFragment;
 
-        //1. generate new fragment for sort when the child is exchangeNode
-        if (inputFragment.getPlanRoot() instanceof ExchangeNode) {
-            Preconditions.checkArgument(!topN.getSortPhase().isLocal());
+        //1. Generate new fragment for sort when the child is exchangeNode, If 
the child is
+        // mergingExchange, it means we have always generated a new fragment 
when processing mergeSort
+        if (inputFragment.getPlanRoot() instanceof ExchangeNode
+                && !((ExchangeNode) 
inputFragment.getPlanRoot()).isMergingExchange()) {
+            // Except LocalTopN->MergeTopN, we don't allow localTopN's child 
is Exchange Node
+            Preconditions.checkArgument(!topN.getSortPhase().isLocal(),
+                    "local sort requires any property but child is" + 
inputFragment.getPlanRoot());
             DataPartition outputPartition = DataPartition.UNPARTITIONED;
             ExchangeNode exchangeNode = (ExchangeNode) 
inputFragment.getPlanRoot();
             inputFragment.setOutputPartition(outputPartition);
@@ -822,20 +826,28 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
 
         // 2. According to the type of sort, generate physical plan
         if (!topN.getSortPhase().isMerge()) {
-            // For localSort or Gather->Sort, we just need to add sortNode
+            // For localSort or Gather->Sort, we just need to add TopNNode
             SortNode sortNode = translateSortNode(topN, 
inputFragment.getPlanRoot(), context);
+            sortNode.setOffset(topN.getOffset());
+            sortNode.setLimit(topN.getLimit());
             currentFragment.addPlanRoot(sortNode);
         } else {
             // For mergeSort, we need to push sortInfo to exchangeNode
             if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
                 // if there is no exchange node for mergeSort
-                //   e.g., localSort -> mergeSort
+                //   e.g., mergeTopN -> localTopN
                 // It means the local has satisfied the Gather property. We 
can just ignore mergeSort
+                currentFragment.getPlanRoot().setOffset(topN.getOffset());
+                currentFragment.getPlanRoot().setLimit(topN.getLimit());
                 return currentFragment;
             }
-            Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof 
SortNode);
+            Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof 
SortNode,
+                    "mergeSort' child must be sortNode");
             SortNode sortNode = (SortNode) inputFragment.getPlanRoot();
-            ((ExchangeNode) 
currentFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
+            ExchangeNode exchangeNode = (ExchangeNode) 
currentFragment.getPlanRoot();
+            exchangeNode.setMergeInfo(sortNode.getSortInfo());
+            exchangeNode.setLimit(topN.getLimit());
+            exchangeNode.setOffset(topN.getOffset());
         }
         return currentFragment;
     }
@@ -1388,38 +1400,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         if (inputFragment == null) {
             return inputFragment;
         }
-
+        // For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the 
LocalLimit has already gathered
+        // The globalLimit can overwrite the limit and offset, so it's still 
correct
         PlanNode child = inputFragment.getPlanRoot();
-
-        // physical plan:  limit --> sort
-        // after translate, it could be:
-        // 1. limit->sort => set (limit and offset) on sort
-        // 2. limit->exchange->sort => set (limit and offset) on exchange, set 
sort.limit = limit+offset
-        if (child instanceof SortNode) {
-            SortNode sort = (SortNode) child;
-            sort.setLimit(physicalLimit.getLimit());
-            sort.setOffset(physicalLimit.getOffset());
-            return inputFragment;
-        }
-        if (child instanceof ExchangeNode) {
-            ExchangeNode exchangeNode = (ExchangeNode) child;
-            exchangeNode.setLimit(physicalLimit.getLimit());
-            // we do not check if this is a merging exchange here,
-            // since this guaranteed by translating logic plan to physical plan
-            exchangeNode.setOffset(physicalLimit.getOffset());
-            if (exchangeNode.getChild(0) instanceof SortNode) {
-                SortNode sort = (SortNode) exchangeNode.getChild(0);
-                sort.setLimit(physicalLimit.getLimit() + 
physicalLimit.getOffset());
-                sort.setOffset(0);
-            }
-            return inputFragment;
-        }
-        // for other PlanNode, just set limit as limit+offset
-        child.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset());
-        PlanFragment planFragment = exchangeToMergeFragment(inputFragment, 
context);
-        planFragment.getPlanRoot().setLimit(physicalLimit.getLimit());
-        
planFragment.getPlanRoot().setOffSetDirectly(physicalLimit.getOffset());
-        return planFragment;
+        child.setLimit(physicalLimit.getLimit());
+        child.setOffset(physicalLimit.getOffset());
+        return inputFragment;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
index cfe29964f1..1d51420a7e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
@@ -61,6 +61,7 @@ import 
org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
 import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
 import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
+import org.apache.doris.nereids.rules.rewrite.logical.SplitLimit;
 
 import java.util.List;
 
@@ -192,6 +193,7 @@ public class NereidsRewriter extends BatchRewriteJob {
                     new EliminateAggregate(),
                     new MergeSetOperations(),
                     new PushdownLimit(),
+                    new SplitLimit(),
                     new BuildAggForUnion()
             )),
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 185e74368b..3947e8139b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -194,6 +194,7 @@ import 
org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
 import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
@@ -1346,7 +1347,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             if (offsetToken != null) {
                 offset = Long.parseLong(offsetToken.getText());
             }
-            return new LogicalLimit<>(limit, offset, input);
+            return new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, input);
         });
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 4de8c769fe..4623ba311e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -26,6 +26,7 @@ import 
org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@@ -39,10 +40,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.JoinUtils;
 
@@ -105,13 +104,8 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
     }
 
     @Override
-    public PhysicalProperties visitPhysicalTopN(PhysicalTopN<? extends Plan> 
topN, PlanContext context) {
-        Preconditions.checkState(childrenOutputProperties.size() == 1);
-        return new PhysicalProperties(DistributionSpecGather.INSTANCE, new 
OrderSpec(topN.getOrderKeys()));
-    }
-
-    @Override
-    public PhysicalProperties visitPhysicalQuickSort(PhysicalQuickSort<? 
extends Plan> sort, PlanContext context) {
+    public PhysicalProperties visitAbstractPhysicalSort(AbstractPhysicalSort<? 
extends Plan> sort,
+            PlanContext context) {
         Preconditions.checkState(childrenOutputProperties.size() == 1);
         if (sort.getSortPhase().isLocal()) {
             return new PhysicalProperties(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 77d17f2701..2ca4fdc169 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -29,11 +29,12 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -94,7 +95,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, 
PlanContext> {
     }
 
     @Override
-    public Void visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, 
PlanContext context) {
+    public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> 
sort, PlanContext context) {
         if (!sort.getSortPhase().isLocal()) {
             addRequestPropertyToChildren(PhysicalProperties.GATHER);
         } else {
@@ -103,6 +104,16 @@ public class RequestPropertyDeriver extends 
PlanVisitor<Void, PlanContext> {
         return null;
     }
 
+    @Override
+    public Void visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, 
PlanContext context) {
+        if (limit.isGlobal()) {
+            addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        } else {
+            addRequestPropertyToChildren(PhysicalProperties.ANY);
+        }
+        return null;
+    }
+
     @Override
     public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? 
extends Plan> hashJoin, PlanContext context) {
         JoinHint hint = hashJoin.getHint();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index cf55dedddd..13c47df6a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -188,13 +188,15 @@ public enum RuleType {
     INNER_TO_CROSS_JOIN(RuleTypeClass.REWRITE),
     REWRITE_SENTINEL(RuleTypeClass.REWRITE),
 
+    // split limit
+    SPLIT_LIMIT(RuleTypeClass.REWRITE),
     // limit push down
     PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
     PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
     PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE),
     PUSH_LIMIT_THROUGH_ONE_ROW_RELATION(RuleTypeClass.REWRITE),
     PUSH_LIMIT_THROUGH_EMPTY_RELATION(RuleTypeClass.REWRITE),
-
+    PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE),
     // adjust nullable
     ADJUST_NULLABLE_ON_AGGREGATE(RuleTypeClass.REWRITE),
     ADJUST_NULLABLE_ON_ASSERT_NUM_ROWS(RuleTypeClass.REWRITE),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
index 836d0b0344..5742cee12e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
@@ -30,6 +30,7 @@ public class LogicalLimitToPhysicalLimit extends 
OneImplementationRuleFactory {
         return logicalLimit().then(limit -> new PhysicalLimit<>(
                 limit.getLimit(),
                 limit.getOffset(),
+                limit.getPhase(),
                 limit.getLogicalProperties(),
                 limit.child())
         ).toRule(RuleType.LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
index ac90ff8acc..bf675fe264 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
@@ -38,10 +38,16 @@ public class LogicalTopNToPhysicalTopN extends 
OneImplementationRuleFactory {
                 .toRule(RuleType.LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE);
     }
 
+    /**
+     * before: logicalTopN(off, limit)
+     * after:
+     *     gatherTopN(limit, off, require gather)
+     *     mergeTopN(limit, off, require gather) -> localTopN(off+limit, 0, 
require any)
+     */
     private List<PhysicalTopN<? extends Plan>> twoPhaseSort(LogicalTopN 
logicalTopN) {
-        PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(), 
logicalTopN.getLimit(),
-                logicalTopN.getOffset(), SortPhase.LOCAL_SORT, 
logicalTopN.getLogicalProperties(), logicalTopN.child(0)
-        );
+        PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(),
+                logicalTopN.getLimit() + logicalTopN.getOffset(), 0, 
SortPhase.LOCAL_SORT,
+                logicalTopN.getLogicalProperties(), logicalTopN.child(0));
         PhysicalTopN twoPhaseSort = new 
PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
                 logicalTopN.getOffset(), SortPhase.MERGE_SORT, 
logicalTopN.getLogicalProperties(), localSort);
         PhysicalTopN onePhaseSort = new 
PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
index ca776b83b7..a7447f1b35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
@@ -28,6 +28,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
@@ -117,7 +118,7 @@ public class ExistsApplyToJoin extends 
OneRewriteRuleFactory {
     }
 
     private Plan unCorrelatedNotExist(LogicalApply unapply) {
-        LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) 
unapply.right());
+        LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, 
(LogicalPlan) unapply.right());
         Alias alias = new Alias(new Count(), "count(*)");
         LogicalAggregate newAgg = new LogicalAggregate<>(new ArrayList<>(),
                 ImmutableList.of(alias), newLimit);
@@ -128,7 +129,7 @@ public class ExistsApplyToJoin extends 
OneRewriteRuleFactory {
     }
 
     private Plan unCorrelatedExist(LogicalApply unapply) {
-        LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) 
unapply.right());
+        LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, 
(LogicalPlan) unapply.right());
         return new LogicalJoin<>(JoinType.CROSS_JOIN, (LogicalPlan) 
unapply.left(), newLimit);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
index 4b3ec22fb0..1d6b0ab48f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
@@ -42,13 +42,16 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 public class MergeLimits extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
-        return logicalLimit(logicalLimit()).then(upperLimit -> {
-            LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
-            return new LogicalLimit<>(
-                    Math.min(upperLimit.getLimit(), 
Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
-                    bottomLimit.getOffset() + upperLimit.getOffset(),
-                    bottomLimit.child()
-            );
-        }).toRule(RuleType.MERGE_LIMITS);
+        return logicalLimit(logicalLimit())
+                .when(upperLimit -> 
upperLimit.getPhase().equals(upperLimit.child().getPhase()))
+                .then(upperLimit -> {
+                    LogicalLimit<? extends Plan> bottomLimit = 
upperLimit.child();
+                    return new LogicalLimit<>(
+                            Math.min(upperLimit.getLimit(),
+                                    Math.max(bottomLimit.getLimit() - 
upperLimit.getOffset(), 0)),
+                            bottomLimit.getOffset() + upperLimit.getOffset(),
+                            bottomLimit.getPhase(), bottomLimit.child()
+                    );
+                }).toRule(RuleType.MERGE_LIMITS);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
index 1b2b1a4426..b5b4614410 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
@@ -28,6 +28,8 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
 
 import com.google.common.collect.ImmutableList;
@@ -82,6 +84,16 @@ public class PushdownLimit implements RewriteRuleFactory {
                             return 
limit.withChildren(union.withChildren(newUnionChildren));
                         })
                         .toRule(RuleType.PUSH_LIMIT_THROUGH_UNION),
+                // limit -> sort ==> topN
+                logicalLimit(logicalSort())
+                        .then(limit -> {
+                            LogicalSort sort = limit.child();
+                            LogicalTopN topN = new 
LogicalTopN(sort.getOrderKeys(),
+                                    limit.getLimit(),
+                                    limit.getOffset(),
+                                    sort.child(0));
+                            return topN;
+                        }).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
                 logicalLimit(logicalOneRowRelation())
                         .then(limit -> limit.getLimit() > 0
                                 ? limit.child() : new 
LogicalEmptyRelation(limit.child().getOutput()))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
index b9f0f70d2a..c1705250a5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
@@ -51,8 +51,8 @@ public class PushdownProjectThroughLimit extends 
OneRewriteRuleFactory {
         return logicalProject(logicalLimit(any())).thenApply(ctx -> {
             LogicalProject<LogicalLimit<Plan>> logicalProject = ctx.root;
             LogicalLimit<Plan> logicalLimit = logicalProject.child();
-            return new LogicalLimit<>(logicalLimit.getLimit(),
-                    logicalLimit.getOffset(), new 
LogicalProject<>(logicalProject.getProjects(),
+            return new LogicalLimit<>(logicalLimit.getLimit(), 
logicalLimit.getOffset(),
+                    logicalLimit.getPhase(), new 
LogicalProject<>(logicalProject.getProjects(),
                     logicalLimit.child()));
         }).toRule(RuleType.PUSHDOWN_PROJECT_THROUGH_LIMIT);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java
similarity index 55%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java
index 4b3ec22fb0..abd7b0a49c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java
@@ -20,35 +20,29 @@ package org.apache.doris.nereids.rules.rewrite.logical;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
-import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 
 /**
- * This rule aims to merge consecutive limits.
- * <pre>
- * input plan:
- *   LIMIT1(limit=10, offset=0)
- *     |
- *   LIMIT2(limit=3, offset=5)
- *
- * output plan:
- *    LIMIT(limit=3, offset=5)
- *
- * merged limit = min(LIMIT1.limit, LIMIT2.limit)
- * merged offset = LIMIT2.offset
- * </pre>
- * Note that the top limit should not have valid offset info.
+ * Split limit into two phase
+ * before:
+ *  Limit(origin) limit, offset
+ * after:
+ *  Limit(global) limit, offset
+ *      |
+ *  Limit(local) limit + offset, 0
  */
-public class MergeLimits extends OneRewriteRuleFactory {
+public class SplitLimit extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
-        return logicalLimit(logicalLimit()).then(upperLimit -> {
-            LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
-            return new LogicalLimit<>(
-                    Math.min(upperLimit.getLimit(), 
Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
-                    bottomLimit.getOffset() + upperLimit.getOffset(),
-                    bottomLimit.child()
-            );
-        }).toRule(RuleType.MERGE_LIMITS);
+        return logicalLimit().when(limit -> !limit.isSplit())
+                .then(limit -> {
+                    long l = limit.getLimit();
+                    long o = limit.getOffset();
+                    return new LogicalLimit<>(l, o,
+                            LimitPhase.GLOBAL, new LogicalLimit<>(l + o, 0, 
LimitPhase.LOCAL, limit.child())
+                    );
+                }).toRule(RuleType.SPLIT_LIMIT);
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
 b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java
similarity index 63%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java
index d79fe003ed..705c712ef4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java
@@ -15,14 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.trees.plans.algebra;
+package org.apache.doris.nereids.trees.plans;
 
 /**
- * Common interface for logical/physical TopN.
+ * Limit phase for logical and physical limit, like
+ *          LocalLimit -> Gather -> GlobalLimit
+ * Origin is used to mark the limit operator that has not been split into 
2-phase
  */
-public interface TopN extends Sort {
+public enum LimitPhase {
+    LOCAL("LOCAL"),
+    GLOBAL("GLOBAL"),
+    ORIGIN("ORIGIN");
+    private final String name;
 
-    int getOffset();
+    LimitPhase(String name) {
+        this.name = name;
+    }
 
-    int getLimit();
+    public boolean isLocal() {
+        return this == LOCAL;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
index d79fe003ed..c214dffbbf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
@@ -22,7 +22,7 @@ package org.apache.doris.nereids.trees.plans.algebra;
  */
 public interface TopN extends Sort {
 
-    int getOffset();
+    long getOffset();
 
-    int getLimit();
+    long getLimit();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
index 75ff5f1ea9..d632b959e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
@@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Limit;
@@ -44,19 +45,28 @@ import java.util.Optional;
  * offset 100
  */
 public class LogicalLimit<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> implements Limit {
-
+    private final LimitPhase phase;
     private final long limit;
     private final long offset;
 
-    public LogicalLimit(long limit, long offset, CHILD_TYPE child) {
-        this(limit, offset, Optional.empty(), Optional.empty(), child);
+    public LogicalLimit(long limit, long offset, LimitPhase phase, CHILD_TYPE 
child) {
+        this(limit, offset, phase, Optional.empty(), Optional.empty(), child);
     }
 
-    public LogicalLimit(long limit, long offset, Optional<GroupExpression> 
groupExpression,
+    public LogicalLimit(long limit, long offset, LimitPhase phase, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
         super(PlanType.LOGICAL_LIMIT, groupExpression, logicalProperties, 
child);
         this.limit = limit;
         this.offset = offset;
+        this.phase = phase;
+    }
+
+    public LimitPhase getPhase() {
+        return phase;
+    }
+
+    public boolean isSplit() {
+        return phase != LimitPhase.ORIGIN;
     }
 
     public long getLimit() {
@@ -94,7 +104,7 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TY
             return false;
         }
         LogicalLimit that = (LogicalLimit) o;
-        return limit == that.limit && offset == that.offset;
+        return limit == that.limit && offset == that.offset && phase == 
that.phase;
     }
 
     @Override
@@ -108,17 +118,17 @@ public class LogicalLimit<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_TY
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new LogicalLimit<>(limit, offset, groupExpression, 
Optional.of(getLogicalProperties()), child());
+        return new LogicalLimit<>(limit, offset, phase, groupExpression, 
Optional.of(getLogicalProperties()), child());
     }
 
     @Override
     public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new LogicalLimit<>(limit, offset, Optional.empty(), 
logicalProperties, child());
+        return new LogicalLimit<>(limit, offset, phase, Optional.empty(), 
logicalProperties, child());
     }
 
     @Override
     public LogicalLimit<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new LogicalLimit<>(limit, offset, children.get(0));
+        return new LogicalLimit<>(limit, offset, phase, children.get(0));
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
index da78e27cef..cb07601ffa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
@@ -41,17 +41,17 @@ import java.util.Optional;
 public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> implements TopN {
 
     private final List<OrderKey> orderKeys;
-    private final int limit;
-    private final int offset;
+    private final long limit;
+    private final long offset;
 
-    public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, 
CHILD_TYPE child) {
+    public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset, 
CHILD_TYPE child) {
         this(orderKeys, limit, offset, Optional.empty(), Optional.empty(), 
child);
     }
 
     /**
      * Constructor for LogicalSort.
      */
-    public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, 
Optional<GroupExpression> groupExpression,
+    public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
         super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, 
child);
         this.orderKeys = 
ImmutableList.copyOf(Objects.requireNonNull(orderKeys, "orderKeys can not be 
null"));
@@ -68,11 +68,11 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
         return orderKeys;
     }
 
-    public int getOffset() {
+    public long getOffset() {
         return offset;
     }
 
-    public int getLimit() {
+    public long getLimit() {
         return limit;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
index dfab80443c..8d803d0f88 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
@@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Limit;
@@ -39,14 +40,14 @@ import java.util.Optional;
  * Physical limit plan
  */
 public class PhysicalLimit<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_TYPE> implements Limit {
-
+    private final LimitPhase phase;
     private final long limit;
     private final long offset;
 
     public PhysicalLimit(long limit, long offset,
-            LogicalProperties logicalProperties,
+            LimitPhase phase, LogicalProperties logicalProperties,
             CHILD_TYPE child) {
-        this(limit, offset, Optional.empty(), logicalProperties, child);
+        this(limit, offset, phase, Optional.empty(), logicalProperties, child);
     }
 
     /**
@@ -57,11 +58,12 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_
      * @param offset the number of tuples skipped.
      */
     public PhysicalLimit(long limit, long offset,
-            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            LimitPhase phase, Optional<GroupExpression> groupExpression, 
LogicalProperties logicalProperties,
             CHILD_TYPE child) {
         super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties, 
child);
         this.limit = limit;
         this.offset = offset;
+        this.phase = phase;
     }
 
     /**
@@ -70,14 +72,16 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_
      *
      * @param limit the number of tuples retrieved.
      * @param offset the number of tuples skipped.
+     * @param phase the phase of 2-phase limit.
      */
-    public PhysicalLimit(long limit, long offset, Optional<GroupExpression> 
groupExpression,
+    public PhysicalLimit(long limit, long offset, LimitPhase phase, 
Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, PhysicalProperties 
physicalProperties,
             StatsDeriveResult statsDeriveResult, CHILD_TYPE child) {
         super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties, 
physicalProperties, statsDeriveResult,
                 child);
         this.limit = limit;
         this.offset = offset;
+        this.phase = phase;
     }
 
     public long getLimit() {
@@ -88,10 +92,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_
         return offset;
     }
 
+    public LimitPhase getPhase() {
+        return phase;
+    }
+
+    public boolean isGlobal() {
+        return phase == LimitPhase.GLOBAL;
+    }
+
     @Override
     public Plan withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new PhysicalLimit<>(limit, offset, getLogicalProperties(), 
children.get(0));
+        return new PhysicalLimit<>(limit, offset, phase, 
getLogicalProperties(), children.get(0));
     }
 
     @Override
@@ -101,18 +113,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHILD_
 
     @Override
     public PhysicalLimit<CHILD_TYPE> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new PhysicalLimit<>(limit, offset, groupExpression, 
getLogicalProperties(), child());
+        return new PhysicalLimit<>(limit, offset, phase, groupExpression, 
getLogicalProperties(), child());
     }
 
     @Override
     public PhysicalLimit<CHILD_TYPE> 
withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
-        return new PhysicalLimit<>(limit, offset, logicalProperties.get(), 
child());
+        return new PhysicalLimit<>(limit, offset, phase, 
logicalProperties.get(), child());
     }
 
     @Override
     public PhysicalLimit<CHILD_TYPE> 
withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
             StatsDeriveResult statsDeriveResult) {
-        return new PhysicalLimit<>(limit, offset, groupExpression, 
getLogicalProperties(), physicalProperties,
+        return new PhysicalLimit<>(limit, offset, phase, groupExpression, 
getLogicalProperties(), physicalProperties,
                 statsDeriveResult, child());
     }
 
@@ -125,7 +137,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_
             return false;
         }
         PhysicalLimit that = (PhysicalLimit) o;
-        return offset == that.offset && limit == that.limit;
+        return offset == that.offset && limit == that.limit && phase == 
that.phase;
     }
 
     @Override
@@ -143,6 +155,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_
         return Utils.toSqlString("PhysicalLimit",
                 "limit", limit,
                 "offset", offset,
+                "phase", phase,
                 "stats", statsDeriveResult
         );
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 13f52eb103..dc01b0332a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -40,10 +40,10 @@ import java.util.Optional;
  */
 public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<CHILD_TYPE> implements TopN {
 
-    private final int limit;
-    private final int offset;
+    private final long limit;
+    private final long offset;
 
-    public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
+    public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
             SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE 
child) {
         this(orderKeys, limit, offset, phase, Optional.empty(), 
logicalProperties, child);
     }
@@ -51,7 +51,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
     /**
      * Constructor of PhysicalHashJoinNode.
      */
-    public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
+    public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
             SortPhase phase, Optional<GroupExpression> groupExpression, 
LogicalProperties logicalProperties,
             CHILD_TYPE child) {
         super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, 
logicalProperties, child);
@@ -63,7 +63,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
     /**
      * Constructor of PhysicalHashJoinNode.
      */
-    public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
+    public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
             SortPhase phase, Optional<GroupExpression> groupExpression, 
LogicalProperties logicalProperties,
             PhysicalProperties physicalProperties, StatsDeriveResult 
statsDeriveResult, CHILD_TYPE child) {
         super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, 
logicalProperties, physicalProperties,
@@ -73,11 +73,11 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
         this.offset = offset;
     }
 
-    public int getLimit() {
+    public long getLimit() {
         return limit;
     }
 
-    public int getOffset() {
+    public long getOffset() {
         return offset;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index de6c98f2f7..3cda38d6fb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -317,7 +317,7 @@ public abstract class PlanVisitor<R, C> {
     }
 
     public R visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, C context) {
-        return visit(topN, context);
+        return visitAbstractPhysicalSort(topN, context);
     }
 
     public R visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, C 
context) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
index e268f5a091..40d9d6289a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.FakePlan;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.LeafPlan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -361,7 +362,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void a2bc() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, 
student);
+        LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")))
                 .applyBottomUp(
@@ -396,7 +397,7 @@ class MemoTest implements MemoPatternMatchSupported {
         // invalid case
         Assertions.assertThrows(IllegalStateException.class, () -> {
             UnboundRelation student = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
-            LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, 
student);
+            LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, student);
 
             PlanChecker.from(connectContext, student)
                     .applyBottomUp(
@@ -414,7 +415,7 @@ class MemoTest implements MemoPatternMatchSupported {
         UnboundRelation a = new UnboundRelation(RelationUtil.newRelationId(), 
ImmutableList.of("student"));
 
         UnboundRelation a2 = new UnboundRelation(RelationUtil.newRelationId(), 
ImmutableList.of("student"));
-        LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, a2);
+        LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, a2);
         PlanChecker.from(connectContext, a)
                 .setMaxInvokeTimesPerRule(1000)
                 .applyBottomUp(
@@ -479,8 +480,8 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void a2bcd() {
         LogicalOlapScan scan = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan);
-        LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new 
LogicalLimit<>(10, 0, limit5);
+        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, scan);
+        LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new 
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -507,7 +508,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2a() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -531,7 +532,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2NewA() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -555,7 +556,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2GroupB() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -577,7 +578,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2PlanB() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -599,7 +600,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2c() {
         UnboundRelation relation = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
-        LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, 
relation);
+        LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, relation);
 
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
         PlanChecker.from(connectContext, limit10)
@@ -622,10 +623,10 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2cd() {
         UnboundRelation relation = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
-        LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, 
relation);
+        LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, relation);
 
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -650,8 +651,8 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2cb() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
-        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
+        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -681,7 +682,7 @@ class MemoTest implements MemoPatternMatchSupported {
         Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
 
             LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-            LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
+            LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
             PlanChecker.from(connectContext, limit10)
                     .setMaxInvokeTimesPerRule(1000)
@@ -707,8 +708,8 @@ class MemoTest implements MemoPatternMatchSupported {
         Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
             UnboundRelation student = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
 
-            LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, 
student);
-            LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new 
LogicalLimit<>(10, 0, limit5);
+            LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, student);
+            LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new 
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
 
             PlanChecker.from(connectContext, limit10)
                     .applyBottomUp(
@@ -733,11 +734,11 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void ab2cde() {
         UnboundRelation student = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
-        LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0, 
student);
+        LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0, 
LimitPhase.ORIGIN, student);
 
         LogicalOlapScan scan = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan);
-        LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new 
LogicalLimit<>(10, 0, limit5);
+        LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, scan);
+        LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new 
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
 
         PlanChecker.from(connectContext, limit3)
                 .applyBottomUp(
@@ -766,8 +767,8 @@ class MemoTest implements MemoPatternMatchSupported {
     public void abc2bac() {
         UnboundRelation student = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
 
-        LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, 
student);
-        LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new 
LogicalLimit<>(10, 0, limit5);
+        LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, student);
+        LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new 
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -805,8 +806,8 @@ class MemoTest implements MemoPatternMatchSupported {
     public void abc2bc() {
         UnboundRelation student = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
 
-        LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, 
student);
-        LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new 
LogicalLimit<>(10, 0, limit5);
+        LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, 
LimitPhase.ORIGIN, student);
+        LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new 
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -829,7 +830,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testRewriteBottomPlanToOnePlan() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, student);
 
         LogicalOlapScan score = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
 
@@ -848,10 +849,10 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testRewriteBottomPlanToMultiPlan() {
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
student);
+        LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
         LogicalOlapScan score = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
-        LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, score);
+        LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, score);
 
         PlanChecker.from(connectContext, limit10)
                 .applyBottomUp(
@@ -892,7 +893,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testRecomputeLogicalProperties() {
         UnboundRelation unboundTable = new 
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("score"));
-        LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0, 
unboundTable);
+        LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, unboundTable);
 
         LogicalOlapScan boundTable = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
         LogicalLimit<Plan> boundLimit = 
unboundLimit.withChildren(ImmutableList.of(boundTable));
@@ -924,7 +925,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testEliminateRootWithChildGroupInTwoLevels() {
         LogicalOlapScan scan = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
-        LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan);
+        LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, 
scan);
 
         PlanChecker.from(connectContext, limit)
                 .applyBottomUp(logicalLimit().then(LogicalLimit::child))
@@ -936,7 +937,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testEliminateRootWithChildPlanInTwoLevels() {
         LogicalOlapScan scan = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
-        LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan);
+        LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, 
scan);
 
         PlanChecker.from(connectContext, limit)
                 .applyBottomUp(logicalLimit(any()).then(LogicalLimit::child))
@@ -948,7 +949,7 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testEliminateTwoLevelsToOnePlan() {
         LogicalOlapScan score = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
-        LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, score);
+        LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, 
score);
 
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
 
@@ -968,10 +969,10 @@ class MemoTest implements MemoPatternMatchSupported {
     @Test
     public void testEliminateTwoLevelsToTwoPlans() {
         LogicalOlapScan score = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
-        LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, score);
+        LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, 
LimitPhase.ORIGIN, score);
 
         LogicalOlapScan student = new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
-        LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, student);
+        LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, 
LimitPhase.ORIGIN, student);
 
         PlanChecker.from(connectContext, limit1)
                 .applyBottomUp(logicalLimit(any()).when(limit1::equals).then(l 
-> limit10))
@@ -998,7 +999,7 @@ class MemoTest implements MemoPatternMatchSupported {
     public void test() {
         PlanChecker.from(MemoTestUtils.createConnectContext())
                 .analyze(new LogicalLimit<>(10, 0,
-                        new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
+                        LimitPhase.ORIGIN, new 
LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
                                 ImmutableList.of(new EqualTo(new 
UnboundSlot("sid"), new UnboundSlot("id"))),
                                 new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score),
                                 new 
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student)
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
index 9cd2888b38..6d7b2413ff 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.AggPhase;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
@@ -385,6 +386,7 @@ public class ChildOutputPropertyDeriverTest {
     public void testTopN() {
         SlotReference key = new SlotReference("col1", IntegerType.INSTANCE);
         List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true, 
true));
+        // localSort require any
         PhysicalTopN<GroupPlan> sort = new PhysicalTopN<>(orderKeys, 10, 10, 
SortPhase.LOCAL_SORT, logicalProperties, groupPlan);
         GroupExpression groupExpression = new GroupExpression(sort);
         PhysicalProperties child = new 
PhysicalProperties(DistributionSpecReplicated.INSTANCE,
@@ -394,6 +396,17 @@ public class ChildOutputPropertyDeriverTest {
         ChildOutputPropertyDeriver deriver = new 
ChildOutputPropertyDeriver(Lists.newArrayList(child));
         PhysicalProperties result = 
deriver.getOutputProperties(groupExpression);
         Assertions.assertEquals(orderKeys, 
result.getOrderSpec().getOrderKeys());
+        Assertions.assertEquals(DistributionSpecReplicated.INSTANCE, 
result.getDistributionSpec());
+        // merge/gather sort requires gather
+        sort = new PhysicalTopN<>(orderKeys, 10, 10, SortPhase.MERGE_SORT, 
logicalProperties, groupPlan);
+        groupExpression = new GroupExpression(sort);
+        child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE,
+                new OrderSpec(Lists.newArrayList(
+                        new OrderKey(new SlotReference("ignored", 
IntegerType.INSTANCE), true, true))));
+
+        deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child));
+        result = deriver.getOutputProperties(groupExpression);
+        Assertions.assertEquals(orderKeys, 
result.getOrderSpec().getOrderKeys());
         Assertions.assertEquals(DistributionSpecGather.INSTANCE, 
result.getDistributionSpec());
     }
 
@@ -401,7 +414,7 @@ public class ChildOutputPropertyDeriverTest {
     public void testLimit() {
         SlotReference key = new SlotReference("col1", IntegerType.INSTANCE);
         List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true, 
true));
-        PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10, 
logicalProperties, groupPlan);
+        PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10, 
LimitPhase.ORIGIN, logicalProperties, groupPlan);
         GroupExpression groupExpression = new GroupExpression(limit);
         PhysicalProperties child = new 
PhysicalProperties(DistributionSpecReplicated.INSTANCE,
                 new OrderSpec(orderKeys));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
index 1e7f72dd2e..10dfea7032 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -108,7 +109,7 @@ public class ImplementationTest {
     public void toPhysicalLimitTest() {
         int limit = 10;
         int offset = 100;
-        LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(limit, 
offset, groupPlan);
+        LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(limit, 
offset, LimitPhase.LOCAL, groupPlan);
         PhysicalPlan physicalPlan = executeImplementationRule(logicalLimit);
         Assertions.assertEquals(PlanType.PHYSICAL_LIMIT, 
physicalPlan.getType());
         PhysicalLimit<GroupPlan> physicalLimit = (PhysicalLimit<GroupPlan>) 
physicalPlan;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
index 77cb6df00f..44a88ac0c8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
@@ -18,12 +18,17 @@
 package org.apache.doris.nereids.rules.rewrite.logical;
 
 import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
 import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.nereids.util.PlanConstructor;
 
 import com.google.common.collect.Lists;
@@ -31,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * MergeConsecutiveFilter ut
@@ -39,7 +45,7 @@ public class EliminateLimitTest {
     @Test
     public void testEliminateLimit() {
         LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
-        LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan);
+        LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, 
LimitPhase.ORIGIN, scan);
 
         CascadesContext cascadesContext = 
MemoTestUtils.createCascadesContext(limit);
         List<Rule> rules = Lists.newArrayList(new EliminateLimit().build());
@@ -48,4 +54,17 @@ public class EliminateLimitTest {
         Plan actual = cascadesContext.getMemo().copyOut();
         Assertions.assertTrue(actual instanceof LogicalEmptyRelation);
     }
+
+    @Test
+    public void testLimitSort() {
+        LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+        LogicalLimit limit = new LogicalLimit<>(1, 1, LimitPhase.ORIGIN,
+                new LogicalSort<>(scan.getOutput().stream().map(c -> new 
OrderKey(c, true, true)).collect(Collectors.toList()),
+                        scan));
+
+        Plan actual = PlanChecker.from(MemoTestUtils.createConnectContext(), 
limit)
+                .rewrite()
+                .getPlan();
+        Assertions.assertTrue(actual instanceof LogicalTopN);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
index fa7270def9..869dec982f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.rewrite.logical;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.RelationUtil;
 import org.apache.doris.nereids.util.MemoTestUtils;
@@ -33,10 +34,10 @@ import java.util.List;
 public class MergeLimitsTest {
     @Test
     public void testMergeConsecutiveLimits() {
-        LogicalLimit limit3 = new LogicalLimit<>(3, 5, new UnboundRelation(
+        LogicalLimit limit3 = new LogicalLimit<>(3, 5, LimitPhase.ORIGIN, new 
UnboundRelation(
                 RelationUtil.newRelationId(), Lists.newArrayList("db", "t")));
-        LogicalLimit limit2 = new LogicalLimit<>(2, 0, limit3);
-        LogicalLimit limit1 = new LogicalLimit<>(10, 0, limit2);
+        LogicalLimit limit2 = new LogicalLimit<>(2, 0, LimitPhase.ORIGIN, 
limit3);
+        LogicalLimit limit1 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, 
limit2);
 
         CascadesContext context = MemoTestUtils.createCascadesContext(limit1);
         List<Rule> rules = Lists.newArrayList(new MergeLimits().build());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
index ea1fc58942..57e38b32d1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.pattern.PatternDescriptor;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -204,7 +205,6 @@ class PushdownLimitTest extends TestWithFeService 
implements MemoPatternMatchSup
                                     // plan among fragments has duplicate 
elements.
                                     (s1, s2) -> s1)
                             );
-
                     // limit is push down to left scan of `t1`.
                     Assertions.assertEquals(2, nameToScan.size());
                     Assertions.assertEquals(5, 
nameToScan.get("t1").getLimit());
@@ -212,6 +212,14 @@ class PushdownLimitTest extends TestWithFeService 
implements MemoPatternMatchSup
         );
     }
 
+    @Test
+    public void testLimitPushSort() {
+        PlanChecker.from(connectContext)
+                .analyze("select k1 from t1 order by k1 limit 1")
+                .rewrite()
+                .matches(logicalTopN());
+    }
+
     @Test
     public void testLimitPushUnion() {
         PlanChecker.from(connectContext)
@@ -229,8 +237,10 @@ class PushdownLimitTest extends TestWithFeService 
implements MemoPatternMatchSup
                                         logicalOlapScan().when(scan -> 
"t2".equals(scan.getTable().getName()))
                                 ),
                                 logicalLimit(
-                                        logicalProject(
-                                                logicalOlapScan().when(scan -> 
"t3".equals(scan.getTable().getName()))
+                                        logicalLimit(
+                                            logicalProject(
+                                                    
logicalOlapScan().when(scan -> "t3".equals(scan.getTable().getName()))
+                                            )
                                         )
                                 )
                         )
@@ -261,12 +271,12 @@ class PushdownLimitTest extends TestWithFeService 
implements MemoPatternMatchSup
 
         if (hasProject) {
             // return limit -> project -> join
-            return new LogicalLimit<>(10, 0, new LogicalProject<>(
+            return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, new 
LogicalProject<>(
                     ImmutableList.of(new UnboundSlot("sid"), new 
UnboundSlot("id")),
                     join));
         } else {
             // return limit -> join
-            return new LogicalLimit<>(10, 0, join);
+            return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, join);
         }
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java
similarity index 57%
copy from 
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
copy to 
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java
index 77cb6df00f..174f5a90b4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java
@@ -17,35 +17,25 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
-import org.apache.doris.nereids.CascadesContext;
-import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.nereids.util.PlanConstructor;
 
-import com.google.common.collect.Lists;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.List;
+public class SplitLimitTest {
+    private final LogicalOlapScan scan1 = 
PlanConstructor.newLogicalOlapScan(0, "t1", 0);
 
-/**
- * MergeConsecutiveFilter ut
- */
-public class EliminateLimitTest {
     @Test
-    public void testEliminateLimit() {
-        LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
-        LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan);
-
-        CascadesContext cascadesContext = 
MemoTestUtils.createCascadesContext(limit);
-        List<Rule> rules = Lists.newArrayList(new EliminateLimit().build());
-        cascadesContext.topDownRewrite(rules);
-
-        Plan actual = cascadesContext.getMemo().copyOut();
-        Assertions.assertTrue(actual instanceof LogicalEmptyRelation);
+    void testSplitLimit() {
+        Plan plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, scan1);
+        plan = PlanChecker.from(MemoTestUtils.createConnectContext(), plan)
+                .rewrite()
+                .getPlan();
+        plan.anyMatch(x -> x instanceof LogicalLimit && ((LogicalLimit<?>) 
x).isSplit());
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
index e931957be2..94f6a07d33 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
@@ -28,6 +28,8 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
 import org.apache.doris.nereids.trees.plans.FakePlan;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -279,7 +281,9 @@ public class StatsCalculatorTest {
         GroupPlan groupPlan = new GroupPlan(childGroup);
         childGroup.setStatistics(childStats);
 
-        LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(1, 2, 
groupPlan);
+        LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(1, 2,
+                LimitPhase.GLOBAL, new LogicalLimit<>(1, 2, LimitPhase.LOCAL, 
groupPlan)
+        );
         GroupExpression groupExpression = new GroupExpression(logicalLimit, 
ImmutableList.of(childGroup));
         Group ownerGroup = newGroup();
         ownerGroup.addGroupExpression(groupExpression);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
index aed7bbbbd5..be62fc278b 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
@@ -46,7 +46,7 @@ public class PlanToStringTest {
 
     @Test
     public void testLogicalLimit(@Mocked Plan child) {
-        LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, child);
+        LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, 
child);
 
         Assertions.assertEquals("LogicalLimit ( limit=0, offset=0 )", 
plan.toString());
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
index c719302ac5..2bca62bab1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -122,7 +123,7 @@ public class LogicalPlanBuilder {
     }
 
     public LogicalPlanBuilder limit(long limit, long offset) {
-        LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit, 
offset, this.plan);
+        LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit, 
offset, LimitPhase.ORIGIN, this.plan);
         return from(limitPlan);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to