morrySnow commented on code in PR #13949:
URL: https://github.com/apache/doris/pull/13949#discussion_r1015655477


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -49,7 +56,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor 
{
 
     private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(

Review Comment:
   This field (`deniedJoinType`) should be `static`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {

Review Comment:
   nit
   ```suggestion
           if (deniedJoinType.contains(join.getJoinType())) {
               ...
           } else {
               ...
           }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {
             List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
                     (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());

Review Comment:
   ```suggestion
               List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values())
                       .filter(type -> (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                       .collect(Collectors.toList());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {
             List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
                     (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());
             AtomicInteger cnt = new AtomicInteger();
             join.getHashJoinConjuncts().stream()
                     .map(EqualTo.class::cast)
+                    // TODO: some complex situation cannot be handled now, see 
testPushDownThroughJoin.
                     // TODO: we will support it in later version.
-                    /*.peek(expr -> {
-                        // target is always the expr at the two side of equal 
of hash conjunctions.
-                        // TODO: some complex situation cannot be handled now, 
see testPushDownThroughJoin.
-                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
-                                
.map(SlotReference.class::cast).collect(Collectors.toList());
-                        if (slots.size() != 2
-                                || 
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
-                                || 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+                    .forEach(expr -> legalTypes.forEach(type -> {
+                        Pair<Expression, Expression> exprs = 
checkAndMaybeSwapChild(expr, join);
+                        if (exprs == null || 
!aliasTransferMap.containsKey((Slot) exprs.first)) {

Review Comment:
   Please add a comment explaining this `if` statement.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java:
##########
@@ -86,24 +87,19 @@ public FilterSizeLimits getLimits() {
         return limits;
     }
 
-    public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) {
-        Preconditions.checkArgument(Arrays.stream(filters)
-                .allMatch(filter -> filter.getTargetExpr().getExprId() == id));
+    public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
+        Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
         this.targetExprIdToFilter.computeIfAbsent(id, k -> 
Lists.newArrayList())
-                .addAll(Arrays.asList(filters));
-    }
-
-    public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) {
-        return targetExprIdToFilter.get(id);
+                .add(filter);
     }
 
     public void removeFilters(ExprId id) {
         targetExprIdToFilter.remove(id);
     }
 
-    public void setTargetsOnScanNode(RelationId id, Slot... slots) {
+    public void setTargetsOnScanNode(RelationId id, Slot slot) {
         this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> 
Lists.newArrayList())
-                .addAll(Arrays.asList(slots));
+                .add(slot);

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {
             List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
                     (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());
             AtomicInteger cnt = new AtomicInteger();
             join.getHashJoinConjuncts().stream()
                     .map(EqualTo.class::cast)
+                    // TODO: some complex situation cannot be handled now, see 
testPushDownThroughJoin.
                     // TODO: we will support it in later version.
-                    /*.peek(expr -> {
-                        // target is always the expr at the two side of equal 
of hash conjunctions.
-                        // TODO: some complex situation cannot be handled now, 
see testPushDownThroughJoin.
-                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
-                                
.map(SlotReference.class::cast).collect(Collectors.toList());
-                        if (slots.size() != 2
-                                || 
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
-                                || 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+                    .forEach(expr -> legalTypes.forEach(type -> {
+                        Pair<Expression, Expression> exprs = 
checkAndMaybeSwapChild(expr, join);
+                        if (exprs == null || 
!aliasTransferMap.containsKey((Slot) exprs.first)) {
                             return;
                         }
-                        int tag = 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 
: 1;
-                        // generate runtime filter to associated expr. for 
example, a = b and a = c, RF b -> a can
-                        // generate RF b -> c
-                        List<RuntimeFilter> copiedRuntimeFilter = 
ctx.getFiltersByTargetExprId(slots.get(tag)
-                                        .getExprId()).stream()
-                                .map(filter -> new 
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
-                                        slots.get(tag ^ 1), filter.getType(), 
filter.getExprOrder(), join))
-                                .collect(Collectors.toList());
-                        ctx.setTargetExprIdToFilters(slots.get(tag ^ 
1).getExprId(),
-                                copiedRuntimeFilter.toArray(new 
RuntimeFilter[0]));
-                    })*/
-                    .forEach(expr -> legalTypes.stream()
-                            .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
-                                    type, cnt.getAndIncrement(), join))
-                            .filter(Objects::nonNull)
-                            .forEach(filter ->
-                                    
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+                        Pair<Slot, Slot> slots = Pair.of(
+                                aliasTransferMap.get((Slot) 
exprs.first).second.toSlot(),
+                                ((Slot) exprs.second));
+                        RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
+                                slots.second, slots.first, type,
+                                cnt.getAndIncrement(), join);
+                        ctx.setTargetExprIdToFilter(slots.first.getExprId(), 
filter);
+                        ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot) 
exprs.first)).first, slots.first);
+                    }));
+        } else {
+            // copy to avoid bug when next call of getOutputSet()
+            Set<Slot> slots = join.getOutputSet();
+            slots.forEach(aliasTransferMap::remove);
         }
-        join.left().accept(this, context);
-        join.right().accept(this, context);
         return join;
     }
 
     // TODO: support src key is agg slot.
     @Override
     public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext context) {
-        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        project.child().accept(this, context);
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap
+                = context.getRuntimeFilterContext().getAliasTransferMap();
+        // change key when encounter alias.
         project.getProjects().stream().filter(Alias.class::isInstance)
                 .map(Alias.class::cast)
-                .filter(expr -> expr.child() instanceof SlotReference)
-                .forEach(expr -> 
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), 
expr));
-        project.child().accept(this, context);
+                .filter(alias -> alias.child() instanceof NamedExpression
+                        && aliasTransferMap.containsKey((NamedExpression) 
alias.child()))
+                .forEach(alias -> {
+                    NamedExpression child = ((NamedExpression) alias.child());
+                    aliasTransferMap.put(alias.toSlot(), 
aliasTransferMap.remove(child));
+                });
         return project;
     }
 
     @Override
     public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, 
CascadesContext context) {
+        // add all the slots in map.
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        scan.getOutput().stream()
-                .filter(slot -> 
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
-                        .filter(expr -> 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
-                        .peek(expr -> {
-                            if (expr.getExprId() == slot.getExprId()) {
-                                return;
-                            }
-                            List<RuntimeFilter> filters = 
ctx.getFiltersByTargetExprId(expr.getExprId());
-                            ctx.removeFilters(expr.getExprId());
-                            filters.forEach(filter -> 
filter.setTargetSlot(slot));
-                            
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
-                        })
-                        .count() > 0)
-                .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+        scan.getOutput().forEach(slot ->
+                ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot, 
Pair.of(scan.getId(), slot)));

Review Comment:
   setKVInNormalMap function is very weird.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {
             List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
                     (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());
             AtomicInteger cnt = new AtomicInteger();
             join.getHashJoinConjuncts().stream()
                     .map(EqualTo.class::cast)
+                    // TODO: some complex situation cannot be handled now, see 
testPushDownThroughJoin.
                     // TODO: we will support it in later version.
-                    /*.peek(expr -> {
-                        // target is always the expr at the two side of equal 
of hash conjunctions.
-                        // TODO: some complex situation cannot be handled now, 
see testPushDownThroughJoin.
-                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
-                                
.map(SlotReference.class::cast).collect(Collectors.toList());
-                        if (slots.size() != 2
-                                || 
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
-                                || 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+                    .forEach(expr -> legalTypes.forEach(type -> {
+                        Pair<Expression, Expression> exprs = 
checkAndMaybeSwapChild(expr, join);

Review Comment:
   Please give `exprs` a more descriptive name.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {
             List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
                     (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());
             AtomicInteger cnt = new AtomicInteger();
             join.getHashJoinConjuncts().stream()
                     .map(EqualTo.class::cast)
+                    // TODO: some complex situation cannot be handled now, see 
testPushDownThroughJoin.
                     // TODO: we will support it in later version.
-                    /*.peek(expr -> {
-                        // target is always the expr at the two side of equal 
of hash conjunctions.
-                        // TODO: some complex situation cannot be handled now, 
see testPushDownThroughJoin.
-                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
-                                
.map(SlotReference.class::cast).collect(Collectors.toList());
-                        if (slots.size() != 2
-                                || 
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
-                                || 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+                    .forEach(expr -> legalTypes.forEach(type -> {
+                        Pair<Expression, Expression> exprs = 
checkAndMaybeSwapChild(expr, join);
+                        if (exprs == null || 
!aliasTransferMap.containsKey((Slot) exprs.first)) {
                             return;
                         }
-                        int tag = 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 
: 1;
-                        // generate runtime filter to associated expr. for 
example, a = b and a = c, RF b -> a can
-                        // generate RF b -> c
-                        List<RuntimeFilter> copiedRuntimeFilter = 
ctx.getFiltersByTargetExprId(slots.get(tag)
-                                        .getExprId()).stream()
-                                .map(filter -> new 
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
-                                        slots.get(tag ^ 1), filter.getType(), 
filter.getExprOrder(), join))
-                                .collect(Collectors.toList());
-                        ctx.setTargetExprIdToFilters(slots.get(tag ^ 
1).getExprId(),
-                                copiedRuntimeFilter.toArray(new 
RuntimeFilter[0]));
-                    })*/
-                    .forEach(expr -> legalTypes.stream()
-                            .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
-                                    type, cnt.getAndIncrement(), join))
-                            .filter(Objects::nonNull)
-                            .forEach(filter ->
-                                    
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+                        Pair<Slot, Slot> slots = Pair.of(
+                                aliasTransferMap.get((Slot) 
exprs.first).second.toSlot(),
+                                ((Slot) exprs.second));
+                        RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
+                                slots.second, slots.first, type,
+                                cnt.getAndIncrement(), join);
+                        ctx.setTargetExprIdToFilter(slots.first.getExprId(), 
filter);
+                        ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot) 
exprs.first)).first, slots.first);
+                    }));
+        } else {
+            // copy to avoid bug when next call of getOutputSet()
+            Set<Slot> slots = join.getOutputSet();
+            slots.forEach(aliasTransferMap::remove);
         }
-        join.left().accept(this, context);
-        join.right().accept(this, context);
         return join;
     }
 
     // TODO: support src key is agg slot.
     @Override
     public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext context) {
-        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        project.child().accept(this, context);
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap
+                = context.getRuntimeFilterContext().getAliasTransferMap();
+        // change key when encounter alias.
         project.getProjects().stream().filter(Alias.class::isInstance)
                 .map(Alias.class::cast)
-                .filter(expr -> expr.child() instanceof SlotReference)
-                .forEach(expr -> 
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), 
expr));
-        project.child().accept(this, context);
+                .filter(alias -> alias.child() instanceof NamedExpression
+                        && aliasTransferMap.containsKey((NamedExpression) 
alias.child()))
+                .forEach(alias -> {
+                    NamedExpression child = ((NamedExpression) alias.child());
+                    aliasTransferMap.put(alias.toSlot(), 
aliasTransferMap.remove(child));
+                });
         return project;
     }
 
     @Override
     public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, 
CascadesContext context) {
+        // add all the slots in map.
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        scan.getOutput().stream()
-                .filter(slot -> 
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
-                        .filter(expr -> 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
-                        .peek(expr -> {
-                            if (expr.getExprId() == slot.getExprId()) {
-                                return;
-                            }
-                            List<RuntimeFilter> filters = 
ctx.getFiltersByTargetExprId(expr.getExprId());
-                            ctx.removeFilters(expr.getExprId());
-                            filters.forEach(filter -> 
filter.setTargetSlot(slot));
-                            
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
-                        })
-                        .count() > 0)
-                .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+        scan.getOutput().forEach(slot ->
+                ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot, 
Pair.of(scan.getId(), slot)));
         return scan;
     }
+
+    private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo 
expr,
+            PhysicalHashJoin join) {
+        if (expr.children().stream().anyMatch(Literal.class::isInstance)
+                || expr.child(0).equals(expr.child(1))
+                || 
!expr.children().stream().allMatch(SlotReference.class::isInstance)) {

Review Comment:
   The last condition (all children must be `SlotReference`) already covers the first one (`Literal` check), so the first condition is redundant.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
-        } else {
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        if (!deniedJoinType.contains(join.getJoinType())) {
             List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
                     (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());
             AtomicInteger cnt = new AtomicInteger();
             join.getHashJoinConjuncts().stream()
                     .map(EqualTo.class::cast)
+                    // TODO: some complex situation cannot be handled now, see 
testPushDownThroughJoin.
                     // TODO: we will support it in later version.
-                    /*.peek(expr -> {
-                        // target is always the expr at the two side of equal 
of hash conjunctions.
-                        // TODO: some complex situation cannot be handled now, 
see testPushDownThroughJoin.
-                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
-                                
.map(SlotReference.class::cast).collect(Collectors.toList());
-                        if (slots.size() != 2
-                                || 
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
-                                || 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+                    .forEach(expr -> legalTypes.forEach(type -> {
+                        Pair<Expression, Expression> exprs = 
checkAndMaybeSwapChild(expr, join);
+                        if (exprs == null || 
!aliasTransferMap.containsKey((Slot) exprs.first)) {
                             return;
                         }
-                        int tag = 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 
: 1;
-                        // generate runtime filter to associated expr. for 
example, a = b and a = c, RF b -> a can
-                        // generate RF b -> c
-                        List<RuntimeFilter> copiedRuntimeFilter = 
ctx.getFiltersByTargetExprId(slots.get(tag)
-                                        .getExprId()).stream()
-                                .map(filter -> new 
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
-                                        slots.get(tag ^ 1), filter.getType(), 
filter.getExprOrder(), join))
-                                .collect(Collectors.toList());
-                        ctx.setTargetExprIdToFilters(slots.get(tag ^ 
1).getExprId(),
-                                copiedRuntimeFilter.toArray(new 
RuntimeFilter[0]));
-                    })*/
-                    .forEach(expr -> legalTypes.stream()
-                            .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
-                                    type, cnt.getAndIncrement(), join))
-                            .filter(Objects::nonNull)
-                            .forEach(filter ->
-                                    
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+                        Pair<Slot, Slot> slots = Pair.of(
+                                aliasTransferMap.get((Slot) 
exprs.first).second.toSlot(),
+                                ((Slot) exprs.second));
+                        RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
+                                slots.second, slots.first, type,
+                                cnt.getAndIncrement(), join);
+                        ctx.setTargetExprIdToFilter(slots.first.getExprId(), 
filter);
+                        ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot) 
exprs.first)).first, slots.first);
+                    }));
+        } else {
+            // copy to avoid bug when next call of getOutputSet()
+            Set<Slot> slots = join.getOutputSet();
+            slots.forEach(aliasTransferMap::remove);
         }
-        join.left().accept(this, context);
-        join.right().accept(this, context);
         return join;
     }
 
     // TODO: support src key is agg slot.
     @Override
     public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext context) {
-        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        project.child().accept(this, context);
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap
+                = context.getRuntimeFilterContext().getAliasTransferMap();
+        // change key when encounter alias.
         project.getProjects().stream().filter(Alias.class::isInstance)
                 .map(Alias.class::cast)
-                .filter(expr -> expr.child() instanceof SlotReference)
-                .forEach(expr -> 
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), 
expr));
-        project.child().accept(this, context);
+                .filter(alias -> alias.child() instanceof NamedExpression
+                        && aliasTransferMap.containsKey((NamedExpression) 
alias.child()))
+                .forEach(alias -> {
+                    NamedExpression child = ((NamedExpression) alias.child());
+                    aliasTransferMap.put(alias.toSlot(), 
aliasTransferMap.remove(child));
+                });
         return project;
     }
 
     @Override
     public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, 
CascadesContext context) {
+        // add all the slots in map.
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        scan.getOutput().stream()
-                .filter(slot -> 
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
-                        .filter(expr -> 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
-                        .peek(expr -> {
-                            if (expr.getExprId() == slot.getExprId()) {
-                                return;
-                            }
-                            List<RuntimeFilter> filters = 
ctx.getFiltersByTargetExprId(expr.getExprId());
-                            ctx.removeFilters(expr.getExprId());
-                            filters.forEach(filter -> 
filter.setTargetSlot(slot));
-                            
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
-                        })
-                        .count() > 0)
-                .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+        scan.getOutput().forEach(slot ->
+                ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot, 
Pair.of(scan.getId(), slot)));
         return scan;
     }
+
+    private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo 
expr,

Review Comment:
   You could use JoinUtils#swapEqualToForChildrenOrder here.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java:
##########
@@ -86,24 +87,19 @@ public FilterSizeLimits getLimits() {
         return limits;
     }
 
-    public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) {
-        Preconditions.checkArgument(Arrays.stream(filters)
-                .allMatch(filter -> filter.getTargetExpr().getExprId() == id));
+    public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
+        Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
         this.targetExprIdToFilter.computeIfAbsent(id, k -> 
Lists.newArrayList())
-                .addAll(Arrays.asList(filters));
-    }
-
-    public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) {
-        return targetExprIdToFilter.get(id);
+                .add(filter);

Review Comment:
   No need to wrap this line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to