This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch dev_rec
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4c06602b74283ed0bfeba573e317b166100299a8
Author: lichi <[email protected]>
AuthorDate: Fri Oct 24 11:26:46 2025 +0800

    update fe
---
 .../glue/translator/PhysicalPlanTranslator.java    |   2 +-
 .../doris/nereids/rules/analysis/AnalyzeCTE.java   |   4 +-
 ...eChildToPhysicalRecursiveCteRecursiveChild.java |   1 +
 .../doris/nereids/rules/rewrite/CTEInline.java     |  22 ++-
 .../trees/copier/LogicalPlanDeepCopier.java        |   2 +-
 .../logical/LogicalRecursiveCteRecursiveChild.java |  27 ++--
 .../plans/logical/LogicalRecursiveCteScan.java     |   2 +-
 .../trees/plans/physical/PhysicalRecursiveCte.java |   4 +
 .../PhysicalRecursiveCteRecursiveChild.java        |  28 ++--
 .../org/apache/doris/planner/RecursiveCteNode.java |   8 +-
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  22 ++-
 .../nereids/rules/analysis/AnalyzeCTETest.java     | 148 +++++++++++++++++++++
 .../doris/nereids/rules/rewrite/CTEInlineTest.java |  84 ++++++++++++
 .../nereids/rules/rewrite/ColumnPruningTest.java   |  30 +++++
 14 files changed, 346 insertions(+), 38 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 96469ccd5bf..ee8fec61401 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -2283,7 +2283,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
 
         RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
-                    recursiveCte.isUnionAll());
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
         List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
         recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
         recursiveCteNode.setNereidsId(recursiveCte.getId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
index bbf0c036cf4..62612422e08 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
@@ -191,8 +191,8 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory {
                         recursiveChildOutputs.get(i).getDataType(), 
anchorChildOutputTypes.get(i)));
             }
         }
-        analyzedRecursiveChild = new 
LogicalRecursiveCteRecursiveChild<>(forceOutputNullable(analyzedRecursiveChild,
-                ImmutableList.of()));
+        analyzedRecursiveChild = new 
LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(),
+                forceOutputNullable(analyzedRecursiveChild, 
ImmutableList.of()));
 
         // create LogicalRecursiveCte
         LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java
index 2923924a6ad..689e550fa3a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java
@@ -29,6 +29,7 @@ public class 
LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChi
     @Override
     public Rule build() {
         return logicalRecursiveCteRecursiveChild().then(recursiveCte -> new 
PhysicalRecursiveCteRecursiveChild(
+                recursiveCte.getCteName(),
                 recursiveCte.getLogicalProperties(),
                 recursiveCte.child()))
                 
.toRule(RuleType.LOGICAL_RECURSIVE_CTE_RECURSIVE_CHILD_TO_PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
index 75f228ff7fd..9bbcb2e1e8d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
@@ -52,15 +52,12 @@ import java.util.Set;
  * and put all of them to the top of plan depends on dependency tree of them.
  */
 public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> 
implements CustomRewriter {
+    // all CTE consumers used inside a recursive cte's recursive child must be inlined
     private Set<LogicalCTEConsumer> mustInlineCteConsumers = new HashSet<>();
 
     @Override
     public Plan rewriteRoot(Plan plan, JobContext jobContext) {
-        List<LogicalRecursiveCteRecursiveChild> recursiveCteRecursiveChildList 
=
-                
plan.collectToList(LogicalRecursiveCteRecursiveChild.class::isInstance);
-        for (LogicalRecursiveCteRecursiveChild recursiveChild : 
recursiveCteRecursiveChildList) {
-            
mustInlineCteConsumers.addAll(recursiveChild.collect(LogicalCTEConsumer.class::isInstance));
-        }
+        collectMustInlineCteConsumers(plan, false, mustInlineCteConsumers);
 
         Plan root = plan.accept(this, null);
         // collect cte id to consumer
@@ -131,4 +128,19 @@ public class CTEInline extends 
DefaultPlanRewriter<LogicalCTEProducer<?>> implem
         }
         return cteConsumer;
     }
+
+    private void collectMustInlineCteConsumers(Plan planNode, boolean 
needCollect,
+            Set<LogicalCTEConsumer> cteConsumers) {
+        if (planNode instanceof LogicalCTEConsumer) {
+            if (needCollect) {
+                cteConsumers.add((LogicalCTEConsumer) planNode);
+            }
+        } else if (planNode instanceof LogicalRecursiveCteRecursiveChild) {
+            collectMustInlineCteConsumers(planNode.child(0), true, 
cteConsumers);
+        } else {
+            for (Plan child : planNode.children()) {
+                collectMustInlineCteConsumers(child, needCollect, 
cteConsumers);
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
index e900ba6b2b4..b13644dc10d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
@@ -377,7 +377,7 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
     public Plan 
visitLogicalRecursiveCteRecursiveChild(LogicalRecursiveCteRecursiveChild<? 
extends Plan> recursiveChild,
             DeepCopierContext context) {
         Plan child = recursiveChild.child().accept(this, context);
-        return new LogicalRecursiveCteRecursiveChild<>(child);
+        return new 
LogicalRecursiveCteRecursiveChild<>(recursiveChild.getCteName(), child);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java
index 55747f5ba12..d2766a86f95 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java
@@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.collect.ImmutableList;
 
@@ -35,24 +36,30 @@ import java.util.Optional;
  * LogicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle
  */
 public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_TYPE> {
+    private final String cteName;
 
-    public LogicalRecursiveCteRecursiveChild(CHILD_TYPE child) {
-        this(Optional.empty(), Optional.empty(), child);
+    public LogicalRecursiveCteRecursiveChild(String cteName, CHILD_TYPE child) 
{
+        this(cteName, Optional.empty(), Optional.empty(), child);
     }
 
-    public LogicalRecursiveCteRecursiveChild(Optional<GroupExpression> 
groupExpression,
+    public LogicalRecursiveCteRecursiveChild(String cteName, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
-        this(groupExpression, logicalProperties, ImmutableList.of(child));
+        this(cteName, groupExpression, logicalProperties, 
ImmutableList.of(child));
     }
 
-    public LogicalRecursiveCteRecursiveChild(Optional<GroupExpression> 
groupExpression,
+    public LogicalRecursiveCteRecursiveChild(String cteName, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> child) {
         super(PlanType.LOGICAL_RECURSIVE_CTE_RECURSIVE_CHILD, groupExpression, 
logicalProperties, child);
+        this.cteName = cteName;
+    }
+
+    public String getCteName() {
+        return cteName;
     }
 
     @Override
     public Plan withChildren(List<Plan> children) {
-        return new LogicalRecursiveCteRecursiveChild<>(Optional.empty(), 
Optional.empty(), children);
+        return new LogicalRecursiveCteRecursiveChild<>(cteName, 
Optional.empty(), Optional.empty(), children);
     }
 
     @Override
@@ -67,18 +74,20 @@ public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE 
extends Plan> extends
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new LogicalRecursiveCteRecursiveChild<>(groupExpression, 
Optional.of(getLogicalProperties()), children);
+        return new LogicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression,
+                Optional.of(getLogicalProperties()), children);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        return new LogicalRecursiveCteRecursiveChild<>(groupExpression, 
logicalProperties, children);
+        return new LogicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, logicalProperties, children);
     }
 
     @Override
     public String toString() {
-        return "LogicalRecursiveCteRecursiveChild(MUST_SHUFFLE)";
+        return Utils.toSqlStringSkipNull("LogicalRecursiveCteRecursiveChild",
+                "cteName", cteName);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
index 110a6297858..31b335260b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
@@ -52,7 +52,7 @@ public class LogicalRecursiveCteScan extends 
LogicalCatalogRelation {
     @Override
     public String toString() {
         return Utils.toSqlString("LogicalRecursiveCteScan",
-                "recursive cte name", table.getName());
+                "cteName", table.getName());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
index e5a74a8292a..44aab38fc30 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
@@ -92,6 +92,10 @@ public class PhysicalRecursiveCte extends 
AbstractPhysicalPlan implements Recurs
         return isUnionAll;
     }
 
+    public String getCteName() {
+        return cteName;
+    }
+
     @Override
     public List<SlotReference> getRegularChildOutput(int i) {
         return regularChildrenOutputs.get(i);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java
index 25903905b7a..9aef71e7ee9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java
@@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.statistics.Statistics;
 
 import com.google.common.base.Preconditions;
@@ -38,31 +39,36 @@ import java.util.Optional;
  * PhysicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle
  */
 public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHILD_TYPE> {
-    public PhysicalRecursiveCteRecursiveChild(LogicalProperties 
logicalProperties, CHILD_TYPE child) {
-        this(Optional.empty(), logicalProperties, child);
+    private final String cteName;
+
+    public PhysicalRecursiveCteRecursiveChild(String cteName, 
LogicalProperties logicalProperties, CHILD_TYPE child) {
+        this(cteName, Optional.empty(), logicalProperties, child);
     }
 
-    public PhysicalRecursiveCteRecursiveChild(Optional<GroupExpression> 
groupExpression,
+    public PhysicalRecursiveCteRecursiveChild(String cteName, 
Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, CHILD_TYPE child) {
-        this(groupExpression, logicalProperties, PhysicalProperties.ANY, null, 
child);
+        this(cteName, groupExpression, logicalProperties, 
PhysicalProperties.ANY, null, child);
     }
 
-    public PhysicalRecursiveCteRecursiveChild(Optional<GroupExpression> 
groupExpression,
+    public PhysicalRecursiveCteRecursiveChild(String cteName, 
Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, @Nullable PhysicalProperties 
physicalProperties, Statistics statistics,
             CHILD_TYPE child) {
         super(PlanType.PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD, 
groupExpression, logicalProperties, physicalProperties,
                 statistics, child);
+        this.cteName = cteName;
     }
 
     @Override
     public String toString() {
-        return "PhysicalRecursiveCteRecursiveChild(MUST_SHUFFLE)";
+        return Utils.toSqlStringSkipNull("PhysicalRecursiveCteRecursiveChild",
+                "cteName", cteName);
     }
 
     @Override
     public Plan withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, 
getLogicalProperties(), children.get(0));
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, getLogicalProperties(),
+                children.get(0));
     }
 
     @Override
@@ -77,14 +83,14 @@ public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE 
extends Plan> extends
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, 
getLogicalProperties(), child());
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, getLogicalProperties(), child());
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, 
logicalProperties.get(), child());
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, logicalProperties.get(), child());
     }
 
     @Override
@@ -109,7 +115,7 @@ public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE 
extends Plan> extends
 
     @Override
     public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
-        return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, 
getLogicalProperties(), physicalProperties,
-                statistics, child());
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, child());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
index d9625dfc6dd..c8259cc2821 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
@@ -31,13 +31,14 @@ import com.google.common.collect.Lists;
 import java.util.List;
 
 public class RecursiveCteNode extends PlanNode {
-
+    private String cteName;
     private boolean isUnionAll;
     private List<List<Expr>> materializedResultExprLists = 
Lists.newArrayList();
     private TRecCTENode tRecCTENode;
 
-    public RecursiveCteNode(PlanNodeId id, TupleId tupleId, boolean 
isUnionAll) {
+    public RecursiveCteNode(PlanNodeId id, TupleId tupleId, String cteName, 
boolean isUnionAll) {
         super(id, tupleId.asList(), "RECURSIVE_CTE", 
StatisticalType.RECURSIVE_CTE_NODE);
+        this.cteName = cteName;
         this.isUnionAll = isUnionAll;
     }
 
@@ -66,7 +67,7 @@ public class RecursiveCteNode extends PlanNode {
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         StringBuilder output = new StringBuilder();
-        output.append(prefix).append("Recursive Cte: ").append("\n");
+        output.append(prefix).append("Recursive Cte: 
").append(cteName).append("\n");
         output.append(prefix).append("isUnionAll: 
").append(isUnionAll).append("\n");
         if (!conjuncts.isEmpty()) {
             Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
@@ -78,6 +79,7 @@ public class RecursiveCteNode extends PlanNode {
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
+                .add("name", cteName)
                 .add("id", getId().asInt())
                 .add("tid", tupleIds.get(0).asInt())
                 .add("isUnionAll", isUnionAll).toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index fc0bd2d1859..2733385154c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -597,13 +597,10 @@ public class ThriftPlansBuilder {
 
     private static Set<Integer> 
setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans,
             List<RuntimeFilter> runtimeFilters) {
-        Set<RecursiveCteNode> recursiveCteNodesInRecursiveSide = new 
HashSet<>();
-        PlanNode rootPlan = distributedPlans.get(distributedPlans.size() - 1)
-                        .getFragmentJob().getFragment().getPlanRoot();
-        collectAllRecursiveCteNodesInRecursiveSide(rootPlan, false, 
recursiveCteNodesInRecursiveSide);
         Set<Integer> fragmentToNotifyClose = new HashSet<>();
         Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new 
TreeMap<>();
         Map<PlanFragmentId, Set<TNetworkAddress>> 
fragmentIdToNetworkAddressMap = new TreeMap<>();
+        // distributedPlans is ordered bottom-up, and so are the fragments
         for (PipelineDistributedPlan plan : distributedPlans) {
             List<AssignedJob> fragmentAssignedJobs = plan.getInstanceJobs();
             Set<TNetworkAddress> networkAddresses = new TreeSet<>();
@@ -633,6 +630,7 @@ public class ThriftPlansBuilder {
                         distributedPlanWorker.brpcPort()));
                 
tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId());
                 
tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt());
+                // remember each RecursiveCteScanNode's target, keyed by its fragment id
                 fragmentIdToRecCteTargetMap.put(planFragment.getFragmentId(), 
tRecCTETarget);
             }
 
@@ -648,15 +646,22 @@ public class ThriftPlansBuilder {
                 List<TRecCTETarget> targets = new ArrayList<>();
                 List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>();
                 // PhysicalPlanTranslator will swap recursiveCteNode's child 
fragment,
-                // so we get recursive one by 1st child
+                // so we get the recursive one via the 1st child and collect all child 
fragments of the recursive side
                 List<PlanFragment> childFragments = new ArrayList<>();
                 
planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, 
childFragments);
                 for (PlanFragment child : childFragments) {
                     PlanFragmentId childFragmentId = child.getFragmentId();
+                    // the fragment needs to be notified to close
                     fragmentToNotifyClose.add(childFragmentId.asInt());
                     TRecCTETarget tRecCTETarget = 
fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null);
                     if (tRecCTETarget != null) {
+                        // one RecursiveCteNode can only have one 
corresponding RecursiveCteScanNode
                         targets.add(tRecCTETarget);
+                        // because we traverse the fragments bottom-up,
+                        // we can safely remove an already-used RecursiveCteScanNode target,
+                        // so the parent RecursiveCteNode won't see its 
grandchild RecursiveCteScanNode
+                        // but only its own child RecursiveCteScanNode
+                        fragmentIdToRecCteTargetMap.remove(childFragmentId);
                     }
                     Set<TNetworkAddress> tNetworkAddresses = 
fragmentIdToNetworkAddressMap.get(childFragmentId);
                     if (tNetworkAddresses == null) {
@@ -677,6 +682,7 @@ public class ThriftPlansBuilder {
                 for (List<Expr> exprList : materializedResultExprLists) {
                     texprLists.add(Expr.treesToThrift(exprList));
                 }
+                // the recursive side's runtime filters need to be reset
                 List<Integer> runtimeFiltersToReset = new 
ArrayList<>(runtimeFilters.size());
                 for (RuntimeFilter rf : runtimeFilters) {
                     if (rf.hasRemoteTargets()
@@ -684,7 +690,13 @@ public class ThriftPlansBuilder {
                         runtimeFiltersToReset.add(rf.getFilterId().asInt());
                     }
                 }
+                // find recursive cte nodes that are used by other recursive ctes
+                Set<RecursiveCteNode> recursiveCteNodesInRecursiveSide = new 
HashSet<>();
+                PlanNode rootPlan = 
distributedPlans.get(distributedPlans.size() - 1)
+                        .getFragmentJob().getFragment().getPlanRoot();
+                collectAllRecursiveCteNodesInRecursiveSide(rootPlan, false, 
recursiveCteNodesInRecursiveSide);
                 boolean isUsedByOtherRecCte = 
recursiveCteNodesInRecursiveSide.contains(recursiveCteNode);
+
                 TRecCTENode tRecCTENode = new TRecCTENode();
                 tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll());
                 tRecCTENode.setTargets(targets);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java
index a91c0dd4712..56cea284188 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java
@@ -33,10 +33,14 @@ import org.apache.doris.nereids.rules.rewrite.InApplyToJoin;
 import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderApply;
 import org.apache.doris.nereids.rules.rewrite.UnCorrelatedApplyFilter;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
+import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
 import org.apache.doris.nereids.util.MemoTestUtils;
 import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.ImmutableList;
@@ -264,6 +268,92 @@ public class AnalyzeCTETest extends TestWithFeService 
implements MemoPatternMatc
                 );
     }
 
+    @Test
+    public void testRecCteOutputNullable() {
+        String sql = new StringBuilder()
+                .append("WITH RECURSIVE test_table AS (\n")
+                .append("    SELECT 1 UNION ALL\n")
+                .append("    SELECT 2 FROM test_table\n")
+                .append(")\n")
+                .append("SELECT * FROM test_table;")
+                .toString();
+        PlanChecker.from(connectContext)
+                .analyze(sql)
+                .matches(
+                        logicalRecursiveCte(
+                                logicalProject(
+                                        logicalOneRowRelation(
+                                        )
+                                ).when(project -> 
project.getProjects().get(0).child(0) instanceof Nullable),
+                                logicalRecursiveCteRecursiveChild(
+                                        logicalProject(
+                                                logicalProject(
+                                                        logicalCTEConsumer()
+                                                )
+                                        ).when(project -> 
project.getProjects().get(0).child(0) instanceof Nullable)
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testRecCteWithoutRecKeyword() {
+        String sql = new StringBuilder()
+                .append("WITH RECURSIVE t1 AS (\n")
+                .append("    SELECT 1\n")
+                .append("UNION ALL\n")
+                .append("    SELECT 2 FROM t1\n")
+                .append("),\n").append("t2 AS (\n")
+                .append("    SELECT 3\n")
+                .append("UNION ALL\n")
+                .append("    SELECT 4 FROM t1, t2\n")
+                .append(")\n")
+                .append("SELECT * FROM t2;")
+                .toString();
+        PlanChecker.from(connectContext)
+                .analyze(sql)
+                .matches(
+                        logicalRecursiveCte(
+                                logicalProject(
+                                        logicalOneRowRelation(
+                                        )
+                                ),
+                                logicalRecursiveCteRecursiveChild(
+                                        logicalProject(
+                                                logicalProject(
+                                                        logicalJoin()
+                                                )
+                                        )
+                                )
+                        ).when(cte -> cte.getCteName().equals("t2"))
+                );
+    }
+
+    @Test
+    public void testRecCteMultipleUnion() {
+        String sql = new StringBuilder().append("with recursive t1 as 
(\n").append("    select\n")
+                .append("        1 as c1,\n").append("        1 as 
c2\n").append("),\n").append("t2 as (\n")
+                .append("    select\n").append("        2 as c1,\n").append("  
      2 as c2\n").append("),\n")
+                .append("xx as (\n").append("    select\n").append("        
c1,\n").append("        c2\n")
+                .append("    from\n").append("        t1\n").append("    
union\n").append("    select\n")
+                .append("        c1,\n").append("        c2\n").append("    
from\n").append("        t2\n")
+                .append("    union\n").append("    select\n").append("        
c1,\n").append("        c2\n")
+                .append("    from\n").append("        
xx\n").append(")\n").append("select\n").append("    *\n")
+                .append("from\n").append("    xx;").toString();
+        LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql);
+        StatementContext statementContext = new 
StatementContext(connectContext,
+                new OriginStatement(sql, 0));
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        planner.planWithLock(unboundPlan, PhysicalProperties.ANY,
+                ExplainCommand.ExplainLevel.ANALYZED_PLAN);
+        MemoTestUtils.initMemoAndValidState(planner.getCascadesContext());
+        PlanChecker.from(planner.getCascadesContext()).matches(
+                logicalRecursiveCte(
+                        logicalProject(
+                                logicalUnion()),
+                        logicalRecursiveCteRecursiveChild()).when(cte -> 
cte.getCteName().equals("xx")));
+    }
+
 
     /* 
********************************************************************************************
      * Test CTE Exceptions
@@ -333,4 +423,62 @@ public class AnalyzeCTETest extends TestWithFeService implements MemoPatternMatc
                 () -> PlanChecker.from(connectContext).analyze(sql), "Not 
throw expected exception.");
         Assertions.assertTrue(exception.getMessage().contains("Table [cte2] 
does not exist in database"));
     }
+
+    /**
+     * Plans a CTE {@code t1} that references itself but is declared with plain {@code WITH}
+     * (no {@code RECURSIVE} keyword) and expects analysis to fail with an
+     * {@link AnalysisException} reporting that table {@code t1} does not exist.
+     */
+    @Test
+    public void testRecCteWithoutRecKeywordException() {
+        String sql = new StringBuilder()
+                .append("WITH t1 AS (\n")
+                .append("    SELECT 1 UNION ALL\n")
+                .append("    SELECT 2 FROM t1\n")
+                .append(")\n")
+                .append("SELECT * FROM t1;")
+                .toString();
+        LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql);
+        StatementContext statementContext = new StatementContext(connectContext,
+                new OriginStatement(sql, 0));
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        AnalysisException exception = Assertions.assertThrows(AnalysisException.class,
+                () -> planner.planWithLock(unboundPlan, PhysicalProperties.ANY,
+                        ExplainCommand.ExplainLevel.ANALYZED_PLAN), "Not throw expected exception.");
+        Assertions.assertTrue(exception.getMessage().contains("Table [t1] does not exist in database"));
+    }
+
+    @Test
+    public void testRecCteDatatypeException() {
+        String sql = new StringBuilder().append("WITH RECURSIVE t1 AS 
(\n").append("    SELECT 1 AS number\n")
+                .append("UNION ALL\n").append("    SELECT number + 1 FROM t1 
WHERE number < 100\n").append(")\n")
+                .append("SELECT number FROM t1;").toString();
+        LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql);
+        StatementContext statementContext = new 
StatementContext(connectContext,
+                new OriginStatement(sql, 0));
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        AnalysisException exception = 
Assertions.assertThrows(AnalysisException.class,
+                () -> planner.planWithLock(unboundPlan, PhysicalProperties.ANY,
+                        ExplainCommand.ExplainLevel.ANALYZED_PLAN),
+                "Not throw expected exception.");
+        Assertions.assertTrue(exception.getMessage().contains("please add cast 
manually to get expect datatype"));
+    }
+
+    @Test
+    public void testRecCteMultipleUnionException() {
+        String sql = new StringBuilder().append("with recursive t1 as 
(\n").append("    select\n")
+                .append("        1 as c1,\n").append("        1 as 
c2\n").append("),\n").append("t2 as (\n")
+                .append("    select\n").append("        2 as c1,\n").append("  
      2 as c2\n").append("),\n")
+                .append("xx as (\n").append("    select\n").append("        
c1,\n").append("        c2\n")
+                .append("    from\n").append("        t1\n").append("    
union\n").append("    select\n")
+                .append("        c1,\n").append("        c2\n").append("    
from\n").append("        xx\n")
+                .append("    union\n").append("    select\n").append("        
c1,\n").append("        c2\n")
+                .append("    from\n").append("        
t2\n").append(")\n").append("select\n").append("    *\n")
+                .append("from\n").append("    xx").toString();
+        LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql);
+        StatementContext statementContext = new 
StatementContext(connectContext,
+                new OriginStatement(sql, 0));
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        AnalysisException exception = 
Assertions.assertThrows(AnalysisException.class,
+                () -> planner.planWithLock(unboundPlan, PhysicalProperties.ANY,
+                        ExplainCommand.ExplainLevel.ANALYZED_PLAN),
+                "Not throw expected exception.");
+        Assertions.assertTrue(exception.getMessage()
+                .contains("recursive reference to query xx must not appear 
within its non-recursive term"));
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CTEInlineTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CTEInlineTest.java
new file mode 100644
index 00000000000..0a0efc8b5db
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CTEInlineTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.util.MemoPatternMatchSupported;
+import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Plan-shape test for the rewritten plan of a query in which one recursive CTE is referenced
+ * from inside the recursive branch of another recursive CTE.
+ */
+public class CTEInlineTest extends TestWithFeService implements MemoPatternMatchSupported {
+    /** Creates the {@code test} database and makes it the current database of the connection. */
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        connectContext.setDatabase("test");
+    }
+
+    /**
+     * Plans a query with three plain CTEs ({@code t1}, {@code t2}, {@code t3}) and two recursive
+     * CTEs: {@code xx}, and {@code yy} whose recursive branch joins {@code t3}, {@code yy} and
+     * {@code xx}. The main query reads {@code yy} twice.
+     *
+     * <p>After rewriting, the plan must contain a recursive CTE named {@code yy} whose recursive
+     * child is a project over a join, and the right side of that join must be a project over a
+     * filter over the recursive CTE {@code xx}.
+     */
+    @Test
+    public void recCteInline() {
+        String sql = new StringBuilder().append("with recursive t1 as (\n").append("    select\n")
+                .append("        1 as c1,\n").append("        1 as c2\n").append("),\n").append("t2 as (\n")
+                .append("    select\n").append("        2 as c1,\n").append("        2 as c2\n").append("),\n")
+                .append("t3 as (\n").append("    select\n").append("        3 as c1,\n").append("        3 as c2\n")
+                .append("),\n").append("xx as (\n").append("    select\n").append("        c1,\n")
+                .append("        c2\n").append("    from\n").append("        t1\n").append("    union\n")
+                .append("    select\n").append("        t2.c1,\n").append("        t2.c2\n").append("    from\n")
+                .append("        t2,\n").append("        xx\n").append("    where\n").append("        t2.c1 = xx.c1\n")
+                .append("),\n").append("yy as (\n").append("    select\n").append("        c1,\n")
+                .append("        c2\n").append("    from\n").append("        t3\n").append("    union\n")
+                .append("    select\n").append("        t3.c1,\n").append("        t3.c2\n").append("    from\n")
+                .append("        t3,\n").append("        yy,\n").append("        xx\n").append("    where\n")
+                .append("        t3.c1 = yy.c1\n").append("        and t3.c2 = xx.c1\n").append(")\n")
+                .append("select\n").append("    *\n").append("from\n").append("    yy y1,\n").append("    yy y2;")
+                .toString();
+        LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql);
+        StatementContext statementContext = new StatementContext(connectContext,
+                new OriginStatement(sql, 0));
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        // Stop after the rewrite phase: the pattern below is matched against the rewritten plan.
+        planner.planWithLock(unboundPlan, PhysicalProperties.ANY,
+                ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
+        MemoTestUtils.initMemoAndValidState(planner.getCascadesContext());
+        PlanChecker.from(planner.getCascadesContext()).matches(
+                logicalRecursiveCte(
+                        any(),
+                        logicalRecursiveCteRecursiveChild(
+                                logicalProject(
+                                        logicalJoin(
+                                                any(),
+                                                logicalProject(
+                                                        logicalFilter(
+                                                                logicalRecursiveCte().when(cte -> cte.getCteName().equals("xx"))
+                                                        )
+                                                )
+                                        )
+                                )
+                        )
+                ).when(cte -> cte.getCteName().equals("yy"))
+        );
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java
index 12e9a1ad381..353f0c13863 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java
@@ -17,15 +17,23 @@
 
 package org.apache.doris.nereids.rules.rewrite;
 
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.types.DoubleType;
 import org.apache.doris.nereids.types.TinyIntType;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
+import org.apache.doris.nereids.util.MemoTestUtils;
 import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.ImmutableList;
@@ -328,6 +336,28 @@ public class ColumnPruningTest extends TestWithFeService implements MemoPatternM
                 );
     }
 
+    /**
+     * Plans a query that selects one column ({@code col1}) from a three-column recursive CTE and
+     * filters on another ({@code col2}). After rewriting, the plan must contain a project with a
+     * single output over a filter over the recursive CTE, and the recursive CTE must still expose
+     * all three output columns.
+     */
+    @Test
+    public void pruneRecCte() {
+        String sql = new StringBuilder().append("WITH RECURSIVE t1(col1, col2, col3) AS (\n")
+                .append("        SELECT 1, 1, 1\n").append("    UNION ALL\n").append("        SELECT 2, 2, 2\n")
+                .append("        FROM student, t1\n").append("        WHERE t1.col1 = student.id\n").append("    )\n")
+                .append("SELECT col1\n").append("FROM t1\n").append("WHERE col2 = 2;").toString();
+        LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql);
+        StatementContext statementContext = new StatementContext(connectContext,
+                new OriginStatement(sql, 0));
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        // Stop after the rewrite phase: the pattern below is matched against the rewritten plan.
+        planner.planWithLock(unboundPlan, PhysicalProperties.ANY,
+                ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
+        MemoTestUtils.initMemoAndValidState(planner.getCascadesContext());
+        PlanChecker.from(planner.getCascadesContext()).matches(
+                logicalProject(
+                        logicalFilter(
+                                logicalRecursiveCte().when(cte -> cte.getOutput().size() == 3)
+                        )
+                ).when(project -> project.getOutputs().size() == 1)
+        );
+    }
+
     private List<String> getOutputQualifiedNames(LogicalProject<? extends Plan> p) {
         return getOutputQualifiedNames(p.getOutputs());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to