morrySnow commented on code in PR #13949: URL: https://github.com/apache/doris/pull/13949#discussion_r1015655477
########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -49,7 +56,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of( Review Comment: static ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { Review Comment: nit ```suggestion if (deniedJoinType.contains(join.getJoinType())) { ... } else { ... } ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); Review Comment: ```suggestion List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()) .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); AtomicInteger cnt = new AtomicInteger(); join.getHashJoinConjuncts().stream() .map(EqualTo.class::cast) + // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. // TODO: we will support it in later version. - /*.peek(expr -> { - // target is always the expr at the two side of equal of hash conjunctions. - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - List<SlotReference> slots = expr.children().stream().filter(SlotReference.class::isInstance) - .map(SlotReference.class::cast).collect(Collectors.toList()); - if (slots.size() != 2 - || !(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) - || ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) { + .forEach(expr -> legalTypes.forEach(type -> { + Pair<Expression, Expression> exprs = checkAndMaybeSwapChild(expr, join); + if (exprs == null || !aliasTransferMap.containsKey((Slot) exprs.first)) { Review Comment: plz add some comment to explain the if statement ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java: ########## @@ -86,24 +87,19 @@ public FilterSizeLimits getLimits() { return limits; } - public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) { - Preconditions.checkArgument(Arrays.stream(filters) - .allMatch(filter -> filter.getTargetExpr().getExprId() == id)); + public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) { + Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id); this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()) - .addAll(Arrays.asList(filters)); - } - - public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) { - return targetExprIdToFilter.get(id); + .add(filter); } public void removeFilters(ExprId id) { targetExprIdToFilter.remove(id); } - public void setTargetsOnScanNode(RelationId id, Slot... slots) { + public void setTargetsOnScanNode(RelationId id, Slot slot) { this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> Lists.newArrayList()) - .addAll(Arrays.asList(slots)); + .add(slot); Review Comment: ditto ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); AtomicInteger cnt = new AtomicInteger(); join.getHashJoinConjuncts().stream() .map(EqualTo.class::cast) + // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. // TODO: we will support it in later version. - /*.peek(expr -> { - // target is always the expr at the two side of equal of hash conjunctions. - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - List<SlotReference> slots = expr.children().stream().filter(SlotReference.class::isInstance) - .map(SlotReference.class::cast).collect(Collectors.toList()); - if (slots.size() != 2 - || !(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) - || ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) { + .forEach(expr -> legalTypes.forEach(type -> { + Pair<Expression, Expression> exprs = checkAndMaybeSwapChild(expr, join); + if (exprs == null || !aliasTransferMap.containsKey((Slot) exprs.first)) { return; } - int tag = ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 : 1; - // generate runtime filter to associated expr. for example, a = b and a = c, RF b -> a can - // generate RF b -> c - List<RuntimeFilter> copiedRuntimeFilter = ctx.getFiltersByTargetExprId(slots.get(tag) - .getExprId()).stream() - .map(filter -> new RuntimeFilter(generator.getNextId(), filter.getSrcExpr(), - slots.get(tag ^ 1), filter.getType(), filter.getExprOrder(), join)) - .collect(Collectors.toList()); - ctx.setTargetExprIdToFilters(slots.get(tag ^ 1).getExprId(), - copiedRuntimeFilter.toArray(new RuntimeFilter[0])); - })*/ - .forEach(expr -> legalTypes.stream() - .map(type -> RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr, - type, cnt.getAndIncrement(), join)) - .filter(Objects::nonNull) - .forEach(filter -> - ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter))); + Pair<Slot, Slot> slots = Pair.of( + aliasTransferMap.get((Slot) exprs.first).second.toSlot(), + ((Slot) exprs.second)); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + slots.second, slots.first, type, + cnt.getAndIncrement(), join); + ctx.setTargetExprIdToFilter(slots.first.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot) exprs.first)).first, slots.first); + })); + } else { + // copy to avoid bug when next call of getOutputSet() + Set<Slot> slots = join.getOutputSet(); + slots.forEach(aliasTransferMap::remove); } - join.left().accept(this, context); - join.right().accept(this, context); return join; } // TODO: support src key is agg slot. @Override public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) { - RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + project.child().accept(this, context); + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap + = context.getRuntimeFilterContext().getAliasTransferMap(); + // change key when encounter alias. project.getProjects().stream().filter(Alias.class::isInstance) .map(Alias.class::cast) - .filter(expr -> expr.child() instanceof SlotReference) - .forEach(expr -> ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), expr)); - project.child().accept(this, context); + .filter(alias -> alias.child() instanceof NamedExpression + && aliasTransferMap.containsKey((NamedExpression) alias.child())) + .forEach(alias -> { + NamedExpression child = ((NamedExpression) alias.child()); + aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(child)); + }); return project; } @Override public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext context) { + // add all the slots in map. RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - scan.getOutput().stream() - .filter(slot -> ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream() - .filter(expr -> ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId())) - .peek(expr -> { - if (expr.getExprId() == slot.getExprId()) { - return; - } - List<RuntimeFilter> filters = ctx.getFiltersByTargetExprId(expr.getExprId()); - ctx.removeFilters(expr.getExprId()); - filters.forEach(filter -> filter.setTargetSlot(slot)); - ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters); - }) - .count() > 0) - .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot)); + scan.getOutput().forEach(slot -> + ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot, Pair.of(scan.getId(), slot))); Review Comment: setKVInNormalMap function is very weird. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); AtomicInteger cnt = new AtomicInteger(); join.getHashJoinConjuncts().stream() .map(EqualTo.class::cast) + // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. // TODO: we will support it in later version. - /*.peek(expr -> { - // target is always the expr at the two side of equal of hash conjunctions. - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - List<SlotReference> slots = expr.children().stream().filter(SlotReference.class::isInstance) - .map(SlotReference.class::cast).collect(Collectors.toList()); - if (slots.size() != 2 - || !(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) - || ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) { + .forEach(expr -> legalTypes.forEach(type -> { + Pair<Expression, Expression> exprs = checkAndMaybeSwapChild(expr, join); Review Comment: give exprs a better name ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); AtomicInteger cnt = new AtomicInteger(); join.getHashJoinConjuncts().stream() .map(EqualTo.class::cast) + // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. // TODO: we will support it in later version. - /*.peek(expr -> { - // target is always the expr at the two side of equal of hash conjunctions. - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - List<SlotReference> slots = expr.children().stream().filter(SlotReference.class::isInstance) - .map(SlotReference.class::cast).collect(Collectors.toList()); - if (slots.size() != 2 - || !(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) - || ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) { + .forEach(expr -> legalTypes.forEach(type -> { + Pair<Expression, Expression> exprs = checkAndMaybeSwapChild(expr, join); + if (exprs == null || !aliasTransferMap.containsKey((Slot) exprs.first)) { return; } - int tag = ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 : 1; - // generate runtime filter to associated expr. for example, a = b and a = c, RF b -> a can - // generate RF b -> c - List<RuntimeFilter> copiedRuntimeFilter = ctx.getFiltersByTargetExprId(slots.get(tag) - .getExprId()).stream() - .map(filter -> new RuntimeFilter(generator.getNextId(), filter.getSrcExpr(), - slots.get(tag ^ 1), filter.getType(), filter.getExprOrder(), join)) - .collect(Collectors.toList()); - ctx.setTargetExprIdToFilters(slots.get(tag ^ 1).getExprId(), - copiedRuntimeFilter.toArray(new RuntimeFilter[0])); - })*/ - .forEach(expr -> legalTypes.stream() - .map(type -> RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr, - type, cnt.getAndIncrement(), join)) - .filter(Objects::nonNull) - .forEach(filter -> - ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter))); + Pair<Slot, Slot> slots = Pair.of( + aliasTransferMap.get((Slot) exprs.first).second.toSlot(), + ((Slot) exprs.second)); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + slots.second, slots.first, type, + cnt.getAndIncrement(), join); + ctx.setTargetExprIdToFilter(slots.first.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot) exprs.first)).first, slots.first); + })); + } else { + // copy to avoid bug when next call of getOutputSet() + Set<Slot> slots = join.getOutputSet(); + slots.forEach(aliasTransferMap::remove); } - join.left().accept(this, context); - join.right().accept(this, context); return join; } // TODO: support src key is agg slot. @Override public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) { - RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + project.child().accept(this, context); + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap + = context.getRuntimeFilterContext().getAliasTransferMap(); + // change key when encounter alias. project.getProjects().stream().filter(Alias.class::isInstance) .map(Alias.class::cast) - .filter(expr -> expr.child() instanceof SlotReference) - .forEach(expr -> ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), expr)); - project.child().accept(this, context); + .filter(alias -> alias.child() instanceof NamedExpression + && aliasTransferMap.containsKey((NamedExpression) alias.child())) + .forEach(alias -> { + NamedExpression child = ((NamedExpression) alias.child()); + aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(child)); + }); return project; } @Override public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext context) { + // add all the slots in map. RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - scan.getOutput().stream() - .filter(slot -> ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream() - .filter(expr -> ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId())) - .peek(expr -> { - if (expr.getExprId() == slot.getExprId()) { - return; - } - List<RuntimeFilter> filters = ctx.getFiltersByTargetExprId(expr.getExprId()); - ctx.removeFilters(expr.getExprId()); - filters.forEach(filter -> filter.setTargetSlot(slot)); - ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters); - }) - .count() > 0) - .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot)); + scan.getOutput().forEach(slot -> + ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot, Pair.of(scan.getId(), slot))); return scan; } + + private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo expr, + PhysicalHashJoin join) { + if (expr.children().stream().anyMatch(Literal.class::isInstance) + || expr.child(0).equals(expr.child(1)) + || !expr.children().stream().allMatch(SlotReference.class::isInstance)) { Review Comment: the last condition has covered the first condition ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java: ########## @@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - if (deniedJoinType.contains(join.getJoinType())) { - /* TODO: translate left outer join to inner join if there are inner join ancestors - * if it has encountered inner join, like - * a=b - * / \ - * / \ - * / \ - * / \ - * left join-->a=c b - * / \ - * / \ - * / \ - * / \ - * a c - * runtime filter whose src expr is b can take effect on c. - * but now checking the inner join is unsupported. we may support it at later version. - */ - join.getOutput().forEach(slot -> ctx.removeFilters(slot.getExprId())); - } else { + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + if (!deniedJoinType.contains(join.getJoinType())) { List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values()).filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); AtomicInteger cnt = new AtomicInteger(); join.getHashJoinConjuncts().stream() .map(EqualTo.class::cast) + // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. // TODO: we will support it in later version. - /*.peek(expr -> { - // target is always the expr at the two side of equal of hash conjunctions. - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - List<SlotReference> slots = expr.children().stream().filter(SlotReference.class::isInstance) - .map(SlotReference.class::cast).collect(Collectors.toList()); - if (slots.size() != 2 - || !(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) - || ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) { + .forEach(expr -> legalTypes.forEach(type -> { + Pair<Expression, Expression> exprs = checkAndMaybeSwapChild(expr, join); + if (exprs == null || !aliasTransferMap.containsKey((Slot) exprs.first)) { return; } - int tag = ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 : 1; - // generate runtime filter to associated expr. for example, a = b and a = c, RF b -> a can - // generate RF b -> c - List<RuntimeFilter> copiedRuntimeFilter = ctx.getFiltersByTargetExprId(slots.get(tag) - .getExprId()).stream() - .map(filter -> new RuntimeFilter(generator.getNextId(), filter.getSrcExpr(), - slots.get(tag ^ 1), filter.getType(), filter.getExprOrder(), join)) - .collect(Collectors.toList()); - ctx.setTargetExprIdToFilters(slots.get(tag ^ 1).getExprId(), - copiedRuntimeFilter.toArray(new RuntimeFilter[0])); - })*/ - .forEach(expr -> legalTypes.stream() - .map(type -> RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr, - type, cnt.getAndIncrement(), join)) - .filter(Objects::nonNull) - .forEach(filter -> - ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter))); + Pair<Slot, Slot> slots = Pair.of( + aliasTransferMap.get((Slot) exprs.first).second.toSlot(), + ((Slot) exprs.second)); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + slots.second, slots.first, type, + cnt.getAndIncrement(), join); + ctx.setTargetExprIdToFilter(slots.first.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot) exprs.first)).first, slots.first); + })); + } else { + // copy to avoid bug when next call of getOutputSet() + Set<Slot> slots = join.getOutputSet(); + slots.forEach(aliasTransferMap::remove); } - join.left().accept(this, context); - join.right().accept(this, context); return join; } // TODO: support src key is agg slot. @Override public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) { - RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + project.child().accept(this, context); + Map<NamedExpression, Pair<RelationId, NamedExpression>> aliasTransferMap + = context.getRuntimeFilterContext().getAliasTransferMap(); + // change key when encounter alias. project.getProjects().stream().filter(Alias.class::isInstance) .map(Alias.class::cast) - .filter(expr -> expr.child() instanceof SlotReference) - .forEach(expr -> ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), expr)); - project.child().accept(this, context); + .filter(alias -> alias.child() instanceof NamedExpression + && aliasTransferMap.containsKey((NamedExpression) alias.child())) + .forEach(alias -> { + NamedExpression child = ((NamedExpression) alias.child()); + aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(child)); + }); return project; } @Override public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext context) { + // add all the slots in map. RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - scan.getOutput().stream() - .filter(slot -> ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream() - .filter(expr -> ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId())) - .peek(expr -> { - if (expr.getExprId() == slot.getExprId()) { - return; - } - List<RuntimeFilter> filters = ctx.getFiltersByTargetExprId(expr.getExprId()); - ctx.removeFilters(expr.getExprId()); - filters.forEach(filter -> filter.setTargetSlot(slot)); - ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters); - }) - .count() > 0) - .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot)); + scan.getOutput().forEach(slot -> + ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot, Pair.of(scan.getId(), slot))); return scan; } + + private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo expr, Review Comment: u could use JoinUtils#swapEqualToForChildrenOrder ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java: ########## @@ -86,24 +87,19 @@ public FilterSizeLimits getLimits() { return limits; } - public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) { - Preconditions.checkArgument(Arrays.stream(filters) - .allMatch(filter -> filter.getTargetExpr().getExprId() == id)); + public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) { + Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id); this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()) - .addAll(Arrays.asList(filters)); - } - - public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) { - return targetExprIdToFilter.get(id); + .add(filter); Review Comment: not need wrap line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org