This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3117ac9289 [enhancement](Nereids) use post-order to generate runtime 
filter in RuntimeFilterGenerator (#13949)
3117ac9289 is described below

commit 3117ac9289fbfc37728f9f022289aa19b77a531d
Author: mch_ucchi <41606806+sohardforan...@users.noreply.github.com>
AuthorDate: Wed Nov 9 14:28:49 2022 +0800

    [enhancement](Nereids) use post-order to generate runtime filter in 
RuntimeFilterGenerator (#13949)
    
    Change the runtime filter generator from pre-order to post-order traversal. This may 
change the number of generated runtime filters,
    so the unit tests are updated accordingly.
---
 .../glue/translator/RuntimeFilterTranslator.java   |   5 +-
 .../processor/post/RuntimeFilterContext.java       |  62 ++--------
 .../processor/post/RuntimeFilterGenerator.java     | 130 ++++++++++-----------
 .../trees/plans/physical/RuntimeFilter.java        |  44 -------
 .../nereids/postprocess/RuntimeFilterTest.java     |   7 +-
 5 files changed, 78 insertions(+), 170 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 1dc7a0a2c1..09dbb4115b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -60,9 +60,8 @@ public class RuntimeFilterTranslator {
      * @param ctx plan translator context
      */
     public void translateRuntimeFilterTarget(Slot slot, OlapScanNode node, 
PlanTranslatorContext ctx) {
-        context.setKVInNormalMap(context.getExprIdToOlapScanNodeSlotRef(),
-                slot.getExprId(), ctx.findSlotRef(slot.getExprId()));
-        
context.setKVInNormalMap(context.getScanNodeOfLegacyRuntimeFilterTarget(), 
slot, node);
+        context.getExprIdToOlapScanNodeSlotRef().put(slot.getExprId(), 
ctx.findSlotRef(slot.getExprId()));
+        context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 5565dcf928..03c7be5f17 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.processor.post;
 
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
@@ -32,12 +33,9 @@ import org.apache.doris.qe.SessionVariable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.jetbrains.annotations.NotNull;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -62,8 +60,10 @@ public class RuntimeFilterContext {
 
     private final Map<PhysicalHashJoin, List<RuntimeFilter>> 
runtimeFilterOnHashJoinNode = Maps.newHashMap();
 
-    // Alias's child to itself.
-    private final Map<Slot, NamedExpression> aliasChildToSelf = 
Maps.newHashMap();
+    // alias -> alias's child. If an existing key is the child of a newly encountered alias, the 
key of that entry is replaced by the alias.
+    // E.g. with Alias(A) = B the map holds B -> A; on encountering Alias(B) = C the entry 
becomes C -> A.
+    // See the disjoint-set data structure for the details of this 
process.
+    private final Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = Maps.newHashMap();
 
     private final Map<Slot, OlapScanNode> scanNodeOfLegacyRuntimeFilterTarget 
= Maps.newHashMap();
 
@@ -86,36 +86,21 @@ public class RuntimeFilterContext {
         return limits;
     }
 
-    public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) {
-        Preconditions.checkArgument(Arrays.stream(filters)
-                .allMatch(filter -> filter.getTargetExpr().getExprId() == id));
-        this.targetExprIdToFilter.computeIfAbsent(id, k -> 
Lists.newArrayList())
-                .addAll(Arrays.asList(filters));
+    public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
+        Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
+        this.targetExprIdToFilter.computeIfAbsent(id, k -> 
Lists.newArrayList()).add(filter);
     }
 
-    public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) {
-        return targetExprIdToFilter.get(id);
-    }
-
-    public void removeFilters(ExprId id) {
-        targetExprIdToFilter.remove(id);
-    }
-
-    public void setTargetsOnScanNode(RelationId id, Slot... slots) {
-        this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> 
Lists.newArrayList())
-                .addAll(Arrays.asList(slots));
-    }
-
-    public <K, V> void setKVInNormalMap(@NotNull Map<K, V> map, K key, V 
value) {
-        map.put(key, value);
+    public void setTargetsOnScanNode(RelationId id, Slot slot) {
+        this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> 
Lists.newArrayList()).add(slot);
     }
 
     public Map<ExprId, SlotRef> getExprIdToOlapScanNodeSlotRef() {
         return exprIdToOlapScanNodeSlotRef;
     }
 
-    public Map<Slot, NamedExpression> getAliasChildToSelf() {
-        return aliasChildToSelf;
+    public Map<NamedExpression, Pair<RelationId, NamedExpression>> 
getAliasTransferMap() {
+        return aliasTransferMap;
     }
 
     public Map<Slot, OlapScanNode> getScanNodeOfLegacyRuntimeFilterTarget() {
@@ -143,14 +128,6 @@ public class RuntimeFilterContext {
         return legacyFilters;
     }
 
-    public void setLegacyFilter(org.apache.doris.planner.RuntimeFilter filter) 
{
-        this.legacyFilters.add(filter);
-    }
-
-    public <K, V> boolean checkExistKey(@NotNull Map<K, V> map, K key) {
-        return map.containsKey(key);
-    }
-
     /**
      * get nereids runtime filters
      * @return nereids runtime filters
@@ -166,21 +143,6 @@ public class RuntimeFilterContext {
         return filters;
     }
 
-    /**
-     * get the slot list of the same olap scan node of the input slot.
-     * @param slot slot
-     * @return slot list
-     */
-    public List<NamedExpression> getSlotListOfTheSameSlotAtOlapScanNode(Slot 
slot) {
-        ImmutableList.Builder<NamedExpression> builder = 
ImmutableList.builder();
-        NamedExpression expr = slot;
-        do {
-            builder.add(expr);
-            expr = aliasChildToSelf.get(expr.toSlot());
-        } while (expr != null);
-        return builder.build();
-    }
-
     public void setTargetNullCount() {
         targetNullCount++;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index cc8531bafc..51c84f2dd6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -18,17 +18,23 @@
 package org.apache.doris.nereids.processor.post;
 
 import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.nereids.util.JoinUtils;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
@@ -36,7 +42,8 @@ import com.google.common.collect.ImmutableSet;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -44,15 +51,12 @@ import java.util.stream.Collectors;
  * generate runtime filter
  */
 public class RuntimeFilterGenerator extends PlanPostProcessor {
-
-    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
-
-    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+    private static final ImmutableSet<JoinType> deniedJoinType = 
ImmutableSet.of(
             JoinType.LEFT_ANTI_JOIN,
-            JoinType.RIGHT_ANTI_JOIN,
             JoinType.FULL_OUTER_JOIN,
             JoinType.LEFT_OUTER_JOIN
     );
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
 
     /**
      * the runtime filter generator run at the phase of post process and plan 
translation of nereids planner.
@@ -72,94 +76,80 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap = ctx.getAliasTransferMap();
+        join.right().accept(this, context);
+        join.left().accept(this, context);
         if (deniedJoinType.contains(join.getJoinType())) {
-            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
-             * if it has encountered inner join, like
-             *                       a=b
-             *                      /   \
-             *                     /     \
-             *                    /       \
-             *                   /         \
-             *      left join-->a=c         b
-             *                  / \
-             *                 /   \
-             *                /     \
-             *               /       \
-             *              a         c
-             * runtime filter whose src expr is b can take effect on c.
-             * but now checking the inner join is unsupported. we may support 
it at later version.
-             */
-            join.getOutput().forEach(slot -> 
ctx.removeFilters(slot.getExprId()));
+            // copy to avoid bug when next call of getOutputSet()
+            Set<Slot> slots = join.getOutputSet();
+            slots.forEach(aliasTransferMap::remove);
         } else {
-            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
-                    (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values())
+                    .filter(type -> (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                     .collect(Collectors.toList());
             AtomicInteger cnt = new AtomicInteger();
             join.getHashJoinConjuncts().stream()
                     .map(EqualTo.class::cast)
+                    // TODO: some complex situation cannot be handled now, see 
testPushDownThroughJoin.
                     // TODO: we will support it in later version.
-                    /*.peek(expr -> {
-                        // target is always the expr at the two side of equal 
of hash conjunctions.
-                        // TODO: some complex situation cannot be handled now, 
see testPushDownThroughJoin.
-                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
-                                
.map(SlotReference.class::cast).collect(Collectors.toList());
-                        if (slots.size() != 2
-                                || 
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
-                                || 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+                    .forEach(expr -> legalTypes.forEach(type -> {
+                        Pair<Expression, Expression> normalizedChildren = 
checkAndMaybeSwapChild(expr, join);
+                        // If aliasTransferMap doesn't contain the key, it means that 
the path from the olap scan to the join
+                        // contains a join with a denied join type, for example: a 
left join b on a.id = b.id
+                        if (normalizedChildren == null
+                                || !aliasTransferMap.containsKey((Slot) 
normalizedChildren.first)) {
                             return;
                         }
-                        int tag = 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0 
: 1;
-                        // generate runtime filter to associated expr. for 
example, a = b and a = c, RF b -> a can
-                        // generate RF b -> c
-                        List<RuntimeFilter> copiedRuntimeFilter = 
ctx.getFiltersByTargetExprId(slots.get(tag)
-                                        .getExprId()).stream()
-                                .map(filter -> new 
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
-                                        slots.get(tag ^ 1), filter.getType(), 
filter.getExprOrder(), join))
-                                .collect(Collectors.toList());
-                        ctx.setTargetExprIdToFilters(slots.get(tag ^ 
1).getExprId(),
-                                copiedRuntimeFilter.toArray(new 
RuntimeFilter[0]));
-                    })*/
-                    .forEach(expr -> legalTypes.stream()
-                            .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
-                                    type, cnt.getAndIncrement(), join))
-                            .filter(Objects::nonNull)
-                            .forEach(filter ->
-                                    
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+                        Pair<Slot, Slot> slots = Pair.of(
+                                aliasTransferMap.get((Slot) 
normalizedChildren.first).second.toSlot(),
+                                ((Slot) normalizedChildren.second));
+                        RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
+                                slots.second, slots.first, type,
+                                cnt.getAndIncrement(), join);
+                        ctx.setTargetExprIdToFilter(slots.first.getExprId(), 
filter);
+                        ctx.setTargetsOnScanNode(
+                                aliasTransferMap.get((Slot) 
normalizedChildren.first).first,
+                                slots.first);
+                    }));
         }
-        join.left().accept(this, context);
-        join.right().accept(this, context);
         return join;
     }
 
     // TODO: support src key is agg slot.
     @Override
     public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext context) {
-        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        project.child().accept(this, context);
+        Map<NamedExpression, Pair<RelationId, NamedExpression>> 
aliasTransferMap
+                = context.getRuntimeFilterContext().getAliasTransferMap();
+        // change key when encounter alias.
         project.getProjects().stream().filter(Alias.class::isInstance)
                 .map(Alias.class::cast)
-                .filter(expr -> expr.child() instanceof SlotReference)
-                .forEach(expr -> 
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()), 
expr));
-        project.child().accept(this, context);
+                .filter(alias -> alias.child() instanceof NamedExpression
+                        && aliasTransferMap.containsKey((NamedExpression) 
alias.child()))
+                .forEach(alias -> {
+                    NamedExpression child = ((NamedExpression) alias.child());
+                    aliasTransferMap.put(alias.toSlot(), 
aliasTransferMap.remove(child));
+                });
         return project;
     }
 
     @Override
     public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, 
CascadesContext context) {
+        // add all the slots in map.
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        scan.getOutput().stream()
-                .filter(slot -> 
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
-                        .filter(expr -> 
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
-                        .peek(expr -> {
-                            if (expr.getExprId() == slot.getExprId()) {
-                                return;
-                            }
-                            List<RuntimeFilter> filters = 
ctx.getFiltersByTargetExprId(expr.getExprId());
-                            ctx.removeFilters(expr.getExprId());
-                            filters.forEach(filter -> 
filter.setTargetSlot(slot));
-                            
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
-                        })
-                        .count() > 0)
-                .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+        scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, 
Pair.of(scan.getId(), slot)));
         return scan;
     }
+
+    private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo 
expr,
+            PhysicalHashJoin<? extends Plan, ? extends Plan> join) {
+        if (expr.child(0).equals(expr.child(1))
+                || 
!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
+            return null;
+        }
+        // current we assume that there are certainly different slot reference 
in equal to.
+        // they are not from the same relation.
+        List<Expression> children = 
JoinUtils.swapEqualToForChildrenOrder(expr, 
join.left().getOutputSet()).children();
+        return Pair.of(children.get(0), children.get(1));
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 8eeb6071d7..326bc6b634 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -17,12 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.physical;
 
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
-import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
@@ -58,37 +53,6 @@ public class RuntimeFilter {
         this.builderNode = builderNode;
     }
 
-    /**
-     * create RF
-     */
-    public static RuntimeFilter createRuntimeFilter(RuntimeFilterId id, 
EqualTo conjunction,
-            TRuntimeFilterType type, int exprOrder, PhysicalHashJoin node) {
-        Pair<Expression, Expression> srcs = 
checkAndMaybeSwapChild(conjunction, node);
-        if (srcs == null) {
-            return null;
-        }
-        return new RuntimeFilter(id, ((SlotReference) srcs.second), 
((SlotReference) srcs.first), type, exprOrder,
-                node);
-    }
-
-    private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo 
expr,
-            PhysicalHashJoin join) {
-        if (expr.children().stream().anyMatch(Literal.class::isInstance)) {
-            return null;
-        }
-        if (expr.child(0).equals(expr.child(1))) {
-            return null;
-        }
-        if 
(!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
-            return null;
-        }
-        // current we assume that there are certainly different slot reference 
in equal to.
-        // they are not from the same relation.
-        int exchangeTag = join.child(0).getOutput().stream().anyMatch(slot -> 
slot.getExprId().equals(
-                ((SlotReference) expr.child(1)).getExprId())) ? 1 : 0;
-        return Pair.of(expr.child(exchangeTag), expr.child(1 ^ exchangeTag));
-    }
-
     public Slot getSrcExpr() {
         return srcSlot;
     }
@@ -113,14 +77,6 @@ public class RuntimeFilter {
         return builderNode;
     }
 
-    public void setTargetSlot(Slot targetSlot) {
-        this.targetSlot = targetSlot;
-    }
-
-    public boolean isUninitialized() {
-        return !finalized;
-    }
-
     public void setFinalized() {
         this.finalized = true;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index 7b0fe88971..617ccf0f05 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -228,9 +228,10 @@ public class RuntimeFilterTest extends SSBTestBase {
 
     private void checkRuntimeFilterExprs(List<RuntimeFilter> filters, 
List<Pair<String, String>> colNames) {
         Assertions.assertEquals(filters.size(), colNames.size());
-        for (int i = 0; i < filters.size(); i++) {
-            
Assertions.assertTrue(filters.get(i).getSrcExpr().toSql().equals(colNames.get(i).first)
-                    && 
filters.get(i).getTargetExpr().toSql().equals(colNames.get(i).second));
+        for (RuntimeFilter filter : filters) {
+            Assertions.assertTrue(colNames.contains(Pair.of(
+                    filter.getSrcExpr().getName(),
+                    filter.getTargetExpr().getName())));
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to