morrySnow commented on code in PR #12182:
URL: https://github.com/apache/doris/pull/12182#discussion_r962457173


##########
fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java:
##########
@@ -92,4 +92,6 @@ public boolean isBlockQuery() {
 
     public abstract DescriptorTable getDescTable();
 
+    public abstract List<RuntimeFilter> getRuntimeFilter();

Review Comment:
   ```suggestion
       public abstract List<RuntimeFilter> getRuntimeFilters();
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */

Review Comment:
   add meaningful comment. u need to explain we must use 
translateRuntimeFilterTarget before this function



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }
+        join.getHashJoinConjuncts().forEach(expr -> {
+            ExprId exprId = ((SlotReference) expr.child(0)).getExprId();
+            TupleId tid = 
ctx.findSlotRef(exprId).getDesc().getParent().getId();
+            if (filterTargetByExprId.containsKey(exprId) && 
filtersByExprId.containsKey(exprId)) {
+                List<RuntimeFilter.RuntimeFilterTarget> targets = 
filterTargetByExprId.get(exprId);
+                origFilters.addAll(filtersByExprId.get(exprId).stream()
+                        .filter(RuntimeFilter::isUninitialized)
+                        .map(filter -> {
+                            SlotRef src = 
ctx.findSlotRef(filter.getSrcExpr().getExprId());
+                            SlotRef target = 
exprIdToOlapScanNodeSlotRef.get(filter.getTargetExpr().getExprId());
+                            org.apache.doris.planner.RuntimeFilter origFilter
+                                    = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                                    filter.getId(), node, src, 
filter.getExprOrder(), target,
+                                    ImmutableMap.of(tid, 
ImmutableList.of(target.getSlotId())),
+                                    filter.getType(), limits
+                            );
+                            targets.stream()
+                                    .map(nereidsTarget -> 
nereidsTarget.toOriginRuntimeFilterTarget(
+                                            node,
+                                            
exprIdToOlapScanNodeSlotRef.get(nereidsTarget.getExpr().getExprId())))
+                                    .forEach(origFilter::addTarget);
+                            
origFilter.setIsBroadcast(node.getDistributionMode() == 
DistributionMode.BROADCAST);
+                            origFilter.markFinalized();
+                            origFilter.assignToPlanNodes();
+                            origFilter.extractTargetsPosition();
+                            filter.setFinalized();
+                            return origFilter;
+                        }).collect(Collectors.toList()));
+            }
+        });
+    }
+
+    /**
+     * s
+     * @param scan s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilterTarget(PhysicalOlapScan scan, 
OlapScanNode node, PlanTranslatorContext ctx) {
+        scan.getOutput().stream()
+                .map(slot -> Pair.of(((SlotReference) slot), 
slotTransfer(slot.getExprId())))
+                .filter(pair -> filtersByExprId.containsKey(pair.second))
+                .peek(pair -> 
exprIdToOlapScanNodeSlotRef.put(pair.first.getExprId(),
+                        ctx.findSlotRef(pair.first.getExprId())))
+                .forEach(pair -> filtersByExprId.get(pair.second)
+                        .stream().peek(nereidsFilter -> 
filterTargetByExprId.computeIfAbsent(
+                                nereidsFilter.getTargetExpr().getExprId(),
+                                k -> new ArrayList<>()).add(
+                                        new 
RuntimeFilter.RuntimeFilterTarget(node, pair.first)))
+                        .filter(nereidsFilter -> 
!nereidsFilter.getTargetExpr().getExprId()
+                                .equals(pair.first.getExprId()))
+                        .forEach(nereidsFilter -> 
nereidsFilter.setTargetSlot(pair.first)));
+    }
+
+    public List<org.apache.doris.planner.RuntimeFilter> getRuntimeFilters() {
+        return origFilters;
+    }
+
+    /**
+     * s
+     * @return s
+     */

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }
+        join.getHashJoinConjuncts().forEach(expr -> {
+            ExprId exprId = ((SlotReference) expr.child(0)).getExprId();

Review Comment:
   we need do instanceof before type coercion



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java:
##########
@@ -47,6 +51,8 @@ public class PlanTranslatorContext {
 
     private final DescriptorTable descTable = new DescriptorTable();
 
+    private RuntimeFilterGenerator runtimeFilterGenerator = null;

Review Comment:
   remove `= null`, set it to final, and init it in constructor with no arg



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostprocessors.java:
##########
@@ -45,6 +45,8 @@ public PhysicalPlan process(PhysicalPlan physicalPlan) {
 
     public List<PlanPostprocessor> getProcessors() {
         // add processor if we need
-        return ImmutableList.of();
+        return ImmutableList.of(
+                cascadesContext.createRuntimeFilterGenerator()
+        );

Review Comment:
   ```suggestion
           return 
ImmutableList.of(cascadesContext.createRuntimeFilterGenerator());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java:
##########
@@ -145,6 +148,15 @@ public boolean subqueryIsAnalyzed(SubqueryExpr 
subqueryExpr) {
         return subqueryExprIsAnalyzed.get(subqueryExpr);
     }
 
+    public RuntimeFilterGenerator getRuntimeGenerator() {
+        return runtimeFilterGenerator;
+    }
+
+    public RuntimeFilterGenerator createRuntimeFilterGenerator() {
+        this.runtimeFilterGenerator = new 
RuntimeFilterGenerator(this.getConnectContext().getSessionVariable());

Review Comment:
   why not create it in constructor



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {

Review Comment:
   add a summary comment to explain how it works and what situation could be 
handled



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }
+        join.getHashJoinConjuncts().forEach(expr -> {
+            ExprId exprId = ((SlotReference) expr.child(0)).getExprId();
+            TupleId tid = 
ctx.findSlotRef(exprId).getDesc().getParent().getId();
+            if (filterTargetByExprId.containsKey(exprId) && 
filtersByExprId.containsKey(exprId)) {
+                List<RuntimeFilter.RuntimeFilterTarget> targets = 
filterTargetByExprId.get(exprId);
+                origFilters.addAll(filtersByExprId.get(exprId).stream()
+                        .filter(RuntimeFilter::isUninitialized)
+                        .map(filter -> {
+                            SlotRef src = 
ctx.findSlotRef(filter.getSrcExpr().getExprId());
+                            SlotRef target = 
exprIdToOlapScanNodeSlotRef.get(filter.getTargetExpr().getExprId());
+                            org.apache.doris.planner.RuntimeFilter origFilter
+                                    = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                                    filter.getId(), node, src, 
filter.getExprOrder(), target,
+                                    ImmutableMap.of(tid, 
ImmutableList.of(target.getSlotId())),
+                                    filter.getType(), limits
+                            );
+                            targets.stream()
+                                    .map(nereidsTarget -> 
nereidsTarget.toOriginRuntimeFilterTarget(
+                                            node,
+                                            
exprIdToOlapScanNodeSlotRef.get(nereidsTarget.getExpr().getExprId())))
+                                    .forEach(origFilter::addTarget);
+                            
origFilter.setIsBroadcast(node.getDistributionMode() == 
DistributionMode.BROADCAST);
+                            origFilter.markFinalized();
+                            origFilter.assignToPlanNodes();
+                            origFilter.extractTargetsPosition();
+                            filter.setFinalized();
+                            return origFilter;
+                        }).collect(Collectors.toList()));
+            }
+        });
+    }
+
+    /**
+     * s
+     * @param scan s
+     * @param node s
+     * @param ctx s
+     */

Review Comment:
   write some meaning words here



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+/**
+ * runtime filter
+ */
+public class RuntimeFilter {
+
+    private final SlotReference srcSlot;
+
+    private SlotReference targetSlot;
+
+    private final RuntimeFilterId id;
+
+    private final TRuntimeFilterType type;
+
+    private final int exprOrder;
+
+    private boolean finalized = false;
+
+    private RuntimeFilter(RuntimeFilterId id, SlotReference src, SlotReference 
target, TRuntimeFilterType type,
+            int exprOrder) {
+        this.id = id;
+        this.srcSlot = src;
+        this.targetSlot = target;
+        this.type = type;
+        this.exprOrder = exprOrder;
+    }
+
+    /**
+     * s
+     *
+     * @param conjunction s
+     * @param type s
+     * @param exprOrder s
+     * @param node s
+     * @return s
+     */

Review Comment:
   write some meaning comment



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java:
##########
@@ -42,7 +42,7 @@
 
     public PhysicalHashJoin(JoinType joinType, List<Expression> 
hashJoinConjuncts,
             Optional<Expression> condition, LogicalProperties 
logicalProperties,
-            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE rightChild) {
+                            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE 
rightChild) {

Review Comment:
   do not change this line



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java:
##########
@@ -88,7 +88,7 @@ public Plan withGroupExpression(Optional<GroupExpression> 
groupExpression) {
 
     @Override
     public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new PhysicalHashJoin<>(joinType, hashJoinConjuncts, 
otherJoinCondition,
-                Optional.empty(), logicalProperties.get(), left(), right());
+        return new PhysicalHashJoin<>(joinType, hashJoinConjuncts, 
otherJoinCondition, Optional.empty(),

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }
+        join.getHashJoinConjuncts().forEach(expr -> {
+            ExprId exprId = ((SlotReference) expr.child(0)).getExprId();
+            TupleId tid = 
ctx.findSlotRef(exprId).getDesc().getParent().getId();
+            if (filterTargetByExprId.containsKey(exprId) && 
filtersByExprId.containsKey(exprId)) {
+                List<RuntimeFilter.RuntimeFilterTarget> targets = 
filterTargetByExprId.get(exprId);
+                origFilters.addAll(filtersByExprId.get(exprId).stream()
+                        .filter(RuntimeFilter::isUninitialized)
+                        .map(filter -> {
+                            SlotRef src = 
ctx.findSlotRef(filter.getSrcExpr().getExprId());
+                            SlotRef target = 
exprIdToOlapScanNodeSlotRef.get(filter.getTargetExpr().getExprId());
+                            org.apache.doris.planner.RuntimeFilter origFilter
+                                    = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                                    filter.getId(), node, src, 
filter.getExprOrder(), target,
+                                    ImmutableMap.of(tid, 
ImmutableList.of(target.getSlotId())),
+                                    filter.getType(), limits
+                            );
+                            targets.stream()
+                                    .map(nereidsTarget -> 
nereidsTarget.toOriginRuntimeFilterTarget(
+                                            node,
+                                            
exprIdToOlapScanNodeSlotRef.get(nereidsTarget.getExpr().getExprId())))
+                                    .forEach(origFilter::addTarget);
+                            
origFilter.setIsBroadcast(node.getDistributionMode() == 
DistributionMode.BROADCAST);
+                            origFilter.markFinalized();
+                            origFilter.assignToPlanNodes();
+                            origFilter.extractTargetsPosition();
+                            filter.setFinalized();
+                            return origFilter;
+                        }).collect(Collectors.toList()));
+            }
+        });
+    }
+
+    /**
+     * s
+     * @param scan s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilterTarget(PhysicalOlapScan scan, 
OlapScanNode node, PlanTranslatorContext ctx) {
+        scan.getOutput().stream()
+                .map(slot -> Pair.of(((SlotReference) slot), 
slotTransfer(slot.getExprId())))
+                .filter(pair -> filtersByExprId.containsKey(pair.second))
+                .peek(pair -> 
exprIdToOlapScanNodeSlotRef.put(pair.first.getExprId(),
+                        ctx.findSlotRef(pair.first.getExprId())))
+                .forEach(pair -> filtersByExprId.get(pair.second)
+                        .stream().peek(nereidsFilter -> 
filterTargetByExprId.computeIfAbsent(
+                                nereidsFilter.getTargetExpr().getExprId(),
+                                k -> new ArrayList<>()).add(
+                                        new 
RuntimeFilter.RuntimeFilterTarget(node, pair.first)))
+                        .filter(nereidsFilter -> 
!nereidsFilter.getTargetExpr().getExprId()
+                                .equals(pair.first.getExprId()))
+                        .forEach(nereidsFilter -> 
nereidsFilter.setTargetSlot(pair.first)));
+    }
+
+    public List<org.apache.doris.planner.RuntimeFilter> getRuntimeFilters() {
+        return origFilters;
+    }
+
+    /**
+     * s
+     * @return s
+     */
+    @VisibleForTesting
+    public List<RuntimeFilter> getNereridsRuntimeFilter() {

Review Comment:
   ```suggestion
       public List<RuntimeFilter> getNereidsRuntimeFilter() {
   ```



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java:
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.postprocess;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
+import org.apache.doris.nereids.datasets.ssb.SSBUtils;
+import org.apache.doris.nereids.glue.translator.PhysicalPlanTranslator;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpressionUtil;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.PlanFragment;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+public class RuntimeFilterTest extends SSBTestBase {
+
+    @Override
+    public void runBeforeEach() throws Exception {
+        NamedExpressionUtil.clear();

Review Comment:
   move it to `AnalyzeCheckTestBase#runBeforeEach`



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }
+        join.getHashJoinConjuncts().forEach(expr -> {
+            ExprId exprId = ((SlotReference) expr.child(0)).getExprId();
+            TupleId tid = 
ctx.findSlotRef(exprId).getDesc().getParent().getId();
+            if (filterTargetByExprId.containsKey(exprId) && 
filtersByExprId.containsKey(exprId)) {
+                List<RuntimeFilter.RuntimeFilterTarget> targets = 
filterTargetByExprId.get(exprId);
+                origFilters.addAll(filtersByExprId.get(exprId).stream()
+                        .filter(RuntimeFilter::isUninitialized)
+                        .map(filter -> {
+                            SlotRef src = 
ctx.findSlotRef(filter.getSrcExpr().getExprId());
+                            SlotRef target = 
exprIdToOlapScanNodeSlotRef.get(filter.getTargetExpr().getExprId());
+                            org.apache.doris.planner.RuntimeFilter origFilter
+                                    = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                                    filter.getId(), node, src, 
filter.getExprOrder(), target,
+                                    ImmutableMap.of(tid, 
ImmutableList.of(target.getSlotId())),
+                                    filter.getType(), limits
+                            );
+                            targets.stream()
+                                    .map(nereidsTarget -> 
nereidsTarget.toOriginRuntimeFilterTarget(
+                                            node,
+                                            
exprIdToOlapScanNodeSlotRef.get(nereidsTarget.getExpr().getExprId())))
+                                    .forEach(origFilter::addTarget);
+                            
origFilter.setIsBroadcast(node.getDistributionMode() == 
DistributionMode.BROADCAST);
+                            origFilter.markFinalized();
+                            origFilter.assignToPlanNodes();
+                            origFilter.extractTargetsPosition();
+                            filter.setFinalized();
+                            return origFilter;
+                        }).collect(Collectors.toList()));
+            }
+        });
+    }
+
+    /**
+     * s
+     * @param scan s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilterTarget(PhysicalOlapScan scan, 
OlapScanNode node, PlanTranslatorContext ctx) {
+        scan.getOutput().stream()
+                .map(slot -> Pair.of(((SlotReference) slot), 
slotTransfer(slot.getExprId())))
+                .filter(pair -> filtersByExprId.containsKey(pair.second))
+                .peek(pair -> 
exprIdToOlapScanNodeSlotRef.put(pair.first.getExprId(),
+                        ctx.findSlotRef(pair.first.getExprId())))
+                .forEach(pair -> filtersByExprId.get(pair.second)
+                        .stream().peek(nereidsFilter -> 
filterTargetByExprId.computeIfAbsent(
+                                nereidsFilter.getTargetExpr().getExprId(),
+                                k -> new ArrayList<>()).add(
+                                        new 
RuntimeFilter.RuntimeFilterTarget(node, pair.first)))
+                        .filter(nereidsFilter -> 
!nereidsFilter.getTargetExpr().getExprId()
+                                .equals(pair.first.getExprId()))
+                        .forEach(nereidsFilter -> 
nereidsFilter.setTargetSlot(pair.first)));
+    }
+
+    public List<org.apache.doris.planner.RuntimeFilter> getRuntimeFilters() {
+        return origFilters;
+    }
+
+    /**
+     * s
+     * @return s
+     */
+    @VisibleForTesting
+    public List<RuntimeFilter> getNereridsRuntimeFilter() {
+        List<RuntimeFilter> filters = filtersByExprId.values().stream()
+                .reduce(Lists.newArrayList(), (l, r) -> {
+                    l.addAll(r);
+                    return l;
+                });
+        filters.sort((a, b) -> a.getId().compareTo(b.getId()));

Review Comment:
   i think the lambda expression could simply to RuntimeFilter::getId



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+/**
+ * runtime filter
+ */
+public class RuntimeFilter {
+
+    private final SlotReference srcSlot;
+
+    private SlotReference targetSlot;
+
+    private final RuntimeFilterId id;
+
+    private final TRuntimeFilterType type;
+
+    private final int exprOrder;
+
+    private boolean finalized = false;
+
+    private RuntimeFilter(RuntimeFilterId id, SlotReference src, SlotReference 
target, TRuntimeFilterType type,
+            int exprOrder) {
+        this.id = id;
+        this.srcSlot = src;
+        this.targetSlot = target;
+        this.type = type;
+        this.exprOrder = exprOrder;
+    }
+
+    /**
+     * s
+     *
+     * @param conjunction s
+     * @param type s
+     * @param exprOrder s
+     * @param node s
+     * @return s
+     */
+    public static RuntimeFilter createRuntimeFilter(RuntimeFilterId id, 
EqualTo conjunction,
+            TRuntimeFilterType type, int exprOrder, PhysicalHashJoin<Plan, 
Plan> node) {
+        Pair<Expression, Expression> srcs = 
checkAndMaybeSwapChild(conjunction, node);
+        if (srcs == null) {
+            return null;
+        }
+        return new RuntimeFilter(id, ((SlotReference) srcs.second), 
((SlotReference) srcs.first), type, exprOrder);
+    }
+
+    private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo 
expr,
+            PhysicalHashJoin<Plan, Plan> join) {
+        if (expr.children().stream().anyMatch(Literal.class::isInstance)) {
+            return null;
+        }
+        if (expr.child(0).equals(expr.child(1))) {
+            return null;
+        }
+        if 
(!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
+            return null;
+        }
+        //current we assume that there are certainly different slot reference 
in equal to.
+        //they are not from the same relation.
+        int exchangeTag = join.child(0).getOutput().stream().anyMatch(slot -> 
slot.getExprId().equals(
+                ((SlotReference) expr.child(1)).getExprId())) ? 1 : 0;
+        return Pair.of(expr.child(exchangeTag), expr.child(1 ^ exchangeTag));
+    }
+
+    public SlotReference getSrcExpr() {
+        return srcSlot;
+    }
+
+    public SlotReference getTargetExpr() {
+        return targetSlot;
+    }
+
+    public RuntimeFilterId getId() {
+        return id;
+    }
+
+    public TRuntimeFilterType getType() {
+        return type;
+    }
+
+    public int getExprOrder() {
+        return exprOrder;
+    }
+
+    public void setTargetSlot(SlotReference targetSlot) {
+        this.targetSlot = targetSlot;
+    }
+
+    public boolean isUninitialized() {
+        return !finalized;
+    }
+
+    public void setFinalized() {
+        this.finalized = true;
+    }
+
+    /**
+     * runtime filter target
+     */
+    public static class RuntimeFilterTarget {
+        OlapScanNode node;

Review Comment:
   why we must has Legacy planner's data structure in Nereids?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {

Review Comment:
   i think reverse if branch is better, i.e.
   ```
   if (deniedJoinType.contains(join.getJoinType())) {
       ...
   } else {
       ...
   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));

Review Comment:
   i think return original join is ok



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));

Review Comment:
   not all `Alias`' child is `SlotReference`



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))

Review Comment:
   why not just use `join`



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()

Review Comment:
   i think return original `project` is ok



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()

Review Comment:
   we need to use accurate variable names.
   `hashConjuncts` is better than `eqPreds`



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();

Review Comment:
   ```suggestion
       private Map<ExprId, List<RuntimeFilter>> targetExprIdToFilter = 
Maps.newHashMap();
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }

Review Comment:
   we need to add this to each visitXxx Functions



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+/**
+ * runtime filter
+ */
+public class RuntimeFilter {
+
+    private final SlotReference srcSlot;
+
+    private SlotReference targetSlot;
+
+    private final RuntimeFilterId id;
+
+    private final TRuntimeFilterType type;
+
+    private final int exprOrder;
+
+    private boolean finalized = false;
+
+    private RuntimeFilter(RuntimeFilterId id, SlotReference src, SlotReference 
target, TRuntimeFilterType type,
+            int exprOrder) {
+        this.id = id;
+        this.srcSlot = src;
+        this.targetSlot = target;
+        this.type = type;
+        this.exprOrder = exprOrder;
+    }
+
+    /**
+     * s
+     *
+     * @param conjunction s
+     * @param type s
+     * @param exprOrder s
+     * @param node s
+     * @return s
+     */
+    public static RuntimeFilter createRuntimeFilter(RuntimeFilterId id, 
EqualTo conjunction,
+            TRuntimeFilterType type, int exprOrder, PhysicalHashJoin<Plan, 
Plan> node) {
+        Pair<Expression, Expression> srcs = 
checkAndMaybeSwapChild(conjunction, node);
+        if (srcs == null) {
+            return null;
+        }
+        return new RuntimeFilter(id, ((SlotReference) srcs.second), 
((SlotReference) srcs.first), type, exprOrder);
+    }
+
+    private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo 
expr,
+            PhysicalHashJoin<Plan, Plan> join) {
+        if (expr.children().stream().anyMatch(Literal.class::isInstance)) {
+            return null;
+        }
+        if (expr.child(0).equals(expr.child(1))) {
+            return null;
+        }
+        if 
(!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
+            return null;
+        }
+        //current we assume that there are certainly different slot reference 
in equal to.
+        //they are not from the same relation.
+        int exchangeTag = join.child(0).getOutput().stream().anyMatch(slot -> 
slot.getExprId().equals(
+                ((SlotReference) expr.child(1)).getExprId())) ? 1 : 0;
+        return Pair.of(expr.child(exchangeTag), expr.child(1 ^ exchangeTag));
+    }
+
+    public SlotReference getSrcExpr() {
+        return srcSlot;
+    }
+
+    public SlotReference getTargetExpr() {
+        return targetSlot;
+    }
+
+    public RuntimeFilterId getId() {
+        return id;
+    }
+
+    public TRuntimeFilterType getType() {
+        return type;
+    }
+
+    public int getExprOrder() {
+        return exprOrder;
+    }
+
+    public void setTargetSlot(SlotReference targetSlot) {
+        this.targetSlot = targetSlot;
+    }
+
+    public boolean isUninitialized() {
+        return !finalized;
+    }
+
+    public void setFinalized() {
+        this.finalized = true;
+    }
+
+    /**
+     * runtime filter target
+     */
+    public static class RuntimeFilterTarget {
+        OlapScanNode node;
+        SlotReference expr;
+
+        public RuntimeFilterTarget(OlapScanNode node, SlotReference expr) {
+            this.node = node;
+            this.expr = expr;
+        }
+
+        /**
+         * s
+         * @param node s
+         * @param targetSlotRef s
+         * @return s
+         */
+        public org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget 
toOriginRuntimeFilterTarget(

Review Comment:
   ```suggestion
           public org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget 
toLegacy RuntimeFilterTarget(
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    private Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByExprId = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, ExprId> aliasToChild = Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (!deniedJoinType.contains(join.getJoinType())) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;
+            eqPreds.forEach(expr -> {
+                runtimeFilters.addAll(legalTypes.stream()
+                        .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                type, cnt.get(), joinReplica))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList()));
+                cnt.getAndIncrement();
+            });
+            runtimeFilters.forEach(filter -> {
+                
filtersByExprId.computeIfAbsent(filter.getTargetExpr().getExprId(), k -> new 
ArrayList<>()).add(filter);
+            });
+            left = left.accept(this, ctx);
+            right = right.accept(this, ctx);
+        } else {
+            join.getOutput().forEach(slot -> 
filtersByExprId.remove(slot.getExprId()));
+            left = join.left().accept(this, ctx);
+            right = join.right().accept(this, ctx);
+        }
+        return join.withChildren(ImmutableList.of(left, right));
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .forEach(expr -> aliasToChild.put(((SlotReference) 
expr.child()).getExprId(), expr.getExprId()));
+        return project.withChildren(project.children().stream()
+                .map(plan -> plan.accept(this, 
ctx)).collect(Collectors.toList()));
+    }
+
+    /**
+     * s
+     * @param join s
+     * @param node s
+     * @param ctx s
+     */
+    public void translateRuntimeFilter(PhysicalHashJoin<Plan, Plan> join,
+            HashJoinNode node, PlanTranslatorContext ctx) {
+        if (!sessionVariable.isEnableNereidsRuntimeFilter()) {
+            return;
+        }
+        join.getHashJoinConjuncts().forEach(expr -> {
+            ExprId exprId = ((SlotReference) expr.child(0)).getExprId();
+            TupleId tid = 
ctx.findSlotRef(exprId).getDesc().getParent().getId();
+            if (filterTargetByExprId.containsKey(exprId) && 
filtersByExprId.containsKey(exprId)) {
+                List<RuntimeFilter.RuntimeFilterTarget> targets = 
filterTargetByExprId.get(exprId);
+                origFilters.addAll(filtersByExprId.get(exprId).stream()
+                        .filter(RuntimeFilter::isUninitialized)
+                        .map(filter -> {
+                            SlotRef src = 
ctx.findSlotRef(filter.getSrcExpr().getExprId());
+                            SlotRef target = 
exprIdToOlapScanNodeSlotRef.get(filter.getTargetExpr().getExprId());
+                            org.apache.doris.planner.RuntimeFilter origFilter
+                                    = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                                    filter.getId(), node, src, 
filter.getExprOrder(), target,
+                                    ImmutableMap.of(tid, 
ImmutableList.of(target.getSlotId())),
+                                    filter.getType(), limits
+                            );
+                            targets.stream()
+                                    .map(nereidsTarget -> 
nereidsTarget.toOriginRuntimeFilterTarget(
+                                            node,
+                                            
exprIdToOlapScanNodeSlotRef.get(nereidsTarget.getExpr().getExprId())))
+                                    .forEach(origFilter::addTarget);
+                            
origFilter.setIsBroadcast(node.getDistributionMode() == 
DistributionMode.BROADCAST);
+                            origFilter.markFinalized();
+                            origFilter.assignToPlanNodes();
+                            origFilter.extractTargetsPosition();
+                            filter.setFinalized();
+                            return origFilter;

Review Comment:
   wrap it with another function for easy reading



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to