morrySnow commented on code in PR #18784:
URL: https://github.com/apache/doris/pull/18784#discussion_r1201464942


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
+ */
+public class LogicalPartitionTopNToPhysicalPartitionTopN extends 
OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalPartitionTopN().then(partitionTopN -> {
+            List<OrderKey> orderKeys = Lists.newArrayList();
+            if (!partitionTopN.getOrderKeys().isEmpty()) {
+                orderKeys.addAll(partitionTopN.getOrderKeys().stream()
+                        .map(OrderExpression::getOrderKey)
+                        .collect(Collectors.toList())
+                );
+            }

Review Comment:
   Use an ImmutableList here to avoid the extra list copy in PhysicalPartitionTopN's constructor:
   ```java
   List<OrderKey> orderKeys = partitionTopN.getOrderKeys().stream()
           .map(OrderExpression::getOrderKey)
           .collect(ImmutableList.toImmutableList());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -620,6 +622,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_BUSHY_TREE, needForward = true)
     private boolean enableBushyTree = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN)
+    private boolean enablePartitionTopN = true;

Review Comment:
   Set the default value to false until the BE code has been merged and sufficiently tested.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java:
##########
@@ -159,4 +165,62 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(windowExpressions, isChecked);
     }
+
+    /**
+     * pushPartitionLimitThroughWindow is used to push the partitionLimit 
through the window
+     * and generate the partitionTopN. If the window can not meet the 
requirement,
+     * it will return null. So when we use this function, we need check the 
null in the outside.
+     */
+    public static Plan pushPartitionLimitThroughWindow(LogicalWindow<Plan> 
window, long partitionLimit,
+                                                       boolean hasGlobalLimit) 
{
+        if 
(!ConnectContext.get().getSessionVariable().isEnablePartitionTopN()) {
+            return null;

Review Comment:
   Returning an Optional would be better than returning null here.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java:
##########
@@ -159,4 +165,62 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(windowExpressions, isChecked);
     }
+
+    /**
+     * pushPartitionLimitThroughWindow is used to push the partitionLimit 
through the window
+     * and generate the partitionTopN. If the window can not meet the 
requirement,
+     * it will return null. So when we use this function, we need check the 
null in the outside.
+     */
+    public static Plan pushPartitionLimitThroughWindow(LogicalWindow<Plan> 
window, long partitionLimit,

Review Comment:
   Why not implement this as a member function of LogicalWindow?



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.NotImplementedException;
+import org.apache.doris.nereids.types.WindowFuncType;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TPartitionSortNode;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TSortInfo;
+import org.apache.doris.thrift.TopNAlgorithm;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * PartitionSortNode.
+ * PartitionSortNode is only used in the Nereids.
+ */
+public class PartitionSortNode extends PlanNode {
+    private static final Logger LOG = 
LogManager.getLogger(PartitionSortNode.class);
+    private List<Expr> resolvedTupleExprs;
+    private final WindowFuncType function;
+    private final List<Expr> partitionExprs;
+    private final SortInfo info;
+    private final boolean hasGlobalLimit;
+    private final long partitionLimit;
+
+    private boolean isUnusedExprRemoved = false;
+    private ArrayList<Boolean> nullabilityChangedFlags = Lists.newArrayList();
+
+    /**
+     * Constructor.
+     */
+    public PartitionSortNode(PlanNodeId id, PlanNode input, WindowFuncType 
function, List<Expr> partitionExprs,
+                             SortInfo info, boolean hasGlobalLimit, long 
partitionLimit) {
+        super(id, "PartitionTopN", StatisticalType.PARTITION_TOPN_MODE);
+        this.function = function;
+        this.partitionExprs = partitionExprs;
+        this.info = info;
+        this.hasGlobalLimit = hasGlobalLimit;
+        this.partitionLimit = partitionLimit;
+        
this.tupleIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
+        
this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
+        this.nullableTupleIds.addAll(input.getNullableTupleIds());
+        this.children.add(input);
+        Preconditions.checkArgument(info.getOrderingExprs().size() == 
info.getIsAscOrder().size());
+    }
+
+    public SortInfo getSortInfo() {
+        return info;
+    }
+
+    @Override
+    public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
+        super.getMaterializedIds(analyzer, ids);
+        Expr.getIds(info.getOrderingExprs(), null, ids);
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
+        if (detailLevel == TExplainLevel.BRIEF) {
+            return "";
+        }
+
+        StringBuilder output = new StringBuilder();
+
+        // Add the function name.
+        String funcName;
+        if (function == WindowFuncType.ROW_NUMBER) {
+            funcName = "row_number";
+        } else if (function == WindowFuncType.RANK) {
+            funcName = "rank";
+        } else {
+            funcName = "dense_rank";
+        }
+        output.append(prefix).append("functions: 
").append(funcName).append("\n");
+
+        // Add the partition expr.
+        List<String> strings = Lists.newArrayList();
+        if (!partitionExprs.isEmpty()) {
+            output.append(prefix).append("partition by: ");
+
+            for (Expr partitionExpr : partitionExprs) {
+                strings.add(partitionExpr.toSql());
+            }
+
+            output.append(Joiner.on(", ").join(strings));
+            output.append("\n");
+        }
+
+        // Add the order by.
+        output.append(prefix).append("order by: ");
+        Iterator<Expr> expr = info.getOrderingExprs().iterator();
+        Iterator<Boolean> isAsc = info.getIsAscOrder().iterator();
+        boolean start = true;
+        while (expr.hasNext()) {
+            if (start) {
+                start = false;
+            } else {
+                output.append(", ");
+            }
+            output.append(expr.next().toSql()).append(" ");
+            output.append(isAsc.next() ? "ASC" : "DESC");
+        }
+        output.append("\n");
+
+        // Add the limit information;
+        output.append(prefix).append("has global limit: 
").append(hasGlobalLimit).append("\n");
+        output.append(prefix).append("partition limit: 
").append(partitionLimit).append("\n");
+
+        return output.toString();
+    }
+
+    private void removeUnusedExprs() {
+        if (!isUnusedExprRemoved) {
+            if (resolvedTupleExprs != null) {
+                List<SlotDescriptor> slotDescriptorList = 
this.info.getSortTupleDescriptor().getSlots();
+                for (int i = slotDescriptorList.size() - 1; i >= 0; i--) {
+                    if (!slotDescriptorList.get(i).isMaterialized()) {
+                        resolvedTupleExprs.remove(i);
+                        nullabilityChangedFlags.remove(i);
+                    }
+                }
+            }
+            isUnusedExprRemoved = true;
+        }
+    }
+
+    @Override
+    protected void toThrift(TPlanNode msg) {
+        msg.node_type = TPlanNodeType.PARTITION_SORT_NODE;
+
+        TSortInfo sortInfo = info.toThrift();
+        Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for 
tupleIds in PartitionSortNode");
+        removeUnusedExprs();
+        if (resolvedTupleExprs != null) {
+            
sortInfo.setSortTupleSlotExprs(Expr.treesToThrift(resolvedTupleExprs));
+            // FIXME this is a bottom line solution for wrong nullability of 
resolvedTupleExprs
+            // remove the following line after nereids online
+            
sortInfo.setSlotExprsNullabilityChangedFlags(nullabilityChangedFlags);
+        }
+
+        TopNAlgorithm topNAlgorithm;
+        if (function == WindowFuncType.ROW_NUMBER) {
+            topNAlgorithm = TopNAlgorithm.ROW_NUMBER;
+        } else if (function == WindowFuncType.RANK) {
+            topNAlgorithm = TopNAlgorithm.RANK;
+        } else {
+            topNAlgorithm = TopNAlgorithm.DENSE_RANK;
+        }
+
+        TPartitionSortNode partitionSortNode = new TPartitionSortNode();
+        partitionSortNode.setTopNAlgorithm(topNAlgorithm);
+        
partitionSortNode.setPartitionExprs(Expr.treesToThrift(partitionExprs));
+        partitionSortNode.setSortInfo(sortInfo);
+        partitionSortNode.setHasGlobalLimit(hasGlobalLimit);
+        partitionSortNode.setPartitionInnerLimit(partitionLimit);
+        msg.partition_sort_node = partitionSortNode;
+    }
+
+    @Override
+    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws 
NotImplementedException {
+        removeUnusedExprs();
+        List<Expr> materializedTupleExprs = new 
ArrayList<>(resolvedTupleExprs);
+        List<SlotId> result = Lists.newArrayList();
+        Expr.getIds(materializedTupleExprs, null, result);
+        return new HashSet<>(result);
+    }
+
+    /**
+     * Supplement the information needed by be for the partition sort node.
+     */
+    public void finalizeForNereids(TupleDescriptor tupleDescriptor,

Review Comment:
   Could we move all of this into the constructor? `finalizeForNereids` will be
removed in the future, so new nodes should not implement this function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to