morrySnow commented on code in PR #23870: URL: https://github.com/apache/doris/pull/23870#discussion_r1322286143
########## fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java: ########## @@ -338,8 +338,15 @@ public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep public PhysicalProperties visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) { Preconditions.checkState(childrenOutputProperties.size() == 1); - PhysicalProperties childOutputProperty = childrenOutputProperties.get(0); - return new PhysicalProperties(childOutputProperty.getDistributionSpec()); + if (partitionTopN.getPhase().isLocal()) { + return new PhysicalProperties( + childrenOutputProperties.get(0).getDistributionSpec()); + } else { + Preconditions.checkState(partitionTopN.getPhase().isGlobal()); + DistributionSpec spec = PhysicalProperties.createHash(partitionTopN.getPartitionKeys(), + ShuffleType.REQUIRE).getDistributionSpec(); Review Comment: the returned shuffle type should be the same as the child's ########## fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java: ########## @@ -242,6 +243,19 @@ public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, return null; } + @Override + public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? 
extends Plan> partitionTopN, PlanContext context) { + if (partitionTopN.getPhase().isGlobal()) { + PhysicalProperties properties = PhysicalProperties.createHash(partitionTopN.getPartitionKeys(), + ShuffleType.REQUIRE); + //addRequestPropertyToChildren(PhysicalProperties.GATHER); Review Comment: remove this line ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java: ########## @@ -21,33 +21,68 @@ import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. */ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalPartitionTopN().then(partitionTopN -> { - List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty() - ? 
partitionTopN.getOrderKeys().stream() - .map(OrderExpression::getOrderKey) - .collect(ImmutableList.toImmutableList()) : - ImmutableList.of(); - - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), - orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); - }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + return logicalPartitionTopN().thenApplyMulti(ctx -> twoPhasePartitionTopn(ctx.root)) Review Comment: if `twoPhasePartitionTopn` only returns one candidate, just use `thenApply` instead of `thenApplyMulti` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java: ########## @@ -48,12 +49,13 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar private final List<OrderKey> orderKeys; private final Boolean hasGlobalLimit; private final long partitionLimit; + private final PartitionTopnPhase phase; public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys, Boolean hasGlobalLimit, long partitionLimit, - LogicalProperties logicalProperties, CHILD_TYPE child) { + LogicalProperties logicalProperties, PartitionTopnPhase phase, CHILD_TYPE child) { Review Comment: `phase` should come before `logicalProperties` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java: ########## @@ -338,8 +338,15 @@ public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep public PhysicalProperties visitPhysicalPartitionTopN(PhysicalPartitionTopN<? 
extends Plan> partitionTopN, PlanContext context) { Preconditions.checkState(childrenOutputProperties.size() == 1); - PhysicalProperties childOutputProperty = childrenOutputProperties.get(0); - return new PhysicalProperties(childOutputProperty.getDistributionSpec()); + if (partitionTopN.getPhase().isLocal()) { + return new PhysicalProperties( + childrenOutputProperties.get(0).getDistributionSpec()); + } else { + Preconditions.checkState(partitionTopN.getPhase().isGlobal()); Review Comment: add check msg ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java: ########## @@ -21,33 +21,68 @@ import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. */ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalPartitionTopN().then(partitionTopN -> { - List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty() - ? 
partitionTopN.getOrderKeys().stream() - .map(OrderExpression::getOrderKey) - .collect(ImmutableList.toImmutableList()) : - ImmutableList.of(); - - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), - orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); - }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + return logicalPartitionTopN().thenApplyMulti(ctx -> twoPhasePartitionTopn(ctx.root)) + .toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + } + + private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn( + LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { + // for partition topn, the order keys will be set as original partition keys combined with + // orderby keys, to meet upper window operator's order requirement. + List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN); + + PhysicalPartitionTopN<Plan> localPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.LOCAL_PTOPN, + logicalPartitionTopN.child(0)); + + PhysicalPartitionTopN<Plan> globalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.GLOBAL_PTOPN, + localPartitionTopN); + + return Lists.newArrayList(globalPartitionTopN); + } + + private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? 
extends Plan> logicalPartitionTopN) { + List<OrderKey> allOrderKeys = Lists.newArrayList(); + if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) { + allOrderKeys.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey -> { + return new OrderKey(partitionKey, true, false); + }).collect(Collectors.toList())); + } + if (!logicalPartitionTopN.getOrderKeys().isEmpty()) { + allOrderKeys.addAll(logicalPartitionTopN.getOrderKeys().stream() + .map(OrderExpression::getOrderKey) + .collect(Collectors.toList()) Review Comment: ```suggestion .collect(ImmutableList.toImmutableList()) ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java: ########## @@ -21,33 +21,68 @@ import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. */ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalPartitionTopN().then(partitionTopN -> { - List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty() - ? 
partitionTopN.getOrderKeys().stream() - .map(OrderExpression::getOrderKey) - .collect(ImmutableList.toImmutableList()) : - ImmutableList.of(); - - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), - orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); - }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + return logicalPartitionTopN().thenApplyMulti(ctx -> twoPhasePartitionTopn(ctx.root)) + .toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + } + + private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn( + LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { + // for partition topn, the order keys will be set as original partition keys combined with + // orderby keys, to meet upper window operator's order requirement. + List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN); + + PhysicalPartitionTopN<Plan> localPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.LOCAL_PTOPN, + logicalPartitionTopN.child(0)); + + PhysicalPartitionTopN<Plan> globalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.GLOBAL_PTOPN, + localPartitionTopN); + + return Lists.newArrayList(globalPartitionTopN); + } + + private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? 
extends Plan> logicalPartitionTopN) { + List<OrderKey> allOrderKeys = Lists.newArrayList(); + if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) { + allOrderKeys.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey -> { + return new OrderKey(partitionKey, true, false); + }).collect(Collectors.toList())); Review Comment: ```suggestion }).collect(ImmutableList.toImmutableList()); ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java: ########## @@ -77,14 +80,16 @@ public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partition public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys, Boolean hasGlobalLimit, long partitionLimit, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + PhysicalProperties physicalProperties, Statistics statistics, + PartitionTopnPhase phase, CHILD_TYPE child) { Review Comment: `phase` should at the front of `groupExpression` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java: ########## @@ -21,33 +21,68 @@ import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. 
*/ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalPartitionTopN().then(partitionTopN -> { - List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty() - ? partitionTopN.getOrderKeys().stream() - .map(OrderExpression::getOrderKey) - .collect(ImmutableList.toImmutableList()) : - ImmutableList.of(); - - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), - orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); - }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + return logicalPartitionTopN().thenApplyMulti(ctx -> twoPhasePartitionTopn(ctx.root)) + .toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + } + + private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn( + LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { + // for partition topn, the order keys will be set as original partition keys combined with + // orderby keys, to meet upper window operator's order requirement. 
+ List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN); + + PhysicalPartitionTopN<Plan> localPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.LOCAL_PTOPN, + logicalPartitionTopN.child(0)); + + PhysicalPartitionTopN<Plan> globalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.GLOBAL_PTOPN, + localPartitionTopN); + + return Lists.newArrayList(globalPartitionTopN); Review Comment: use ImmutableList.of(); ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java: ########## @@ -21,33 +21,68 @@ import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. */ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalPartitionTopN().then(partitionTopN -> { - List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty() - ? 
partitionTopN.getOrderKeys().stream() - .map(OrderExpression::getOrderKey) - .collect(ImmutableList.toImmutableList()) : - ImmutableList.of(); - - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), - orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); - }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + return logicalPartitionTopN().thenApplyMulti(ctx -> twoPhasePartitionTopn(ctx.root)) + .toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + } + + private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn( + LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { + // for partition topn, the order keys will be set as original partition keys combined with + // orderby keys, to meet upper window operator's order requirement. + List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN); + + PhysicalPartitionTopN<Plan> localPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.LOCAL_PTOPN, + logicalPartitionTopN.child(0)); + + PhysicalPartitionTopN<Plan> globalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + orderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + logicalPartitionTopN.getLogicalProperties(), + PartitionTopnPhase.GLOBAL_PTOPN, + localPartitionTopN); + + return Lists.newArrayList(globalPartitionTopN); + } + + private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? 
extends Plan> logicalPartitionTopN) { + List<OrderKey> allOrderKeys = Lists.newArrayList(); Review Comment: use `ImmutableList.Builder<OrderKey>` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org