This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a8d690272f [refactor](Nereids) let topn runtime filter as 
PhysicalTopN's attr (#22745)
a8d690272f is described below

commit a8d690272f8f146dad0e049d9f857e3e030d6ca1
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Wed Aug 9 12:13:21 2023 +0800

    [refactor](Nereids) let topn runtime filter as PhysicalTopN's attr (#22745)
    
    The original implementation stored the flag as mutable state on PhysicalTopN.
    That state is easily lost if the plan is rewritten after this processor runs.
    The new implementation uses an attribute to indicate whether to use the topn runtime filter.
---
 .../glue/translator/PhysicalPlanTranslator.java    |  2 +-
 .../doris/nereids/processor/post/TopNScanOpt.java  | 34 ++++++++++----
 .../physical/PhysicalDeferMaterializeTopN.java     | 14 +++---
 .../nereids/trees/plans/physical/PhysicalTopN.java | 52 +++++++++++++---------
 .../nereids/postprocess/TopNRuntimeFilterTest.java | 11 ++---
 5 files changed, 71 insertions(+), 42 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 41be06be44..4144987bae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1698,7 +1698,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             SortNode sortNode = translateSortNode(topN, 
inputFragment.getPlanRoot(), context);
             sortNode.setOffset(topN.getOffset());
             sortNode.setLimit(topN.getLimit());
-            if 
(topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
+            if (topN.isEnableRuntimeFilter()) {
                 sortNode.setUseTopnOpt(true);
                 PlanNode child = sortNode.getChild(0);
                 Preconditions.checkArgument(child instanceof OlapScanNode,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index 0c5d8d8ce6..b221570edf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -28,6 +28,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTop
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * topN opt
  * refer to:
@@ -39,7 +41,29 @@ public class TopNScanOpt extends PlanPostProcessor {
 
     @Override
     public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? 
extends Plan> topN, CascadesContext ctx) {
-        topN.child().accept(this, ctx);
+        Plan child = topN.child().accept(this, ctx);
+        topN = rewriteTopN(topN);
+        if (child != topN.child()) {
+            topN.withChildren(child);
+        }
+        return topN;
+    }
+
+    @Override
+    public Plan 
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> 
topN,
+            CascadesContext context) {
+        Plan child = topN.child().accept(this, context);
+        if (child != topN.child()) {
+            topN = topN.withChildren(ImmutableList.of(child));
+        }
+        PhysicalTopN<? extends Plan> rewrittenTopN = 
rewriteTopN(topN.getPhysicalTopN());
+        if (topN.getPhysicalTopN() != rewrittenTopN) {
+            topN = topN.withPhysicalTopN(rewrittenTopN);
+        }
+        return topN;
+    }
+
+    private PhysicalTopN<? extends Plan> rewriteTopN(PhysicalTopN<? extends 
Plan> topN) {
         Plan child = topN.child();
         if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
             return topN;
@@ -79,18 +103,12 @@ public class TopNScanOpt extends PlanPostProcessor {
         olapScan = (OlapScan) child;
 
         if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
-            topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
+            return topN.withEnableRuntimeFilter(true);
         }
 
         return topN;
     }
 
-    @Override
-    public Plan 
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> 
topN,
-            CascadesContext context) {
-        return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), 
context));
-    }
-
     private long getTopNOptLimitThreshold() {
         if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
             return 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
index 2c2a53761a..f5db3ff42f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
@@ -91,13 +91,13 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE 
extends Plan>
         return physicalTopN.getLimit();
     }
 
-    public Plan withPhysicalTopN(PhysicalTopN<? extends Plan> physicalTopN) {
+    public PhysicalDeferMaterializeTopN<? extends Plan> 
withPhysicalTopN(PhysicalTopN<? extends Plan> physicalTopN) {
         return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot, groupExpression,
                 getLogicalProperties(), physicalProperties, statistics, 
physicalTopN.child());
     }
 
     @Override
-    public Plan withChildren(List<Plan> children) {
+    public PhysicalDeferMaterializeTopN<? extends Plan> 
withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1,
                 "PhysicalDeferMaterializeTopN's children size must be 1, but 
real is %s", children.size());
         return new 
PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))),
@@ -111,13 +111,14 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE 
extends Plan>
     }
 
     @Override
-    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+    public PhysicalDeferMaterializeTopN<? extends Plan> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
         return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot,
                 groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
     }
 
     @Override
-    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+    public PhysicalDeferMaterializeTopN<? extends Plan> 
withGroupExprLogicalPropChildren(
+            Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         Preconditions.checkArgument(children.size() == 1,
                 "PhysicalDeferMaterializeTopN's children size must be 1, but 
real is %s", children.size());
@@ -127,7 +128,8 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE 
extends Plan>
     }
 
     @Override
-    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+    public PhysicalDeferMaterializeTopN<? extends Plan> 
withPhysicalPropertiesAndStats(
+            PhysicalProperties physicalProperties, Statistics statistics) {
         return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot,
                 groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
     }
@@ -138,7 +140,7 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE 
extends Plan>
     }
 
     @Override
-    public PhysicalDeferMaterializeTopN<CHILD_TYPE> resetLogicalProperties() {
+    public PhysicalDeferMaterializeTopN<? extends Plan> 
resetLogicalProperties() {
         return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot,
                 groupExpression, null, physicalProperties, statistics, 
child());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 7df18fd010..bccc2cad41 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -41,37 +41,38 @@ import java.util.Optional;
  */
 public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<CHILD_TYPE> implements TopN {
 
-    public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
-
     private final long limit;
     private final long offset;
+    private final boolean enableRuntimeFilter;
 
     public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
             SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE 
child) {
-        this(orderKeys, limit, offset, phase, Optional.empty(), 
logicalProperties, child);
+        this(orderKeys, limit, offset, phase, false, Optional.empty(), 
logicalProperties, child);
     }
 
     /**
      * Constructor of PhysicalHashJoinNode.
      */
     public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
-            SortPhase phase, Optional<GroupExpression> groupExpression, 
LogicalProperties logicalProperties,
-            CHILD_TYPE child) {
-        this(orderKeys, limit, offset, phase, groupExpression, 
logicalProperties,
-                null, null, child);
+            SortPhase phase, boolean enableRuntimeFilter,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties, CHILD_TYPE child) {
+        this(orderKeys, limit, offset, phase, enableRuntimeFilter,
+                groupExpression, logicalProperties, null, null, child);
     }
 
     /**
      * Constructor of PhysicalHashJoinNode.
      */
     public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
-            SortPhase phase, Optional<GroupExpression> groupExpression, 
LogicalProperties logicalProperties,
+            SortPhase phase, boolean enableRuntimeFilter,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
             PhysicalProperties physicalProperties, Statistics statistics, 
CHILD_TYPE child) {
         super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, 
logicalProperties, physicalProperties,
                 statistics, child);
         Objects.requireNonNull(orderKeys, "orderKeys should not be null in 
PhysicalTopN.");
         this.limit = limit;
         this.offset = offset;
+        this.enableRuntimeFilter = enableRuntimeFilter;
     }
 
     public long getLimit() {
@@ -82,6 +83,10 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
         return offset;
     }
 
+    public boolean isEnableRuntimeFilter() {
+        return enableRuntimeFilter;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -94,12 +99,12 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
             return false;
         }
         PhysicalTopN<?> that = (PhysicalTopN<?>) o;
-        return limit == that.limit && offset == that.offset;
+        return limit == that.limit && offset == that.offset && 
enableRuntimeFilter == that.enableRuntimeFilter;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), limit, offset);
+        return Objects.hash(super.hashCode(), limit, offset, 
enableRuntimeFilter);
     }
 
     @Override
@@ -107,33 +112,39 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> 
extends AbstractPhysicalSort<
         return visitor.visitPhysicalTopN(this, context);
     }
 
+    public PhysicalTopN<Plan> withEnableRuntimeFilter(boolean 
enableRuntimeFilter) {
+        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
enableRuntimeFilter,
+                groupExpression, getLogicalProperties(), child());
+    }
+
     @Override
     public PhysicalTopN<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1,
                 "PhysicalTopN's children size must be 1, but real is %s", 
children.size());
-        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression,
+        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
enableRuntimeFilter, groupExpression,
                 getLogicalProperties(), physicalProperties, statistics, 
children.get(0));
     }
 
     @Override
     public PhysicalTopN<CHILD_TYPE> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression, getLogicalProperties(), child());
+        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
enableRuntimeFilter,
+                groupExpression, getLogicalProperties(), child());
     }
 
     @Override
-    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+    public PhysicalTopN<Plan> 
withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         Preconditions.checkArgument(children.size() == 1,
                 "PhysicalTopN's children size must be 1, but real is %s", 
children.size());
-        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression, logicalProperties.get(),
-                children.get(0));
+        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
enableRuntimeFilter,
+                groupExpression, logicalProperties.get(), children.get(0));
     }
 
     @Override
     public PhysicalTopN<CHILD_TYPE> 
withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
             Statistics statistics) {
-        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression,
-                getLogicalProperties(), physicalProperties, statistics, 
child());
+        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
enableRuntimeFilter,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
     }
 
     @Override
@@ -142,7 +153,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
                 "limit", limit,
                 "offset", offset,
                 "orderKeys", orderKeys,
-                "phase", phase.toString()
+                "phase", phase.toString(),
+                "enableRuntimeFilter", enableRuntimeFilter
         );
     }
 
@@ -152,8 +164,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
     }
 
     @Override
-    public PhysicalTopN<CHILD_TYPE> resetLogicalProperties() {
-        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression,
+    public PhysicalTopN<Plan> resetLogicalProperties() {
+        return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
enableRuntimeFilter, groupExpression,
                 null, physicalProperties, statistics, child());
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index 944ebcf3e8..f4fdf6f44f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -22,7 +22,6 @@ import 
org.apache.doris.nereids.processor.post.PlanPostProcessors;
 import org.apache.doris.nereids.trees.plans.Plan;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.util.PlanChecker;
 
 import org.junit.jupiter.api.Assertions;
@@ -41,12 +40,11 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
                 .rewrite()
                 .implement();
         PhysicalPlan plan = checker.getPhysicalPlan();
-        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        plan = new 
PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0).child(0) instanceof 
PhysicalDeferMaterializeTopN);
         PhysicalDeferMaterializeTopN<? extends Plan> localTopN
                 = (PhysicalDeferMaterializeTopN<? extends Plan>) 
plan.child(0).child(0);
-        Assertions.assertTrue(localTopN.getPhysicalTopN()
-                
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+        
Assertions.assertTrue(localTopN.getPhysicalTopN().isEnableRuntimeFilter());
     }
 
     // topn rf do not apply on string-like and float column
@@ -57,11 +55,10 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
                 .rewrite()
                 .implement();
         PhysicalPlan plan = checker.getPhysicalPlan();
-        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        plan = new 
PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0).child(0) instanceof 
PhysicalDeferMaterializeTopN);
         PhysicalDeferMaterializeTopN<? extends Plan> localTopN
                 = (PhysicalDeferMaterializeTopN<? extends Plan>) 
plan.child(0).child(0);
-        Assertions.assertFalse(localTopN.getPhysicalTopN()
-                
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+        
Assertions.assertFalse(localTopN.getPhysicalTopN().isEnableRuntimeFilter());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to