morrySnow commented on code in PR #23870:
URL: https://github.com/apache/doris/pull/23870#discussion_r1322286143


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -338,8 +338,15 @@ public PhysicalProperties 
visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep
     public PhysicalProperties 
visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
             PlanContext context) {
         Preconditions.checkState(childrenOutputProperties.size() == 1);
-        PhysicalProperties childOutputProperty = 
childrenOutputProperties.get(0);
-        return new 
PhysicalProperties(childOutputProperty.getDistributionSpec());
+        if (partitionTopN.getPhase().isLocal()) {
+            return new PhysicalProperties(
+                    childrenOutputProperties.get(0).getDistributionSpec());
+        } else {
+            Preconditions.checkState(partitionTopN.getPhase().isGlobal());
+            DistributionSpec spec = 
PhysicalProperties.createHash(partitionTopN.getPartitionKeys(),
+                    ShuffleType.REQUIRE).getDistributionSpec();

Review Comment:
   The returned shuffle type should be the same as the child's.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -242,6 +243,19 @@ public Void 
visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort,
         return null;
     }
 
+    @Override
+    public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends 
Plan> partitionTopN, PlanContext context) {
+        if (partitionTopN.getPhase().isGlobal()) {
+            PhysicalProperties properties = 
PhysicalProperties.createHash(partitionTopN.getPartitionKeys(),
+                    ShuffleType.REQUIRE);
+            //addRequestPropertyToChildren(PhysicalProperties.GATHER);

Review Comment:
   remove this line



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
  */
 public class LogicalPartitionTopNToPhysicalPartitionTopN extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalPartitionTopN().then(partitionTopN -> {
-            List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
-                    ? partitionTopN.getOrderKeys().stream()
-                        .map(OrderExpression::getOrderKey)
-                        .collect(ImmutableList.toImmutableList()) :
-                    ImmutableList.of();
-
-            return new PhysicalPartitionTopN<>(
-                    partitionTopN.getFunction(),
-                    partitionTopN.getPartitionKeys(),
-                    orderKeys,
-                    partitionTopN.hasGlobalLimit(),
-                    partitionTopN.getPartitionLimit(),
-                    partitionTopN.getLogicalProperties(),
-                    partitionTopN.child());
-        
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+        return logicalPartitionTopN().thenApplyMulti(ctx -> 
twoPhasePartitionTopn(ctx.root))

Review Comment:
   If `twoPhasePartitionTopn` only returns one candidate, just use `thenApply` 
instead of `thenApplyMulti`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java:
##########
@@ -48,12 +49,13 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> 
extends PhysicalUnar
     private final List<OrderKey> orderKeys;
     private final Boolean hasGlobalLimit;
     private final long partitionLimit;
+    private final PartitionTopnPhase phase;
 
     public PhysicalPartitionTopN(WindowFuncType function, List<Expression> 
partitionKeys, List<OrderKey> orderKeys,
                                  Boolean hasGlobalLimit, long partitionLimit,
-                                 LogicalProperties logicalProperties, 
CHILD_TYPE child) {
+                                 LogicalProperties logicalProperties, 
PartitionTopnPhase phase, CHILD_TYPE child) {

Review Comment:
   `phase` should be placed before `logicalProperties`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -338,8 +338,15 @@ public PhysicalProperties 
visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep
     public PhysicalProperties 
visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
             PlanContext context) {
         Preconditions.checkState(childrenOutputProperties.size() == 1);
-        PhysicalProperties childOutputProperty = 
childrenOutputProperties.get(0);
-        return new 
PhysicalProperties(childOutputProperty.getDistributionSpec());
+        if (partitionTopN.getPhase().isLocal()) {
+            return new PhysicalProperties(
+                    childrenOutputProperties.get(0).getDistributionSpec());
+        } else {
+            Preconditions.checkState(partitionTopN.getPhase().isGlobal());

Review Comment:
   Add an error message to this `checkState` call.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
  */
 public class LogicalPartitionTopNToPhysicalPartitionTopN extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalPartitionTopN().then(partitionTopN -> {
-            List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
-                    ? partitionTopN.getOrderKeys().stream()
-                        .map(OrderExpression::getOrderKey)
-                        .collect(ImmutableList.toImmutableList()) :
-                    ImmutableList.of();
-
-            return new PhysicalPartitionTopN<>(
-                    partitionTopN.getFunction(),
-                    partitionTopN.getPartitionKeys(),
-                    orderKeys,
-                    partitionTopN.hasGlobalLimit(),
-                    partitionTopN.getPartitionLimit(),
-                    partitionTopN.getLogicalProperties(),
-                    partitionTopN.child());
-        
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+        return logicalPartitionTopN().thenApplyMulti(ctx -> 
twoPhasePartitionTopn(ctx.root))
+                
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+    }
+
+    private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+            LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+        // for partition topn, the order keys will be set as original 
partition keys combined with
+        // orderby keys, to meet upper window operator's order requirement.
+        List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+        PhysicalPartitionTopN<Plan> localPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.LOCAL_PTOPN,
+                logicalPartitionTopN.child(0));
+
+        PhysicalPartitionTopN<Plan> globalPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.GLOBAL_PTOPN,
+                localPartitionTopN);
+
+        return Lists.newArrayList(globalPartitionTopN);
+    }
+
+    private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends 
Plan> logicalPartitionTopN) {
+        List<OrderKey> allOrderKeys = Lists.newArrayList();
+        if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) {
+            
allOrderKeys.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey
 -> {
+                return new OrderKey(partitionKey, true, false);
+            }).collect(Collectors.toList()));
+        }
+        if (!logicalPartitionTopN.getOrderKeys().isEmpty()) {
+            allOrderKeys.addAll(logicalPartitionTopN.getOrderKeys().stream()
+                    .map(OrderExpression::getOrderKey)
+                    .collect(Collectors.toList())

Review Comment:
   ```suggestion
                       .collect(ImmutableList.toImmutableList())
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
  */
 public class LogicalPartitionTopNToPhysicalPartitionTopN extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalPartitionTopN().then(partitionTopN -> {
-            List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
-                    ? partitionTopN.getOrderKeys().stream()
-                        .map(OrderExpression::getOrderKey)
-                        .collect(ImmutableList.toImmutableList()) :
-                    ImmutableList.of();
-
-            return new PhysicalPartitionTopN<>(
-                    partitionTopN.getFunction(),
-                    partitionTopN.getPartitionKeys(),
-                    orderKeys,
-                    partitionTopN.hasGlobalLimit(),
-                    partitionTopN.getPartitionLimit(),
-                    partitionTopN.getLogicalProperties(),
-                    partitionTopN.child());
-        
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+        return logicalPartitionTopN().thenApplyMulti(ctx -> 
twoPhasePartitionTopn(ctx.root))
+                
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+    }
+
+    private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+            LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+        // for partition topn, the order keys will be set as original 
partition keys combined with
+        // orderby keys, to meet upper window operator's order requirement.
+        List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+        PhysicalPartitionTopN<Plan> localPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.LOCAL_PTOPN,
+                logicalPartitionTopN.child(0));
+
+        PhysicalPartitionTopN<Plan> globalPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.GLOBAL_PTOPN,
+                localPartitionTopN);
+
+        return Lists.newArrayList(globalPartitionTopN);
+    }
+
+    private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends 
Plan> logicalPartitionTopN) {
+        List<OrderKey> allOrderKeys = Lists.newArrayList();
+        if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) {
+            
allOrderKeys.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey
 -> {
+                return new OrderKey(partitionKey, true, false);
+            }).collect(Collectors.toList()));

Review Comment:
   ```suggestion
               }).collect(ImmutableList.toImmutableList());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java:
##########
@@ -77,14 +80,16 @@ public PhysicalPartitionTopN(WindowFuncType function, 
List<Expression> partition
     public PhysicalPartitionTopN(WindowFuncType function, List<Expression> 
partitionKeys, List<OrderKey> orderKeys,
                                  Boolean hasGlobalLimit, long partitionLimit,
                                  Optional<GroupExpression> groupExpression, 
LogicalProperties logicalProperties,
-                                 PhysicalProperties physicalProperties, 
Statistics statistics, CHILD_TYPE child) {
+                                 PhysicalProperties physicalProperties, 
Statistics statistics,
+                                 PartitionTopnPhase phase, CHILD_TYPE child) {

Review Comment:
   `phase` should be placed before `groupExpression`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
  */
 public class LogicalPartitionTopNToPhysicalPartitionTopN extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalPartitionTopN().then(partitionTopN -> {
-            List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
-                    ? partitionTopN.getOrderKeys().stream()
-                        .map(OrderExpression::getOrderKey)
-                        .collect(ImmutableList.toImmutableList()) :
-                    ImmutableList.of();
-
-            return new PhysicalPartitionTopN<>(
-                    partitionTopN.getFunction(),
-                    partitionTopN.getPartitionKeys(),
-                    orderKeys,
-                    partitionTopN.hasGlobalLimit(),
-                    partitionTopN.getPartitionLimit(),
-                    partitionTopN.getLogicalProperties(),
-                    partitionTopN.child());
-        
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+        return logicalPartitionTopN().thenApplyMulti(ctx -> 
twoPhasePartitionTopn(ctx.root))
+                
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+    }
+
+    private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+            LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+        // for partition topn, the order keys will be set as original 
partition keys combined with
+        // orderby keys, to meet upper window operator's order requirement.
+        List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+        PhysicalPartitionTopN<Plan> localPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.LOCAL_PTOPN,
+                logicalPartitionTopN.child(0));
+
+        PhysicalPartitionTopN<Plan> globalPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.GLOBAL_PTOPN,
+                localPartitionTopN);
+
+        return Lists.newArrayList(globalPartitionTopN);

Review Comment:
   Use `ImmutableList.of(globalPartitionTopN)` instead of `Lists.newArrayList(...)`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
  */
 public class LogicalPartitionTopNToPhysicalPartitionTopN extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalPartitionTopN().then(partitionTopN -> {
-            List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
-                    ? partitionTopN.getOrderKeys().stream()
-                        .map(OrderExpression::getOrderKey)
-                        .collect(ImmutableList.toImmutableList()) :
-                    ImmutableList.of();
-
-            return new PhysicalPartitionTopN<>(
-                    partitionTopN.getFunction(),
-                    partitionTopN.getPartitionKeys(),
-                    orderKeys,
-                    partitionTopN.hasGlobalLimit(),
-                    partitionTopN.getPartitionLimit(),
-                    partitionTopN.getLogicalProperties(),
-                    partitionTopN.child());
-        
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+        return logicalPartitionTopN().thenApplyMulti(ctx -> 
twoPhasePartitionTopn(ctx.root))
+                
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+    }
+
+    private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+            LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+        // for partition topn, the order keys will be set as original 
partition keys combined with
+        // orderby keys, to meet upper window operator's order requirement.
+        List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+        PhysicalPartitionTopN<Plan> localPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.LOCAL_PTOPN,
+                logicalPartitionTopN.child(0));
+
+        PhysicalPartitionTopN<Plan> globalPartitionTopN = new 
PhysicalPartitionTopN<>(
+                logicalPartitionTopN.getFunction(),
+                logicalPartitionTopN.getPartitionKeys(),
+                orderKeys,
+                logicalPartitionTopN.hasGlobalLimit(),
+                logicalPartitionTopN.getPartitionLimit(),
+                logicalPartitionTopN.getLogicalProperties(),
+                PartitionTopnPhase.GLOBAL_PTOPN,
+                localPartitionTopN);
+
+        return Lists.newArrayList(globalPartitionTopN);
+    }
+
+    private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends 
Plan> logicalPartitionTopN) {
+        List<OrderKey> allOrderKeys = Lists.newArrayList();

Review Comment:
   use `ImmutableList.Builder<OrderKey>`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to