This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new dab8ed6aa2 [fix](Nereids)Runtimefilter pushdown through TopN or Window bug (#24432) dab8ed6aa2 is described below commit dab8ed6aa2fc4d7f294ef2459d54f124f58e1956 Author: minghong <engle...@gmail.com> AuthorDate: Fri Sep 15 14:42:46 2023 +0800 [fix](Nereids)Runtimefilter pushdown through TopN or Window bug (#24432) --- .../processor/post/RuntimeFilterGenerator.java | 50 ++++++++++------------ .../doris/nereids/trees/plans/algebra/Window.java | 34 +++++++++++++++ .../nereids/postprocess/RuntimeFilterTest.java | 21 +++++++++ 3 files changed, 78 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index de4d65f6c5..a572d3c7bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -51,7 +51,9 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; +import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; @@ -124,33 +126,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } else { pushDownRuntimeFilterCommon(join, context); } - // - // if (DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) { - // // 
aliasTransMap is also used for judging whether the slot can be as rf target. - // // for denied join type, the forbidden slots will be removed from the map. - // // for example: a full outer join b on a.id = b.id, all slots will be removed out. - // // for left outer join, only remove the right side slots and leave the left side. - // // in later visit, the additional checking for the join type will be invoked for different cases: - // // case 1: a left join b on a.id = b.id, checking whether rf on b.id can be pushed to a, - // the answer is no, - // // since current join type is left outer join which is in denied list; - // // case 2: (a left join b on a.id = b.id) inner join c on a.id2 = c.id2, checking whether rf on c.id2 can - // // be pushed to a, the answer is yes, since the current join is inner join which is permitted. - // if (join.getJoinType() == JoinType.LEFT_OUTER_JOIN) { - // Set<Slot> slots = join.right().getOutputSet(); - // slots.forEach(aliasTransferMap::remove); - // } else { - // Set<Slot> slots = join.getOutputSet(); - // slots.forEach(aliasTransferMap::remove); - // } - // } else { - // collectPushDownCTEInfos(join, context); - // if (!getPushDownCTECandidates(ctx).isEmpty()) { - // pushDownRuntimeFilterIntoCTE(ctx); - // } else { - // pushDownRuntimeFilterCommon(join, context); - // } - // } return join; } @@ -168,6 +143,26 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { return producer; } + @Override + public PhysicalPlan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext context) { + topN.child().accept(this, context); + PhysicalPlan child = (PhysicalPlan) topN.child(); + for (Slot slot : child.getOutput()) { + context.getRuntimeFilterContext().getAliasTransferMap().remove(slot); + } + return topN; + } + + @Override + public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? 
extends Plan> window, CascadesContext context) { + window.child().accept(this, context); + Set<SlotReference> commonPartitionKeys = window.getCommonPartitionKeyFromWindowExpressions(); + window.child().getOutput().stream().filter(slot -> !commonPartitionKeys.contains(slot)).forEach( + slot -> context.getRuntimeFilterContext().getAliasTransferMap().remove(slot) + ); + return window; + } + @Override public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { @@ -326,6 +321,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { if (!checkPhysicalRelationType(scan)) { return; } + if (scan instanceof PhysicalCTEConsumer) { Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE(); CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java index 35f6547b8a..00d290940e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java @@ -22,16 +22,24 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.translator.ExpressionTranslator; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.WindowExpression; import org.apache.doris.nereids.trees.expressions.WindowFrame; import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundType; import 
org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary; import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType; import org.apache.doris.nereids.trees.expressions.literal.Literal; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + import java.math.BigDecimal; import java.util.List; +import java.util.Map; +import java.util.Set; /** * interface for LogicalWindow and PhysicalWindow @@ -86,4 +94,30 @@ public interface Window { } } + /** + * + * select rank() over (partition by A, B) as r, sum(x) over (partition by A, C) as s from T; + * A is a common partition key for all windowExpressions. + * For a common partition key A, a filter such as A=1 can be pushed through this window. + */ + default Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() { + ImmutableSet.Builder<SlotReference> commonPartitionKeySet = ImmutableSet.builder(); + Map<Expression, Integer> partitionKeyCount = Maps.newHashMap(); + for (Expression expr : getWindowExpressions()) { + if (expr instanceof Alias && expr.child(0) instanceof WindowExpression) { + WindowExpression winExpr = (WindowExpression) expr.child(0); + for (Expression partitionKey : winExpr.getPartitionKeys()) { + int count = partitionKeyCount.getOrDefault(partitionKey, 0); + partitionKeyCount.put(partitionKey, count + 1); + } + } + } + int winExprCount = getWindowExpressions().size(); + for (Map.Entry<Expression, Integer> entry : partitionKeyCount.entrySet()) { + if (entry.getValue() == winExprCount && entry.getKey() instanceof SlotReference) { + commonPartitionKeySet.add((SlotReference) entry.getKey()); + } + } + return commonPartitionKeySet.build(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java index fc08b0c35d..b429c9defa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java @@ -255,4 +255,25 @@ public class RuntimeFilterTest extends SSBTestBase { filter.getTargetExprs().get(0).getName()))); } } + + @Test + public void testRuntimeFilterBlockByWindow() { + String sql = "SELECT * FROM (select rank() over(partition by lo_partkey), lo_custkey from lineorder) t JOIN customer on lo_custkey = c_custkey"; + List<RuntimeFilter> filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(0, filters.size()); + } + + @Test + public void testRuntimeFilterNotBlockByWindow() { + String sql = "SELECT * FROM (select rank() over(partition by lo_custkey), lo_custkey from lineorder) t JOIN customer on lo_custkey = c_custkey"; + List<RuntimeFilter> filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(1, filters.size()); + } + + @Test + public void testRuntimeFilterBlockByTopN() { + String sql = "SELECT * FROM (select lo_custkey from lineorder order by lo_custkey limit 10) t JOIN customer on lo_custkey = c_custkey"; + List<RuntimeFilter> filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(0, filters.size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org