This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1e7ef35741 [fix](Nereids) two phase read for topn only support simple case (#18955) 1e7ef35741 is described below commit 1e7ef357414132a4848836c33df6f27856fdc5eb Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Sun Apr 23 21:32:23 2023 +0800 [fix](Nereids) two phase read for topn only support simple case (#18955) 1. topn must have a merge node 2. topn must be the top node of the plan --- .../glue/translator/PhysicalPlanTranslator.java | 6 +-- .../nereids/processor/post/PlanPostProcessor.java | 5 ++ .../nereids/processor/post/PlanPostProcessors.java | 2 +- .../doris/nereids/processor/post/TopNScanOpt.java | 2 +- .../nereids/processor/post/TwoPhaseReadOpt.java | 55 +++++++++++++++------- .../nereids/trees/plans/physical/PhysicalTopN.java | 2 +- .../nereids/postprocess/TopNRuntimeFilterTest.java | 4 +- 7 files changed, 49 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index e51e5469b7..7961fddd57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -884,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context); sortNode.setOffset(topN.getOffset()); sortNode.setLimit(topN.getLimit()); - if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) { + if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) { sortNode.setUseTopnOpt(true); PlanNode child = sortNode.getChild(0); Preconditions.checkArgument(child instanceof OlapScanNode, @@ -928,9 +928,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla }); List<Expr> sortTupleOutputList = new ArrayList<>(); List<Slot> outputList = sort.getOutput(); - outputList.forEach(k -> { - sortTupleOutputList.add(ExpressionTranslator.translate(k, context)); - }); + outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context))); // 2. Generate new Tuple and get current slotRef for newOrderingExprList List<Expr> newOrderingExprList = Lists.newArrayList(); TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java index fa6a9deaa9..5090acedf4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java @@ -18,10 +18,15 @@ package org.apache.doris.nereids.processor.post; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; /** * PlanPostprocessor: a PlanVisitor to rewrite PhysicalPlan to new PhysicalPlan. 
*/ public class PlanPostProcessor extends DefaultPlanRewriter<CascadesContext> { + + public Plan processRoot(Plan plan, CascadesContext ctx) { + return plan.accept(this, ctx); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index b96e7bbf3e..0843d1e04b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -47,7 +47,7 @@ public class PlanPostProcessors { public PhysicalPlan process(PhysicalPlan physicalPlan) { PhysicalPlan resultPlan = physicalPlan; for (PlanPostProcessor processor : getProcessors()) { - resultPlan = (PhysicalPlan) resultPlan.accept(processor, cascadesContext); + resultPlan = (PhysicalPlan) processor.processRoot(resultPlan, cascadesContext); } return resultPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java index bb00983d2d..a938a231ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java @@ -78,7 +78,7 @@ public class TopNScanOpt extends PlanPostProcessor { olapScan = (PhysicalOlapScan) child; if (olapScan.getTable().isDupKeysOrMergeOnWrite()) { - topN.setMutableState(PhysicalTopN.TOPN_OPT, true); + topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true); } return topN; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java index 454caae435..543f908456 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; import org.apache.doris.nereids.trees.plans.algebra.Filter; import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; @@ -52,23 +53,40 @@ import java.util.Set; public class TwoPhaseReadOpt extends PlanPostProcessor { @Override - public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) { - topN.child().accept(this, ctx); - Plan child = topN.child(); - if (topN.getSortPhase() != SortPhase.LOCAL_SORT) { - return topN; + public Plan processRoot(Plan plan, CascadesContext ctx) { + if (plan instanceof PhysicalTopN) { + PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan; + if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) { + return plan.accept(this, ctx); + } } - if (topN.getOrderKeys().isEmpty()) { - return topN; + return plan; + } + + @Override + public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? 
extends Plan> mergeTopN, CascadesContext ctx) { + mergeTopN.child().accept(this, ctx); + Plan child = mergeTopN.child(); + if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || !(mergeTopN.child() instanceof PhysicalDistribute)) { + return mergeTopN; + } + PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) mergeTopN.child(); + if (!(distribute.child() instanceof PhysicalTopN)) { + return mergeTopN; + } + PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child(); + + if (localTopN.getOrderKeys().isEmpty()) { + return mergeTopN; } // topn opt long topNOptLimitThreshold = getTopNOptLimitThreshold(); - if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) { - return topN; + if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) { + return mergeTopN; } - if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) { - return topN; + if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) { + return mergeTopN; } PhysicalOlapScan olapScan; @@ -81,18 +99,18 @@ public class TwoPhaseReadOpt extends PlanPostProcessor { if (child instanceof Project) { project = (PhysicalProject<Plan>) child; // TODO: remove this after fix two phase read on project core - return topN; + return mergeTopN; } child = child.child(0); } if (!(child instanceof PhysicalOlapScan)) { - return topN; + return mergeTopN; } olapScan = (PhysicalOlapScan) child; // all order key must column from table if (!olapScan.getTable().getEnableLightSchemaChange()) { - return topN; + return mergeTopN; } Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap(); @@ -114,22 +132,23 @@ public class TwoPhaseReadOpt extends PlanPostProcessor { if (filter != null) { filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds())); } - topN.getOrderKeys().stream() + localTopN.getOrderKeys().stream() .map(OrderKey::getExpr) 
.map(Slot.class::cast) .map(NamedExpression::getExprId) .map(projectRevertedMap::get) .filter(Objects::nonNull) .forEach(deferredMaterializedExprIds::remove); - topN.getOrderKeys().stream() + localTopN.getOrderKeys().stream() .map(OrderKey::getExpr) .map(Slot.class::cast) .map(NamedExpression::getExprId) .forEach(deferredMaterializedExprIds::remove); - topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true); + localTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true); + mergeTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true); olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds); - return topN; + return mergeTopN; } private long getTopNOptLimitThreshold() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 2ca9a4c51c..4a58f5d9e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -40,7 +40,7 @@ import java.util.Optional; */ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN { - public static final String TOPN_OPT = "topn_opt"; + public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter"; public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt"; private final long limit; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java index 9da544143a..6cbbcbf071 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java @@ -42,7 +42,7 @@ public class 
TopNRuntimeFilterTest extends SSBTestBase { new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN); PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0); - Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()); + Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); } // topn rf do not apply on string-like and float column @@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase { new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN); PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0); - Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()); + Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org