This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e7ef35741 [fix](Nereids) two phase read for topn only support simple 
case (#18955)
1e7ef35741 is described below

commit 1e7ef357414132a4848836c33df6f27856fdc5eb
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Sun Apr 23 21:32:23 2023 +0800

    [fix](Nereids) two phase read for topn only support simple case (#18955)
    
    1. topn must have a merge node
    2. topn must be the top node of the plan
---
 .../glue/translator/PhysicalPlanTranslator.java    |  6 +--
 .../nereids/processor/post/PlanPostProcessor.java  |  5 ++
 .../nereids/processor/post/PlanPostProcessors.java |  2 +-
 .../doris/nereids/processor/post/TopNScanOpt.java  |  2 +-
 .../nereids/processor/post/TwoPhaseReadOpt.java    | 55 +++++++++++++++-------
 .../nereids/trees/plans/physical/PhysicalTopN.java |  2 +-
 .../nereids/postprocess/TopNRuntimeFilterTest.java |  4 +-
 7 files changed, 49 insertions(+), 27 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index e51e5469b7..7961fddd57 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -884,7 +884,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             SortNode sortNode = translateSortNode(topN, 
inputFragment.getPlanRoot(), context);
             sortNode.setOffset(topN.getOffset());
             sortNode.setLimit(topN.getLimit());
-            if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
+            if 
(topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
                 sortNode.setUseTopnOpt(true);
                 PlanNode child = sortNode.getChild(0);
                 Preconditions.checkArgument(child instanceof OlapScanNode,
@@ -928,9 +928,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         });
         List<Expr> sortTupleOutputList = new ArrayList<>();
         List<Slot> outputList = sort.getOutput();
-        outputList.forEach(k -> {
-            sortTupleOutputList.add(ExpressionTranslator.translate(k, 
context));
-        });
+        outputList.forEach(k -> 
sortTupleOutputList.add(ExpressionTranslator.translate(k, context)));
         // 2. Generate new Tuple and get current slotRef for 
newOrderingExprList
         List<Expr> newOrderingExprList = Lists.newArrayList();
         TupleDescriptor tupleDesc = generateTupleDesc(outputList, 
orderKeyList, newOrderingExprList, context, null);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
index fa6a9deaa9..5090acedf4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
@@ -18,10 +18,15 @@
 package org.apache.doris.nereids.processor.post;
 
 import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 
 /**
  * PlanPostprocessor: a PlanVisitor to rewrite PhysicalPlan to new 
PhysicalPlan.
  */
 public class PlanPostProcessor extends DefaultPlanRewriter<CascadesContext> {
+
+    public Plan processRoot(Plan plan, CascadesContext ctx) {
+        return plan.accept(this, ctx);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b96e7bbf3e..0843d1e04b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -47,7 +47,7 @@ public class PlanPostProcessors {
     public PhysicalPlan process(PhysicalPlan physicalPlan) {
         PhysicalPlan resultPlan = physicalPlan;
         for (PlanPostProcessor processor : getProcessors()) {
-            resultPlan = (PhysicalPlan) resultPlan.accept(processor, 
cascadesContext);
+            resultPlan = (PhysicalPlan) processor.processRoot(resultPlan, 
cascadesContext);
         }
         return resultPlan;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index bb00983d2d..a938a231ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -78,7 +78,7 @@ public class TopNScanOpt extends PlanPostProcessor {
         olapScan = (PhysicalOlapScan) child;
 
         if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
-            topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
+            topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
         }
 
         return topN;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
index 454caae435..543f908456 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
 import org.apache.doris.nereids.trees.plans.algebra.Filter;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
@@ -52,23 +53,40 @@ import java.util.Set;
 public class TwoPhaseReadOpt extends PlanPostProcessor {
 
     @Override
-    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, 
CascadesContext ctx) {
-        topN.child().accept(this, ctx);
-        Plan child = topN.child();
-        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
-            return topN;
+    public Plan processRoot(Plan plan, CascadesContext ctx) {
+        if (plan instanceof PhysicalTopN) {
+            PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan;
+            if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) {
+                return plan.accept(this, ctx);
+            }
         }
-        if (topN.getOrderKeys().isEmpty()) {
-            return topN;
+        return plan;
+    }
+
+    @Override
+    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> 
mergeTopN, CascadesContext ctx) {
+        mergeTopN.child().accept(this, ctx);
+        Plan child = mergeTopN.child();
+        if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || 
!(mergeTopN.child() instanceof PhysicalDistribute)) {
+            return mergeTopN;
+        }
+        PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) 
mergeTopN.child();
+        if (!(distribute.child() instanceof PhysicalTopN)) {
+            return mergeTopN;
+        }
+        PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child();
+
+        if (localTopN.getOrderKeys().isEmpty()) {
+            return mergeTopN;
         }
 
         // topn opt
         long topNOptLimitThreshold = getTopNOptLimitThreshold();
-        if (topNOptLimitThreshold < 0 || topN.getLimit() > 
topNOptLimitThreshold) {
-            return topN;
+        if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > 
topNOptLimitThreshold) {
+            return mergeTopN;
         }
-        if 
(!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable))
 {
-            return topN;
+        if 
(!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable))
 {
+            return mergeTopN;
         }
 
         PhysicalOlapScan olapScan;
@@ -81,18 +99,18 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
             if (child instanceof Project) {
                 project = (PhysicalProject<Plan>) child;
                 // TODO: remove this after fix two phase read on project core
-                return topN;
+                return mergeTopN;
             }
             child = child.child(0);
         }
         if (!(child instanceof PhysicalOlapScan)) {
-            return topN;
+            return mergeTopN;
         }
         olapScan = (PhysicalOlapScan) child;
 
         // all order key must column from table
         if (!olapScan.getTable().getEnableLightSchemaChange()) {
-            return topN;
+            return mergeTopN;
         }
 
         Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
@@ -114,22 +132,23 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
         if (filter != null) {
             filter.getConjuncts().forEach(e -> 
deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
         }
-        topN.getOrderKeys().stream()
+        localTopN.getOrderKeys().stream()
                 .map(OrderKey::getExpr)
                 .map(Slot.class::cast)
                 .map(NamedExpression::getExprId)
                 .map(projectRevertedMap::get)
                 .filter(Objects::nonNull)
                 .forEach(deferredMaterializedExprIds::remove);
-        topN.getOrderKeys().stream()
+        localTopN.getOrderKeys().stream()
                 .map(OrderKey::getExpr)
                 .map(Slot.class::cast)
                 .map(NamedExpression::getExprId)
                 .forEach(deferredMaterializedExprIds::remove);
-        topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+        localTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+        mergeTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
         olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, 
deferredMaterializedExprIds);
 
-        return topN;
+        return mergeTopN;
     }
 
     private long getTopNOptLimitThreshold() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 2ca9a4c51c..4a58f5d9e9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -40,7 +40,7 @@ import java.util.Optional;
  */
 public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<CHILD_TYPE> implements TopN {
 
-    public static final String TOPN_OPT = "topn_opt";
+    public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
     public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";
 
     private final long limit;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index 9da544143a..6cbbcbf071 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
         PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        
Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
+        
Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
     }
 
     // topn rf do not apply on string-like and float column
@@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
         PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        
Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
+        
Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to