This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch 2.1_39850
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 986006d219b14d274279feb2d5abf644524323aa
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Fri Aug 23 20:11:44 2024 +0800

    [fix](Nereids) producer to consumer should be multimap in cte (#39850)
    
    pick from master #39850
    
    Because a consumer can refer to the same producer slot multiple times,
    the producer-to-consumer slot map should be a multimap.
---
 .../org/apache/doris/nereids/CascadesContext.java  | 19 +++++---
 .../org/apache/doris/nereids/StatementContext.java |  5 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  7 +--
 .../nereids/rules/rewrite/AdjustNullable.java      | 15 +++---
 .../doris/nereids/rules/rewrite/OrExpansion.java   |  8 +--
 .../rules/rewrite/VariantSubPathPruning.java       |  4 +-
 .../trees/copier/LogicalPlanDeepCopier.java        |  4 +-
 .../trees/plans/logical/LogicalCTEConsumer.java    | 57 +++++++++++-----------
 .../trees/plans/physical/PhysicalCTEConsumer.java  | 20 ++++----
 .../cte/test_cte_with_duplicate_consumer.groovy    | 25 ++++++++++
 10 files changed, 100 insertions(+), 64 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index 69f04ded057..0c3516f41c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -71,6 +71,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -617,8 +618,8 @@ public class CascadesContext implements ScheduleContext {
         return this.statementContext.getConsumerIdToFilters();
     }
 
-    public void addCTEConsumerGroup(CTEId cteId, Group g, Map<Slot, Slot> 
producerSlotToConsumerSlot) {
-        List<Pair<Map<Slot, Slot>, Group>> consumerGroups =
+    public void addCTEConsumerGroup(CTEId cteId, Group g, Multimap<Slot, Slot> 
producerSlotToConsumerSlot) {
+        List<Pair<Multimap<Slot, Slot>, Group>> consumerGroups =
                 
this.statementContext.getCteIdToConsumerGroup().computeIfAbsent(cteId, k -> new 
ArrayList<>());
         consumerGroups.add(Pair.of(producerSlotToConsumerSlot, g));
     }
@@ -627,12 +628,18 @@ public class CascadesContext implements ScheduleContext {
      * Update CTE consumer group as producer's stats update
      */
     public void updateConsumerStats(CTEId cteId, Statistics statistics) {
-        List<Pair<Map<Slot, Slot>, Group>> consumerGroups = 
this.statementContext.getCteIdToConsumerGroup().get(cteId);
-        for (Pair<Map<Slot, Slot>, Group> p : consumerGroups) {
-            Map<Slot, Slot> producerSlotToConsumerSlot = p.first;
+        List<Pair<Multimap<Slot, Slot>, Group>> consumerGroups
+                = this.statementContext.getCteIdToConsumerGroup().get(cteId);
+        for (Pair<Multimap<Slot, Slot>, Group> p : consumerGroups) {
+            Multimap<Slot, Slot> producerSlotToConsumerSlot = p.first;
             Statistics updatedConsumerStats = new 
StatisticsBuilder(statistics).build();
             for (Entry<Expression, ColumnStatistic> entry : 
statistics.columnStatistics().entrySet()) {
-                
updatedConsumerStats.addColumnStats(producerSlotToConsumerSlot.get(entry.getKey()),
 entry.getValue());
+                if (!(entry.getKey() instanceof Slot)) {
+                    continue;
+                }
+                for (Slot consumer : producerSlotToConsumerSlot.get((Slot) 
entry.getKey())) {
+                    updatedConsumerStats.addColumnStats(consumer, 
entry.getValue());
+                }
             }
             p.value().setStatistics(updatedConsumerStats);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index aa7838efffc..6120c583b9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -53,6 +53,7 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -114,7 +115,7 @@ public class StatementContext implements Closeable {
     private final Map<CTEId, Set<Slot>> cteIdToOutputIds = new HashMap<>();
     private final Map<RelationId, Set<Expression>> consumerIdToFilters = new 
HashMap<>();
     // Used to update consumer's stats
-    private final Map<CTEId, List<Pair<Map<Slot, Slot>, Group>>> 
cteIdToConsumerGroup = new HashMap<>();
+    private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> 
cteIdToConsumerGroup = new HashMap<>();
     private final Map<CTEId, LogicalPlan> rewrittenCteProducer = new 
HashMap<>();
     private final Map<CTEId, LogicalPlan> rewrittenCteConsumer = new 
HashMap<>();
     private final Set<String> viewDdlSqlSet = Sets.newHashSet();
@@ -354,7 +355,7 @@ public class StatementContext implements Closeable {
         return idToPlaceholderRealExpr;
     }
 
-    public Map<CTEId, List<Pair<Map<Slot, Slot>, Group>>> 
getCteIdToConsumerGroup() {
+    public Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> 
getCteIdToConsumerGroup() {
         return cteIdToConsumerGroup;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7247c9d9291..129854b7ff9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1173,12 +1173,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         for (Slot producerSlot : cteProducer.getOutput()) {
             SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
             tupleDescriptor = slotRef.getDesc().getParent();
-            Slot consumerSlot = 
cteConsumer.getProducerToConsumerSlotMap().get(producerSlot);
-            // consumerSlot could be null if we prune partial consumers' 
columns
-            if (consumerSlot == null) {
-                continue;
+            for (Slot consumerSlot : 
cteConsumer.getProducerToConsumerSlotMap().get(producerSlot)) {
+                context.addExprIdSlotRefPair(consumerSlot.getExprId(), 
slotRef);
             }
-            context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
         }
         CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
         context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
index e387218c47c..808288b8fe3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
@@ -51,8 +51,10 @@ import org.apache.doris.nereids.util.ExpressionUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -261,14 +263,15 @@ public class AdjustNullable extends 
DefaultPlanRewriter<Map<ExprId, Slot>> imple
     @Override
     public Plan visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, 
Map<ExprId, Slot> replaceMap) {
         Map<Slot, Slot> consumerToProducerOutputMap = new LinkedHashMap<>();
-        Map<Slot, Slot> producerToConsumerOutputMap = new LinkedHashMap<>();
+        Multimap<Slot, Slot> producerToConsumerOutputMap = 
LinkedHashMultimap.create();
         for (Slot producerOutputSlot : 
cteConsumer.getConsumerToProducerOutputMap().values()) {
             Slot newProducerOutputSlot = updateExpression(producerOutputSlot, 
replaceMap);
-            Slot newConsumerOutputSlot = 
cteConsumer.getProducerToConsumerOutputMap().get(producerOutputSlot)
-                    .withNullable(newProducerOutputSlot.nullable());
-            producerToConsumerOutputMap.put(newProducerOutputSlot, 
newConsumerOutputSlot);
-            consumerToProducerOutputMap.put(newConsumerOutputSlot, 
newProducerOutputSlot);
-            replaceMap.put(newConsumerOutputSlot.getExprId(), 
newConsumerOutputSlot);
+            for (Slot consumerOutputSlot : 
cteConsumer.getProducerToConsumerOutputMap().get(producerOutputSlot)) {
+                Slot newConsumerOutputSlot = 
consumerOutputSlot.withNullable(newProducerOutputSlot.nullable());
+                producerToConsumerOutputMap.put(newProducerOutputSlot, 
newConsumerOutputSlot);
+                consumerToProducerOutputMap.put(newConsumerOutputSlot, 
newProducerOutputSlot);
+                replaceMap.put(newConsumerOutputSlot.getExprId(), 
newConsumerOutputSlot);
+            }
         }
         return cteConsumer.withTwoMaps(consumerToProducerOutputMap, 
producerToConsumerOutputMap);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/OrExpansion.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/OrExpansion.java
index f4bdde0730b..99718c3d504 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/OrExpansion.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/OrExpansion.java
@@ -208,11 +208,11 @@ public class OrExpansion extends 
DefaultPlanRewriter<OrExpandsionContext> implem
     private Map<Slot, Slot> constructReplaceMap(LogicalCTEConsumer 
leftConsumer, Map<Slot, Slot> leftCloneToLeft,
             LogicalCTEConsumer rightConsumer, Map<Slot, Slot> 
rightCloneToRight) {
         Map<Slot, Slot> replaced = new HashMap<>();
-        for (Entry<Slot, Slot> entry : 
leftConsumer.getProducerToConsumerOutputMap().entrySet()) {
-            replaced.put(leftCloneToLeft.get(entry.getKey()), 
entry.getValue());
+        for (Entry<Slot, Slot> entry : 
leftConsumer.getConsumerToProducerOutputMap().entrySet()) {
+            replaced.put(leftCloneToLeft.get(entry.getValue()), 
entry.getKey());
         }
-        for (Entry<Slot, Slot> entry : 
rightConsumer.getProducerToConsumerOutputMap().entrySet()) {
-            replaced.put(rightCloneToRight.get(entry.getKey()), 
entry.getValue());
+        for (Entry<Slot, Slot> entry : 
rightConsumer.getConsumerToProducerOutputMap().entrySet()) {
+            replaced.put(rightCloneToRight.get(entry.getValue()), 
entry.getKey());
         }
         return replaced;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
index 111493837a0..414dac1c95d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
@@ -59,8 +59,10 @@ import org.apache.doris.nereids.util.ExpressionUtils;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import java.util.Collections;
@@ -394,7 +396,7 @@ public class VariantSubPathPruning extends 
DefaultPlanRewriter<PruneContext> imp
                 return cteConsumer;
             }
             Map<Slot, Slot> consumerToProducerOutputMap = Maps.newHashMap();
-            Map<Slot, Slot> producerToConsumerOutputMap = Maps.newHashMap();
+            Multimap<Slot, Slot> producerToConsumerOutputMap = 
LinkedHashMultimap.create();
             Map<Slot, Map<List<String>, SlotReference>> oriSlotToSubPathToSlot 
= Maps.newHashMap();
             for (Map.Entry<Slot, Slot> consumerToProducer : 
cteConsumer.getConsumerToProducerOutputMap().entrySet()) {
                 Slot consumer = consumerToProducer.getKey();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
index 277f5ae345a..7b7c95dd811 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
@@ -63,6 +63,8 @@ import 
org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Multimap;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -397,7 +399,7 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
             return 
context.getRelationReplaceMap().get(cteConsumer.getRelationId());
         }
         Map<Slot, Slot> consumerToProducerOutputMap = new LinkedHashMap<>();
-        Map<Slot, Slot> producerToConsumerOutputMap = new LinkedHashMap<>();
+        Multimap<Slot, Slot> producerToConsumerOutputMap = 
LinkedHashMultimap.create();
         for (Slot consumerOutput : cteConsumer.getOutput()) {
             Slot newOutput = (Slot) 
ExpressionDeepCopier.INSTANCE.deepCopy(consumerOutput, context);
             consumerToProducerOutputMap.put(newOutput, 
cteConsumer.getProducerSlot(consumerOutput));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTEConsumer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTEConsumer.java
index cd73c96d02e..415fdddf80b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTEConsumer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTEConsumer.java
@@ -33,6 +33,10 @@ import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Multimap;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -50,20 +54,15 @@ public class LogicalCTEConsumer extends LogicalRelation 
implements BlockFuncDeps
     private final String name;
     private final CTEId cteId;
     private final Map<Slot, Slot> consumerToProducerOutputMap;
-    private final Map<Slot, Slot> producerToConsumerOutputMap;
+    private final Multimap<Slot, Slot> producerToConsumerOutputMap;
 
     /**
      * Logical CTE consumer.
      */
     public LogicalCTEConsumer(RelationId relationId, CTEId cteId, String name,
-            Map<Slot, Slot> consumerToProducerOutputMap, Map<Slot, Slot> 
producerToConsumerOutputMap) {
-        super(relationId, PlanType.LOGICAL_CTE_CONSUMER, Optional.empty(), 
Optional.empty());
-        this.cteId = Objects.requireNonNull(cteId, "cteId should not null");
-        this.name = Objects.requireNonNull(name, "name should not null");
-        this.consumerToProducerOutputMap = 
Objects.requireNonNull(consumerToProducerOutputMap,
-                "consumerToProducerOutputMap should not null");
-        this.producerToConsumerOutputMap = 
Objects.requireNonNull(producerToConsumerOutputMap,
-                "producerToConsumerOutputMap should not null");
+            Map<Slot, Slot> consumerToProducerOutputMap, Multimap<Slot, Slot> 
producerToConsumerOutputMap) {
+        this(relationId, cteId, name, consumerToProducerOutputMap, 
producerToConsumerOutputMap,
+                Optional.empty(), Optional.empty());
     }
 
     /**
@@ -73,24 +72,31 @@ public class LogicalCTEConsumer extends LogicalRelation 
implements BlockFuncDeps
         super(relationId, PlanType.LOGICAL_CTE_CONSUMER, Optional.empty(), 
Optional.empty());
         this.cteId = Objects.requireNonNull(cteId, "cteId should not null");
         this.name = Objects.requireNonNull(name, "name should not null");
-        this.consumerToProducerOutputMap = new LinkedHashMap<>();
-        this.producerToConsumerOutputMap = new LinkedHashMap<>();
-        initOutputMaps(producerPlan);
+        ImmutableMap.Builder<Slot, Slot> cToPBuilder = ImmutableMap.builder();
+        ImmutableMultimap.Builder<Slot, Slot> pToCBuilder = 
ImmutableMultimap.builder();
+        List<Slot> producerOutput = producerPlan.getOutput();
+        for (Slot producerOutputSlot : producerOutput) {
+            Slot consumerSlot = generateConsumerSlot(this.name, 
producerOutputSlot);
+            cToPBuilder.put(consumerSlot, producerOutputSlot);
+            pToCBuilder.put(producerOutputSlot, consumerSlot);
+        }
+        consumerToProducerOutputMap = cToPBuilder.build();
+        producerToConsumerOutputMap = pToCBuilder.build();
     }
 
     /**
      * Logical CTE consumer.
      */
     public LogicalCTEConsumer(RelationId relationId, CTEId cteId, String name,
-            Map<Slot, Slot> consumerToProducerOutputMap, Map<Slot, Slot> 
producerToConsumerOutputMap,
+            Map<Slot, Slot> consumerToProducerOutputMap, Multimap<Slot, Slot> 
producerToConsumerOutputMap,
             Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties) {
         super(relationId, PlanType.LOGICAL_CTE_CONSUMER, groupExpression, 
logicalProperties);
         this.cteId = Objects.requireNonNull(cteId, "cteId should not null");
         this.name = Objects.requireNonNull(name, "name should not null");
-        this.consumerToProducerOutputMap = 
Objects.requireNonNull(consumerToProducerOutputMap,
-                "consumerToProducerOutputMap should not null");
-        this.producerToConsumerOutputMap = 
Objects.requireNonNull(producerToConsumerOutputMap,
-                "producerToConsumerOutputMap should not null");
+        this.consumerToProducerOutputMap = 
ImmutableMap.copyOf(Objects.requireNonNull(consumerToProducerOutputMap,
+                "consumerToProducerOutputMap should not null"));
+        this.producerToConsumerOutputMap = 
ImmutableMultimap.copyOf(Objects.requireNonNull(producerToConsumerOutputMap,
+                "producerToConsumerOutputMap should not null"));
     }
 
     /**
@@ -107,20 +113,11 @@ public class LogicalCTEConsumer extends LogicalRelation 
implements BlockFuncDeps
                 slotRef != null ? Optional.of(slotRef.getInternalName()) : 
Optional.empty());
     }
 
-    private void initOutputMaps(LogicalPlan childPlan) {
-        List<Slot> producerOutput = childPlan.getOutput();
-        for (Slot producerOutputSlot : producerOutput) {
-            Slot consumerSlot = generateConsumerSlot(this.name, 
producerOutputSlot);
-            producerToConsumerOutputMap.put(producerOutputSlot, consumerSlot);
-            consumerToProducerOutputMap.put(consumerSlot, producerOutputSlot);
-        }
-    }
-
     public Map<Slot, Slot> getConsumerToProducerOutputMap() {
         return consumerToProducerOutputMap;
     }
 
-    public Map<Slot, Slot> getProducerToConsumerOutputMap() {
+    public Multimap<Slot, Slot> getProducerToConsumerOutputMap() {
         return producerToConsumerOutputMap;
     }
 
@@ -129,7 +126,8 @@ public class LogicalCTEConsumer extends LogicalRelation 
implements BlockFuncDeps
         return visitor.visitLogicalCTEConsumer(this, context);
     }
 
-    public Plan withTwoMaps(Map<Slot, Slot> consumerToProducerOutputMap, 
Map<Slot, Slot> producerToConsumerOutputMap) {
+    public Plan withTwoMaps(Map<Slot, Slot> consumerToProducerOutputMap,
+            Multimap<Slot, Slot> producerToConsumerOutputMap) {
         return new LogicalCTEConsumer(relationId, cteId, name,
                 consumerToProducerOutputMap, producerToConsumerOutputMap);
     }
@@ -162,7 +160,8 @@ public class LogicalCTEConsumer extends LogicalRelation 
implements BlockFuncDeps
     @Override
     public Plan pruneOutputs(List<NamedExpression> prunedOutputs) {
         Map<Slot, Slot> consumerToProducerOutputMap = new 
LinkedHashMap<>(this.consumerToProducerOutputMap.size());
-        Map<Slot, Slot> producerToConsumerOutputMap = new 
LinkedHashMap<>(this.consumerToProducerOutputMap.size());
+        Multimap<Slot, Slot> producerToConsumerOutputMap = 
LinkedHashMultimap.create(
+                this.consumerToProducerOutputMap.size(), 
this.consumerToProducerOutputMap.size());
         for (Entry<Slot, Slot> consumerToProducerSlot : 
this.consumerToProducerOutputMap.entrySet()) {
             if (prunedOutputs.contains(consumerToProducerSlot.getKey())) {
                 
consumerToProducerOutputMap.put(consumerToProducerSlot.getKey(), 
consumerToProducerSlot.getValue());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java
index 9139e3142b1..385c411e721 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java
@@ -31,6 +31,8 @@ import org.apache.doris.statistics.Statistics;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
 
 import java.util.List;
 import java.util.Map;
@@ -43,14 +45,14 @@ import java.util.Optional;
 public class PhysicalCTEConsumer extends PhysicalRelation {
 
     private final CTEId cteId;
-    private final Map<Slot, Slot> producerToConsumerSlotMap;
+    private final Multimap<Slot, Slot> producerToConsumerSlotMap;
     private final Map<Slot, Slot> consumerToProducerSlotMap;
 
     /**
      * Constructor
      */
     public PhysicalCTEConsumer(RelationId relationId, CTEId cteId, Map<Slot, 
Slot> consumerToProducerSlotMap,
-            Map<Slot, Slot> producerToConsumerSlotMap, LogicalProperties 
logicalProperties) {
+            Multimap<Slot, Slot> producerToConsumerSlotMap, LogicalProperties 
logicalProperties) {
         this(relationId, cteId, consumerToProducerSlotMap, 
producerToConsumerSlotMap,
                 Optional.empty(), logicalProperties);
     }
@@ -59,7 +61,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
      * Constructor
      */
     public PhysicalCTEConsumer(RelationId relationId, CTEId cteId,
-            Map<Slot, Slot> consumerToProducerSlotMap, Map<Slot, Slot> 
producerToConsumerSlotMap,
+            Map<Slot, Slot> consumerToProducerSlotMap, Multimap<Slot, Slot> 
producerToConsumerSlotMap,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties) {
         this(relationId, cteId, consumerToProducerSlotMap, 
producerToConsumerSlotMap,
                 groupExpression, logicalProperties, null, null);
@@ -69,14 +71,14 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
      * Constructor
      */
     public PhysicalCTEConsumer(RelationId relationId, CTEId cteId, Map<Slot, 
Slot> consumerToProducerSlotMap,
-            Map<Slot, Slot> producerToConsumerSlotMap, 
Optional<GroupExpression> groupExpression,
+            Multimap<Slot, Slot> producerToConsumerSlotMap, 
Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, PhysicalProperties 
physicalProperties, Statistics statistics) {
         super(relationId, PlanType.PHYSICAL_CTE_CONSUMER, groupExpression,
                 logicalProperties, physicalProperties, statistics);
         this.cteId = cteId;
         this.consumerToProducerSlotMap = 
ImmutableMap.copyOf(Objects.requireNonNull(
                 consumerToProducerSlotMap, "consumerToProducerSlotMap should 
not null"));
-        this.producerToConsumerSlotMap = 
ImmutableMap.copyOf(Objects.requireNonNull(
+        this.producerToConsumerSlotMap = 
ImmutableMultimap.copyOf(Objects.requireNonNull(
                 producerToConsumerSlotMap, "consumerToProducerSlotMap should 
not null"));
     }
 
@@ -84,7 +86,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
         return cteId;
     }
 
-    public Map<Slot, Slot> getProducerToConsumerSlotMap() {
+    public Multimap<Slot, Slot> getProducerToConsumerSlotMap() {
         return producerToConsumerSlotMap;
     }
 
@@ -99,8 +101,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
     public String toString() {
         StringBuilder builder = new StringBuilder();
         if (!getAppliedRuntimeFilters().isEmpty()) {
-            getAppliedRuntimeFilters()
-                    .stream().forEach(rf -> builder.append(" 
RF").append(rf.getId().asInt()));
+            getAppliedRuntimeFilters().forEach(rf -> builder.append(" 
RF").append(rf.getId().asInt()));
         }
         return Utils.toSqlString("PhysicalCTEConsumer[" + id.asInt() + "]",
                 "stats", getStats(), "cteId", cteId, "RFs", builder, "map", 
consumerToProducerSlotMap);
@@ -141,8 +142,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
                 "cteId", cteId));
         if (!getAppliedRuntimeFilters().isEmpty()) {
             shapeBuilder.append(" apply RFs:");
-            getAppliedRuntimeFilters()
-                    .stream().forEach(rf -> shapeBuilder.append(" 
RF").append(rf.getId().asInt()));
+            getAppliedRuntimeFilters().forEach(rf -> shapeBuilder.append(" 
RF").append(rf.getId().asInt()));
         }
         return shapeBuilder.toString();
     }
diff --git 
a/regression-test/suites/nereids_p0/cte/test_cte_with_duplicate_consumer.groovy 
b/regression-test/suites/nereids_p0/cte/test_cte_with_duplicate_consumer.groovy
new file mode 100644
index 00000000000..4064efcfc6e
--- /dev/null
+++ 
b/regression-test/suites/nereids_p0/cte/test_cte_with_duplicate_consumer.groovy
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+suite("test_cte_with_duplicate_consumer") {
+    test {
+        sql """
+            WITH cte1(col1) AS (SELECT 1), cte2(col2_1, col2_2) AS (SELECT 
col1, col1 FROM cte1) SELECT * FROM cte2
+        """
+
+        result([[1, 1]])
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to