morrySnow commented on code in PR #12182:
URL: https://github.com/apache/doris/pull/12182#discussion_r958256630


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:
##########
@@ -205,4 +207,9 @@ public DescriptorTable getDescTable() {
     public void appendTupleInfo(StringBuilder str) {
         str.append(descTable.getExplainString());
     }
+
+    @Override
+    public List<RuntimeFilter> getAssignedRuntimeFilter() {

Review Comment:
   ```suggestion
       public List<RuntimeFilter> getRuntimeFilters() {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    /**
+     * s
+     */
+    public static RuntimeFilterGenerator INSTANCE = new 
RuntimeFilterGenerator();
+
+    private static final IdGenerator<RuntimeFilterId> GENERATOR = 
RuntimeFilterId.createGenerator();
+
+    private final Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByTid = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private SessionVariable sessionVariable;
+
+    private FilterSizeLimits limits;
+
+    /**
+     * clear runtime filter and return the INSTANCE
+     * @param ctx connect context
+     * @return the INSTANCE
+     */
+    public static RuntimeFilterGenerator createInstance(ConnectContext ctx) {
+        INSTANCE.filterTargetByTid.clear();
+        INSTANCE.filtersByExprId.clear();
+        INSTANCE.sessionVariable = ctx.getSessionVariable();
+        INSTANCE.limits = new FilterSizeLimits(INSTANCE.sessionVariable);
+        INSTANCE.origFilters.clear();
+        return INSTANCE;
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {

Review Comment:
   Add a TODO explaining which situations are already supported and which still need to be supported.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java:
##########
@@ -40,10 +41,12 @@
         RIGHT_CHILD_TYPE extends Plan>
         extends AbstractPhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
 
+    private final RuntimeFilterGenerator runtimeFilter;
+
     public PhysicalHashJoin(JoinType joinType, List<Expression> 
hashJoinConjuncts,
             Optional<Expression> condition, LogicalProperties 
logicalProperties,
-            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE rightChild) {
-        this(joinType, hashJoinConjuncts, condition, Optional.empty(), 
logicalProperties, leftChild, rightChild);
+                            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE 
rightChild) {
+        this(joinType, hashJoinConjuncts, condition, Optional.empty(), 
logicalProperties, leftChild, rightChild, null);

Review Comment:
   do not use null, use Optional.empty() instead



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:
##########
@@ -205,4 +207,9 @@ public DescriptorTable getDescTable() {
     public void appendTupleInfo(StringBuilder str) {
         str.append(descTable.getExplainString());
     }
+
+    @Override
+    public List<RuntimeFilter> getAssignedRuntimeFilter() {
+        return RuntimeFilterGenerator.INSTANCE.getRuntimeFilters();

Review Comment:
   We cannot use a singleton here, since the FE can plan SQL statements concurrently.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java:
##########
@@ -40,6 +40,7 @@ public class PhysicalOlapScan extends PhysicalRelation {
     private final List<Long> selectedTabletId;
     private final List<Long> selectedPartitionId;
 
+

Review Comment:
   remove useless blank line



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRulesJobRewriter.java:
##########
@@ -35,14 +35,14 @@
 /**
  * Apply rules to normalize expressions.
  */
-public class RewriteJob extends BatchRulesJob {
+public class NereidsRulesJobRewriter extends BatchRulesJob {
 
     /**
      * Constructor.
      *
      * @param cascadesContext context for applying rules.
      */
-    public RewriteJob(CascadesContext cascadesContext) {
+    public NereidsRulesJobRewriter(CascadesContext cascadesContext) {

Review Comment:
   maybe xxxJobExecutor is better



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    /**
+     * s
+     */
+    public static RuntimeFilterGenerator INSTANCE = new 
RuntimeFilterGenerator();
+
+    private static final IdGenerator<RuntimeFilterId> GENERATOR = 
RuntimeFilterId.createGenerator();
+
+    private final Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByTid = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();

Review Comment:
   ```suggestion
       private final List<org.apache.doris.planner.RuntimeFilter> legacyFilters 
= Lists.newArrayList();
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import java.util.List;
+
+/**
+ * runtime filter
+ */
+public class RuntimeFilter {
+    private final EqualTo expr;
+
+    private final RuntimeFilterId id;
+
+    private final TRuntimeFilterType type;
+
+    private boolean finalized = false;
+
+    private RuntimeFilter(RuntimeFilterId id, EqualTo expr, TRuntimeFilterType 
type) {
+        this.id = id;
+        this.expr = expr;
+        this.type = type;
+    }
+
+    public void setFinalized() {
+        this.finalized = true;

Review Comment:
   Why is this needed?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import java.util.List;
+
+/**
+ * runtime filter
+ */
+public class RuntimeFilter {
+    private final EqualTo expr;
+
+    private final RuntimeFilterId id;
+
+    private final TRuntimeFilterType type;
+
+    private boolean finalized = false;
+
+    private RuntimeFilter(RuntimeFilterId id, EqualTo expr, TRuntimeFilterType 
type) {
+        this.id = id;
+        this.expr = expr;
+        this.type = type;
+    }
+
+    public void setFinalized() {
+        this.finalized = true;
+    }
+
+    public boolean getFinalized() {
+        return finalized;
+    }
+
+    /**
+     * s
+     *
+     * @param conjunction s
+     * @param type s
+     * @param exprOrder s
+     * @param node s
+     * @return s
+     */
+    public static RuntimeFilter createRuntimeFilter(RuntimeFilterId id, 
EqualTo conjunction,
+            TRuntimeFilterType type, int exprOrder, PhysicalHashJoin<Plan, 
Plan> node,
+            List<Expression> newHashConjuncts) {
+        EqualTo expr = checkAndMaybeSwapChild(conjunction, node);

Review Comment:
   why swap here?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    /**
+     * s
+     */
+    public static RuntimeFilterGenerator INSTANCE = new 
RuntimeFilterGenerator();
+
+    private static final IdGenerator<RuntimeFilterId> GENERATOR = 
RuntimeFilterId.createGenerator();
+
+    private final Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByTid = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private SessionVariable sessionVariable;
+
+    private FilterSizeLimits limits;
+
+    /**
+     * clear runtime filter and return the INSTANCE
+     * @param ctx connect context
+     * @return the INSTANCE
+     */
+    public static RuntimeFilterGenerator createInstance(ConnectContext ctx) {
+        INSTANCE.filterTargetByTid.clear();
+        INSTANCE.filtersByExprId.clear();
+        INSTANCE.sessionVariable = ctx.getSessionVariable();
+        INSTANCE.limits = new FilterSizeLimits(INSTANCE.sessionVariable);
+        INSTANCE.origFilters.clear();
+        return INSTANCE;
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (join.getJoinType() == JoinType.INNER_JOIN) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();

Review Comment:
   This variable needs a meaningful name.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private static final IdGenerator<RuntimeFilterId> GENERATOR = 
RuntimeFilterId.createGenerator();
+
+    private final Map<ExprId, List<RuntimeFilter>> filtersByExprId = 
Maps.newHashMap();
+
+    private final Map<ExprId, List<RuntimeFilter.RuntimeFilterTarget>> 
filterTargetByTid = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> origFilters = 
Lists.newArrayList();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> 
join, CascadesContext ctx) {
+        Plan left = join.left();
+        Plan right = join.right();
+        if (join.getJoinType() == JoinType.INNER_JOIN) {
+            List<EqualTo> eqPreds = join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast).collect(Collectors.toList());
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
+            AtomicInteger cnt = new AtomicInteger();
+            final PhysicalHashJoin<Plan, Plan> joinReplica = join;

Review Comment:
   Why create this variable?



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java:
##########
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.postprocess;
+
+import org.apache.doris.analysis.ExplainOptions;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
+import org.apache.doris.nereids.datasets.ssb.SSBUtils;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpressionUtil;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.nereids.util.PatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.thrift.TQueryOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class RuntimeFilterTest extends SSBTestBase implements 
PatternMatchSupported {

Review Comment:
   Add some UTs to verify that runtime filters can be pushed through sort,
aggregate, project, filter, and join correctly.
   Also add some UTs to verify that runtime filters are not pushed through
aggregate functions, left outer joins, and other such cases.



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java:
##########
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.postprocess;
+
+import org.apache.doris.analysis.ExplainOptions;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
+import org.apache.doris.nereids.datasets.ssb.SSBUtils;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpressionUtil;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.nereids.util.PatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.thrift.TQueryOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class RuntimeFilterTest extends SSBTestBase implements 
PatternMatchSupported {
+
+    @Override
+    public void runBeforeEach() throws Exception {
+        NamedExpressionUtil.clear();
+    }
+
+    @Override
+    public void runBeforeAll() throws Exception {
+        super.runBeforeAll();
+        
connectContext.getSessionVariable().setEnableNereidsRuntimeFilter(true);
+    }
+
+    @Test
+    public void testGenerateRuntimeFilter() throws AnalysisException {
+        String sql = "SELECT * FROM lineorder JOIN customer on c_custkey = 
lo_custkey";
+        PlanChecker.from(connectContext)
+                .analyze(sql)
+                .implement()
+                .postProcess()
+                .matchesPhysicalPlan(
+                        physicalProject(
+                                physicalHashJoin(
+                                        physicalOlapScan(),
+                                        physicalOlapScan()
+                                ).when(join -> {
+                                    Expression expr = 
join.getHashJoinConjuncts().get(0);
+                                    Assertions.assertTrue(expr instanceof 
EqualTo);
+                                    List<RuntimeFilter> filters = 
join.getRuntimeFilters().getNereridsRuntimeFilter();
+                                    return filters.size() == 1
+                                            && 
checkRuntimeFilterExpr(filters.get(0), "c_custkey", "lo_custkey");
+                                })
+                        )
+                );
+    }
+
+    @Test
+    public void testGenerateRuntimeFilterByIllegalSrcExpr() throws 
AnalysisException {
+        String sql = "SELECT * FROM lineorder JOIN customer on c_custkey = 
c_custkey";
+        PlanChecker.from(connectContext)
+                .analyze(sql)
+                .implement()
+                .postProcess()
+                .matchesPhysicalPlan(
+                        physicalProject(
+                                physicalNestedLoopJoin(
+                                        physicalOlapScan(),
+                                        physicalOlapScan()
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testComplexExpressionToRuntimeFilter() throws 
AnalysisException {
+        String sql
+                = "SELECT * FROM supplier JOIN customer on c_name = s_name and 
s_city = c_city and s_nation = c_nation";
+        PlanChecker.from(connectContext)
+                .analyze(sql)
+                .implement()
+                .postProcess()
+                .matchesPhysicalPlan(
+                        physicalProject(
+                                physicalHashJoin(
+                                        physicalOlapScan(),
+                                        physicalOlapScan()
+                                ).when(join -> {
+                                    List<RuntimeFilter> filters = 
join.getRuntimeFilters().getNereridsRuntimeFilter();
+                                    return filters.size() == 3
+                                            && 
checkRuntimeFilterExpr(filters.get(0), "c_name", "s_name")
+                                            && 
checkRuntimeFilterExpr(filters.get(1), "c_city", "s_city")
+                                            && 
checkRuntimeFilterExpr(filters.get(2), "c_nation", "s_nation");
+                                })
+                        )
+                );
+    }
+
+    @Test
+    public void testNestedJoinGenerateRuntimeFilter() throws AnalysisException 
{
+        String sql = SSBUtils.Q4_1;
+        PlanChecker.from(connectContext)
+                .analyze(sql)
+                .implement()
+                .postProcess()
+                .matchesPhysicalPlan(
+                        physicalQuickSort(
+                                physicalAggregate(
+                                        physicalAggregate(
+                                                physicalHashJoin(
+                                                        physicalHashJoin(
+                                                                
physicalHashJoin(
+                                                                        
physicalHashJoin(
+                                                                               
 physicalOlapScan(),
+                                                                               
 physicalOlapScan()
+                                                                        ),
+                                                                        
physicalFilter(
+                                                                               
 physicalOlapScan()
+                                                                        )
+                                                                ),
+                                                                physicalFilter(
+                                                                        
physicalOlapScan()
+                                                                )
+                                                        ),
+                                                        physicalFilter(
+                                                                
physicalOlapScan()
+                                                        )
+                                                ).when(join -> {
+                                                    List<RuntimeFilter> 
filters = join.getRuntimeFilters().getNereridsRuntimeFilter();
+                                                    return filters.size() == 4
+                                                            && 
checkRuntimeFilterExpr(filters.get(0), "p_partkey", "lo_partkey")
+                                                            && 
checkRuntimeFilterExpr(filters.get(1), "s_suppkey", "lo_suppkey")
+                                                            && 
checkRuntimeFilterExpr(filters.get(2), "c_custkey", "lo_custkey")
+                                                            && 
checkRuntimeFilterExpr(filters.get(3), "lo_orderdate", "d_datekey");
+                                                })
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testTranslateSSB() throws Exception {

Review Comment:
   This UT is not good, since it cannot ensure that the runtime filter works
correctly.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    /**
+     * s
+     */

Review Comment:
   Remove it or write some meaningful comments.



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/util/PhysicalPlanMatchingUtils.java:
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.util;
+
+import org.apache.doris.nereids.pattern.Pattern;
+import org.apache.doris.nereids.trees.plans.Plan;
+
+import java.util.stream.IntStream;
+
+public class PhysicalPlanMatchingUtils {

Review Comment:
   Why must it match Physical?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to