This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3117ac9289 [enhancement](Nereids) use post-order to generate runtime filter in RuntimeFilterGenerator (#13949) 3117ac9289 is described below commit 3117ac9289fbfc37728f9f022289aa19b77a531d Author: mch_ucchi <41606806+sohardforan...@users.noreply.github.com> AuthorDate: Wed Nov 9 14:28:49 2022 +0800 [enhancement](Nereids) use post-order to generate runtime filter in RuntimeFilterGenerator (#13949) Change the runtime filter generator from pre-order to post-order traversal. This may change the number of generated runtime filters, so the unit tests are corrected accordingly. --- .../glue/translator/RuntimeFilterTranslator.java | 5 +- .../processor/post/RuntimeFilterContext.java | 62 ++-------- .../processor/post/RuntimeFilterGenerator.java | 130 ++++++++++----------- .../trees/plans/physical/RuntimeFilter.java | 44 ------- .../nereids/postprocess/RuntimeFilterTest.java | 7 +- 5 files changed, 78 insertions(+), 170 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 1dc7a0a2c1..09dbb4115b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -60,9 +60,8 @@ public class RuntimeFilterTranslator { * @param ctx plan translator context */ public void translateRuntimeFilterTarget(Slot slot, OlapScanNode node, PlanTranslatorContext ctx) { - context.setKVInNormalMap(context.getExprIdToOlapScanNodeSlotRef(), - slot.getExprId(), ctx.findSlotRef(slot.getExprId())); - context.setKVInNormalMap(context.getScanNodeOfLegacyRuntimeFilterTarget(), slot, node); + context.getExprIdToOlapScanNodeSlotRef().put(slot.getExprId(), ctx.findSlotRef(slot.getExprId())); + 
context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index 5565dcf928..03c7be5f17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.processor.post; import org.apache.doris.analysis.SlotRef; import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.Pair; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -32,12 +33,9 @@ import org.apache.doris.qe.SessionVariable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.jetbrains.annotations.NotNull; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -62,8 +60,10 @@ public class RuntimeFilterContext { private final Map<PhysicalHashJoin, List<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap(); - // Alias's child to itself. - private final Map<Slot, NamedExpression> aliasChildToSelf = Maps.newHashMap(); + // alias -> alias's child, if there's a key that is alias's child, the key-value will change by this way + // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv will be C -> A. + // you can see disjoint set data structure to learn the processing detailed. 
+ private final Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = Maps.newHashMap(); private final Map<Slot, OlapScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap(); @@ -86,36 +86,21 @@ public class RuntimeFilterContext { return limits; } - public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) { - Preconditions.checkArgument(Arrays.stream(filters) - .allMatch(filter -> filter.getTargetExpr().getExprId() == id)); - this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()) - .addAll(Arrays.asList(filters)); + public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) { + Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id); + this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter); } - public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) { - return targetExprIdToFilter.get(id); - } - - public void removeFilters(ExprId id) { - targetExprIdToFilter.remove(id); - } - - public void setTargetsOnScanNode(RelationId id, Slot... 
slots) { - this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> Lists.newArrayList()) - .addAll(Arrays.asList(slots)); - } - - public <K, V> void setKVInNormalMap(@NotNull Map<K, V> map, K key, V value) { - map.put(key, value); + public void setTargetsOnScanNode(RelationId id, Slot slot) { + this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> Lists.newArrayList()).add(slot); } public Map<ExprId, SlotRef> getExprIdToOlapScanNodeSlotRef() { return exprIdToOlapScanNodeSlotRef; } - public Map<Slot, NamedExpression> getAliasChildToSelf() { - return aliasChildToSelf; + public Map<NamedExpression, Pair<RelationId, NamedExpression>> getAliasTransferMap() { + return aliasTransferMap; } public Map<Slot, OlapScanNode> getScanNodeOfLegacyRuntimeFilterTarget() { @@ -143,14 +128,6 @@ public class RuntimeFilterContext { return legacyFilters; } - public void setLegacyFilter(org.apache.doris.planner.RuntimeFilter filter) { - this.legacyFilters.add(filter); - } - - public <K, V> boolean checkExistKey(@NotNull Map<K, V> map, K key) { - return map.containsKey(key); - } - /** * get nereids runtime filters * @return nereids runtime filters @@ -166,21 +143,6 @@ public class RuntimeFilterContext { return filters; } - /** - * get the slot list of the same olap scan node of the input slot. 
- * @param slot slot - * @return slot list - */ - public List<NamedExpression> getSlotListOfTheSameSlotAtOlapScanNode(Slot slot) { - ImmutableList.Builder<NamedExpression> builder = ImmutableList.builder(); - NamedExpression expr = slot; - do { - builder.add(expr); - expr = aliasChildToSelf.get(expr.toSlot()); - } while (expr != null); - return builder.build(); - } - public void setTargetNullCount() { targetNullCount++; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index cc8531bafc..51c84f2dd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -18,17 +18,23 @@ package org.apache.doris.nereids.processor.post; import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.Pair; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; +import org.apache.doris.nereids.util.JoinUtils; import 
org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.thrift.TRuntimeFilterType; @@ -36,7 +42,8 @@ import com.google.common.collect.ImmutableSet; import java.util.Arrays; import java.util.List; -import java.util.Objects; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -44,15 +51,12 @@ import java.util.stream.Collectors; * generate runtime filter */ public class RuntimeFilterGenerator extends PlanPostProcessor { - - private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator(); - - private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of( + private static final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of( JoinType.LEFT_ANTI_JOIN, - JoinType.RIGHT_ANTI_JOIN, JoinType.FULL_OUTER_JOIN, JoinType.LEFT_OUTER_JOIN ); + private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator(); /** * the runtime filter generator run at the phase of post process and plan translation of nereids planner. @@ -72,94 +76,80 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. 
- */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); + // copy to avoid bug when next call of getOutputSet() + Set<Slot> slots = join.getOutputSet(); + slots.forEach(aliasTransferMap::remove); } else { - List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> - (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) + List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()) + .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); AtomicInteger cnt = new AtomicInteger(); join.getHashJoinConjuncts().stream() .map(EqualTo.class::cast) + // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. // TODO: we will support it in later version. - /*.peek(expr -> { - // target is always the expr at the two side of equal of hash conjunctions. - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - List<SlotReference> slots = expr.children().stream().filter(SlotReference.class::isInstance) - .map(SlotReference.class::cast).collect(Collectors.toList()); - if (slots.size() != 2 - || !(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) - || ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) { + .forEach(expr -> legalTypes.forEach(type -> { + Pair<Expression, Expression> normalizedChildren = checkAndMaybeSwapChild(expr, join); + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (normalizedChildren == null + || !aliasTransferMap.containsKey((Slot) normalizedChildren.first)) { return; } - int tag = ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 : 1; - // generate runtime filter to associated expr. 
for example, a = b and a = c, RF b -> a can - // generate RF b -> c - List<RuntimeFilter> copiedRuntimeFilter = ctx.getFiltersByTargetExprId(slots.get(tag) - .getExprId()).stream() - .map(filter -> new RuntimeFilter(generator.getNextId(), filter.getSrcExpr(), - slots.get(tag ^ 1), filter.getType(), filter.getExprOrder(), join)) - .collect(Collectors.toList()); - ctx.setTargetExprIdToFilters(slots.get(tag ^ 1).getExprId(), - copiedRuntimeFilter.toArray(new RuntimeFilter[0])); - })*/ - .forEach(expr -> legalTypes.stream() - .map(type -> RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr, - type, cnt.getAndIncrement(), join)) - .filter(Objects::nonNull) - .forEach(filter -> - ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter))); + Pair<Slot, Slot> slots = Pair.of( + aliasTransferMap.get((Slot) normalizedChildren.first).second.toSlot(), + ((Slot) normalizedChildren.second)); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + slots.second, slots.first, type, + cnt.getAndIncrement(), join); + ctx.setTargetExprIdToFilter(slots.first.getExprId(), filter); + ctx.setTargetsOnScanNode( + aliasTransferMap.get((Slot) normalizedChildren.first).first, + slots.first); + })); } - join.left().accept(this, context); - join.right().accept(this, context); return join; } // TODO: support src key is agg slot. @Override public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) { - RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + project.child().accept(this, context); + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap + = context.getRuntimeFilterContext().getAliasTransferMap(); + // change key when encounter alias. 
project.getProjects().stream().filter(Alias.class::isInstance) .map(Alias.class::cast) - .filter(expr -> expr.child() instanceof SlotReference) - .forEach(expr -> ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), expr)); - project.child().accept(this, context); + .filter(alias -> alias.child() instanceof NamedExpression + && aliasTransferMap.containsKey((NamedExpression) alias.child())) + .forEach(alias -> { + NamedExpression child = ((NamedExpression) alias.child()); + aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(child)); + }); return project; } @Override public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext context) { + // add all the slots in map. RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - scan.getOutput().stream() - .filter(slot -> ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream() - .filter(expr -> ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId())) - .peek(expr -> { - if (expr.getExprId() == slot.getExprId()) { - return; - } - List<RuntimeFilter> filters = ctx.getFiltersByTargetExprId(expr.getExprId()); - ctx.removeFilters(expr.getExprId()); - filters.forEach(filter -> filter.setTargetSlot(slot)); - ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters); - }) - .count() > 0) - .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot)); + scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan.getId(), slot))); return scan; } + + private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo expr, + PhysicalHashJoin<? extends Plan, ? extends Plan> join) { + if (expr.child(0).equals(expr.child(1)) + || !expr.children().stream().allMatch(SlotReference.class::isInstance)) { + return null; + } + // current we assume that there are certainly different slot reference in equal to. + // they are not from the same relation. 
+ List<Expression> children = JoinUtils.swapEqualToForChildrenOrder(expr, join.left().getOutputSet()).children(); + return Pair.of(children.get(0), children.get(1)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index 8eeb6071d7..326bc6b634 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -17,12 +17,7 @@ package org.apache.doris.nereids.trees.plans.physical; -import org.apache.doris.common.Pair; -import org.apache.doris.nereids.trees.expressions.EqualTo; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.expressions.SlotReference; -import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.thrift.TRuntimeFilterType; @@ -58,37 +53,6 @@ public class RuntimeFilter { this.builderNode = builderNode; } - /** - * create RF - */ - public static RuntimeFilter createRuntimeFilter(RuntimeFilterId id, EqualTo conjunction, - TRuntimeFilterType type, int exprOrder, PhysicalHashJoin node) { - Pair<Expression, Expression> srcs = checkAndMaybeSwapChild(conjunction, node); - if (srcs == null) { - return null; - } - return new RuntimeFilter(id, ((SlotReference) srcs.second), ((SlotReference) srcs.first), type, exprOrder, - node); - } - - private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo expr, - PhysicalHashJoin join) { - if (expr.children().stream().anyMatch(Literal.class::isInstance)) { - return null; - } - if (expr.child(0).equals(expr.child(1))) { - return null; - } - if (!expr.children().stream().allMatch(SlotReference.class::isInstance)) { - return null; - } - // 
current we assume that there are certainly different slot reference in equal to. - // they are not from the same relation. - int exchangeTag = join.child(0).getOutput().stream().anyMatch(slot -> slot.getExprId().equals( - ((SlotReference) expr.child(1)).getExprId())) ? 1 : 0; - return Pair.of(expr.child(exchangeTag), expr.child(1 ^ exchangeTag)); - } - public Slot getSrcExpr() { return srcSlot; } @@ -113,14 +77,6 @@ public class RuntimeFilter { return builderNode; } - public void setTargetSlot(Slot targetSlot) { - this.targetSlot = targetSlot; - } - - public boolean isUninitialized() { - return !finalized; - } - public void setFinalized() { this.finalized = true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java index 7b0fe88971..617ccf0f05 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java @@ -228,9 +228,10 @@ public class RuntimeFilterTest extends SSBTestBase { private void checkRuntimeFilterExprs(List<RuntimeFilter> filters, List<Pair<String, String>> colNames) { Assertions.assertEquals(filters.size(), colNames.size()); - for (int i = 0; i < filters.size(); i++) { - Assertions.assertTrue(filters.get(i).getSrcExpr().toSql().equals(colNames.get(i).first) - && filters.get(i).getTargetExpr().toSql().equals(colNames.get(i).second)); + for (RuntimeFilter filter : filters) { + Assertions.assertTrue(colNames.contains(Pair.of( + filter.getSrcExpr().getName(), + filter.getTargetExpr().getName()))); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org