morrySnow commented on code in PR #58916:
URL: https://github.com/apache/doris/pull/58916#discussion_r2667932953


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java:
##########
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.SystemIdGenerator;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+public class RecursiveCteTempTable extends Table {

Review Comment:
   why extends from Table? why need this?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCte is basically like LogicalUnion
+ */
+public class LogicalRecursiveCte extends AbstractLogicalPlan implements 
RecursiveCte, OutputPrunable {

Review Comment:
   The "CTE" should be capitalized to be consistent with "logicalCTE".



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle
+ */
+public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_TYPE> {

Review Comment:
   we should give it a better name, for example: recursiveUnionProducer



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCte is basically like LogicalUnion
+ */
+public class LogicalRecursiveCte extends AbstractLogicalPlan implements 
RecursiveCte, OutputPrunable {

Review Comment:
   in pg, it named with `Recursive Union`



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCteScan.
+ */
+public class LogicalRecursiveCteScan extends LogicalCatalogRelation {

Review Comment:
   在pg中称为 WorkTableReference



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCte is basically like LogicalUnion
+ */
+public class LogicalRecursiveCte extends AbstractLogicalPlan implements 
RecursiveCte, OutputPrunable {
+    private final String cteName;
+    private final List<NamedExpression> outputs;
+    private final List<List<SlotReference>> regularChildrenOutputs;
+    private final boolean isUnionAll;

Review Comment:
   isUnionAll -> reuse Qualifier in set operation



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java:
##########
@@ -78,6 +82,7 @@ public LogicalSubQueryAlias(List<String> qualifier, 
Optional<List<String>> colum
         super(PlanType.LOGICAL_SUBQUERY_ALIAS, groupExpression, 
logicalProperties, child);
         this.qualifier = 
ImmutableList.copyOf(Objects.requireNonNull(qualifier, "qualifier is null"));
         this.columnAliases = columnAliases;
+        this.isRecursiveCte = computeIsRecursiveCte();

Review Comment:
   why we need isRecursiveCte in this class?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -1037,6 +1042,28 @@ public PlanFragment 
visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
         return planFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan 
recursiveCteScan,
+            PlanTranslatorContext context) {
+        TableIf table = recursiveCteScan.getTable();
+        List<Slot> slots = ImmutableList.copyOf(recursiveCteScan.getOutput());
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, 
context);
+
+        RecursiveCteScanNode scanNode = new RecursiveCteScanNode(table != null 
? table.getName() : "",
+                context.nextPlanNodeId(), tupleDescriptor);
+        scanNode.setNereidsId(recursiveCteScan.getId());
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCteScan.getId(), 
scanNode.getId());
+        Utils.execWithUncheckedException(scanNode::initScanRangeLocations);
+
+        translateRuntimeFilter(recursiveCteScan, scanNode, context);
+
+        context.addScanNode(scanNode, recursiveCteScan);

Review Comment:
   i don't think it should be add to scan node. it is not event a real scan node



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTE.java:
##########
@@ -41,21 +41,28 @@
 public class LogicalCTE<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> implements PropagateFuncDeps {
 
     private final List<LogicalSubQueryAlias<Plan>> aliasQueries;
+    private final boolean isRecursiveCte;

Review Comment:
   ```suggestion
       private final boolean isRecursive;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java:
##########
@@ -1127,6 +1127,8 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
 
     private final Map<Integer, ParserRuleContext> selectHintMap;
 
+    private boolean isInRecursiveCteContext = false;

Review Comment:
   add comment to explain why need this flag



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2179,6 +2208,78 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
+        recursiveCteNode.setNereidsId(recursiveCte.getId());
+        List<List<Expression>> resultExpressionLists = Lists.newArrayList();
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
+        for (List<SlotReference> regularChildrenOutput : 
recursiveCte.getRegularChildrenOutputs()) {
+            resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
+        }
+
+        for (PlanFragment childFragment : childrenFragments) {
+            recursiveCteNode.addChild(childFragment.getPlanRoot());
+        }
+
+        List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
+        for (int i = 0; i < resultExpressionLists.size(); ++i) {
+            List<Expression> resultExpressionList = 
resultExpressionLists.get(i);
+            List<Expr> exprList = Lists.newArrayList();
+            Preconditions.checkState(resultExpressionList.size() == 
outputSlotDescs.size());
+            for (int j = 0; j < resultExpressionList.size(); ++j) {
+                
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), 
context));
+                // TODO: reconsider this, we may change nullable info in 
previous nereids rules not here.
+                outputSlotDescs.get(j)
+                        .setIsNullable(outputSlotDescs.get(j).getIsNullable() 
|| exprList.get(j).isNullable());
+            }
+            materializedResultExprLists.add(exprList);
+        }
+        
recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists);
+        
Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size()
+                == recursiveCteNode.getChildren().size());
+
+        PlanFragment recursiveCteFragment;
+        if (childrenFragments.isEmpty()) {

Review Comment:
   why childrenFragments could be empty here? i think it could not happen



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2179,6 +2208,78 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
+        recursiveCteNode.setNereidsId(recursiveCte.getId());
+        List<List<Expression>> resultExpressionLists = Lists.newArrayList();
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
+        for (List<SlotReference> regularChildrenOutput : 
recursiveCte.getRegularChildrenOutputs()) {
+            resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
+        }
+
+        for (PlanFragment childFragment : childrenFragments) {
+            recursiveCteNode.addChild(childFragment.getPlanRoot());
+        }
+
+        List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
+        for (int i = 0; i < resultExpressionLists.size(); ++i) {
+            List<Expression> resultExpressionList = 
resultExpressionLists.get(i);
+            List<Expr> exprList = Lists.newArrayList();
+            Preconditions.checkState(resultExpressionList.size() == 
outputSlotDescs.size());
+            for (int j = 0; j < resultExpressionList.size(); ++j) {
+                
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), 
context));
+                // TODO: reconsider this, we may change nullable info in 
previous nereids rules not here.
+                outputSlotDescs.get(j)
+                        .setIsNullable(outputSlotDescs.get(j).getIsNullable() 
|| exprList.get(j).isNullable());
+            }
+            materializedResultExprLists.add(exprList);
+        }
+        
recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists);
+        
Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size()
+                == recursiveCteNode.getChildren().size());
+
+        PlanFragment recursiveCteFragment;
+        if (childrenFragments.isEmpty()) {
+            recursiveCteFragment = createPlanFragment(recursiveCteNode,
+                    DataPartition.UNPARTITIONED, recursiveCte);
+            context.addPlanFragment(recursiveCteFragment);
+        } else {
+            int childrenSize = childrenFragments.size();
+            recursiveCteFragment = childrenFragments.get(childrenSize - 1);
+            for (int i = childrenSize - 2; i >= 0; i--) {
+                context.mergePlanFragment(childrenFragments.get(i), 
recursiveCteFragment);
+                for (PlanFragment child : 
childrenFragments.get(i).getChildren()) {
+                    recursiveCteFragment.addChild(child);
+                }

Review Comment:
   if we merged one fragment, we should not add it into children list



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());

Review Comment:
   why need cte name when analyze anchor side?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,

Review Comment:
   why change output slot name here?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable

Review Comment:
   add comment to explain why need forceNullable



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));

Review Comment:
   if also check children's size, but error message do not mention this check



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {

Review Comment:
   ```suggestion
           if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.arity() != 2) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        List<LogicalRecursiveCteScan> recursiveCteScanList = 
analyzedRecursiveChild
+                .collectToList(LogicalRecursiveCteScan.class::isInstance);
+        if (recursiveCteScanList.size() > 1) {
+            throw new AnalysisException(String.format("recursive reference to 
query %s must not appear more than once",
+                    aliasQuery.getAlias()));
+        }

Review Comment:
   we do not support nested recursive cte in recursive side?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -171,16 +173,32 @@ private LogicalPlan bindWithCurrentDb(CascadesContext 
cascadesContext, UnboundRe
                 return consumer;
             }
         }
-        List<String> tableQualifier = RelationUtil.getQualifierName(
-                cascadesContext.getConnectContext(), 
unboundRelation.getNameParts());
-        TableIf table = 
cascadesContext.getStatementContext().getAndCacheTable(tableQualifier, 
TableFrom.QUERY,
-                Optional.of(unboundRelation));
+        LogicalPlan scan;
+        if 
(tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().orElse("")))
 {

Review Comment:
   should not use equalsIgnoreCase here, lower_case_table_names also effect cte 
binding, so we should use `org.apache.doris.nereids.CTEContext#findCTEContext` 
here. why not reuse normal cte binding code?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java:
##########
@@ -107,8 +107,10 @@ private CTEContext collectFromCte(
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
             LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+            // 看起来需要在CascadesContext中添加当前CTE的name,以便判断自引用

Review Comment:
   use english please



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java:
##########
@@ -403,6 +410,54 @@ public <P extends Plan> P pruneOutput(P plan, 
List<NamedExpression> originOutput
         }
     }
 
+    private LogicalRecursiveCte pruneRecursiveCteOutput(LogicalRecursiveCte 
recursiveCte, PruneContext context) {

Review Comment:
   remove this function, it useless



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2165,8 +2192,10 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
             if (inputPlanNode instanceof OlapScanNode) {
                 ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, 
requiredByProjectSlotIdSet);
             }
-            updateScanSlotsMaterialization((ScanNode) inputPlanNode, 
requiredSlotIdSet,
-                    requiredByProjectSlotIdSet, context);
+            if (!(inputPlanNode instanceof RecursiveCteScanNode)) {
+                updateScanSlotsMaterialization((ScanNode) inputPlanNode, 
requiredSlotIdSet,
+                        requiredByProjectSlotIdSet, context);

Review Comment:
   if RecursiveCteScanNode do not extends from ScanNode, we do not need to add 
this if statement



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        List<LogicalRecursiveCteScan> recursiveCteScanList = 
analyzedRecursiveChild
+                .collectToList(LogicalRecursiveCteScan.class::isInstance);
+        if (recursiveCteScanList.size() > 1) {
+            throw new AnalysisException(String.format("recursive reference to 
query %s must not appear more than once",
+                    aliasQuery.getAlias()));
+        }
+        List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput();
+        List<DataType> anchorChildOutputTypes = new 
ArrayList<>(anchorChildOutputs.size());
+        for (Slot slot : anchorChildOutputs) {
+            anchorChildOutputTypes.add(slot.getDataType());
+        }
+        List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput();
+        for (int i = 0; i < recursiveChildOutputs.size(); ++i) {

Review Comment:
   could out of index? if recursiveChildOutputs.size() < 
analyzedAnchorChild.size()



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        List<LogicalRecursiveCteScan> recursiveCteScanList = 
analyzedRecursiveChild
+                .collectToList(LogicalRecursiveCteScan.class::isInstance);
+        if (recursiveCteScanList.size() > 1) {
+            throw new AnalysisException(String.format("recursive reference to 
query %s must not appear more than once",
+                    aliasQuery.getAlias()));
+        }
+        List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput();
+        List<DataType> anchorChildOutputTypes = new 
ArrayList<>(anchorChildOutputs.size());
+        for (Slot slot : anchorChildOutputs) {
+            anchorChildOutputTypes.add(slot.getDataType());
+        }
+        List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput();
+        for (int i = 0; i < recursiveChildOutputs.size(); ++i) {
+            if 
(!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i)))
 {
+                throw new AnalysisException(String.format("%s recursive 
child's %d column's datatype in select list %s "
+                                + "is different from anchor child's output 
datatype %s, please add cast manually "
+                                + "to get expect datatype", 
aliasQuery.getAlias(), i + 1,
+                        recursiveChildOutputs.get(i).getDataType(), 
anchorChildOutputTypes.get(i)));
+            }
+        }
+        analyzedRecursiveChild = new 
LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(),
+                forceOutputNullable(analyzedRecursiveChild, 
ImmutableList.of()));
+
+        // create LogicalRecursiveCte
+        LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan;
+        LogicalRecursiveCte analyzedCtePlan = new 
LogicalRecursiveCte(aliasQuery.getAlias(),
+                logicalUnion.getQualifier() == SetOperation.Qualifier.ALL,
+                ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild));
+        List<List<NamedExpression>> childrenProjections = 
analyzedCtePlan.collectChildrenProjections();
+        int childrenProjectionSize = childrenProjections.size();

Review Comment:
   childrenProjectionSize always be 2? if so, builderWithExpectedSize is 
unnecessary



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        List<LogicalRecursiveCteScan> recursiveCteScanList = 
analyzedRecursiveChild
+                .collectToList(LogicalRecursiveCteScan.class::isInstance);
+        if (recursiveCteScanList.size() > 1) {
+            throw new AnalysisException(String.format("recursive reference to 
query %s must not appear more than once",
+                    aliasQuery.getAlias()));
+        }
+        List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput();
+        List<DataType> anchorChildOutputTypes = new 
ArrayList<>(anchorChildOutputs.size());
+        for (Slot slot : anchorChildOutputs) {
+            anchorChildOutputTypes.add(slot.getDataType());
+        }
+        List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput();
+        for (int i = 0; i < recursiveChildOutputs.size(); ++i) {
+            if 
(!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i)))
 {
+                throw new AnalysisException(String.format("%s recursive 
child's %d column's datatype in select list %s "
+                                + "is different from anchor child's output 
datatype %s, please add cast manually "
+                                + "to get expect datatype", 
aliasQuery.getAlias(), i + 1,
+                        recursiveChildOutputs.get(i).getDataType(), 
anchorChildOutputTypes.get(i)));
+            }
+        }
+        analyzedRecursiveChild = new 
LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(),
+                forceOutputNullable(analyzedRecursiveChild, 
ImmutableList.of()));
+
+        // create LogicalRecursiveCte
+        LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan;
+        LogicalRecursiveCte analyzedCtePlan = new 
LogicalRecursiveCte(aliasQuery.getAlias(),
+                logicalUnion.getQualifier() == SetOperation.Qualifier.ALL,
+                ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild));
+        List<List<NamedExpression>> childrenProjections = 
analyzedCtePlan.collectChildrenProjections();

Review Comment:
   i don't think recursive cte need this step, because it used to do cast 
before set operation, but all output of children of recursive cte are excactly 
same, so we do not need to do cast.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java:
##########
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * PhysicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle
+ */
+public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHILD_TYPE> {
+    private final String cteName;
+
+    public PhysicalRecursiveCteRecursiveChild(String cteName, 
LogicalProperties logicalProperties, CHILD_TYPE child) {
+        this(cteName, Optional.empty(), logicalProperties, child);
+    }
+
+    public PhysicalRecursiveCteRecursiveChild(String cteName, 
Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, CHILD_TYPE child) {
+        this(cteName, groupExpression, logicalProperties, 
PhysicalProperties.ANY, null, child);
+    }
+
+    public PhysicalRecursiveCteRecursiveChild(String cteName, 
Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, @Nullable PhysicalProperties 
physicalProperties, Statistics statistics,
+            CHILD_TYPE child) {
+        super(PlanType.PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD, 
groupExpression, logicalProperties, physicalProperties,
+                statistics, child);
+        this.cteName = cteName;
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlStringSkipNull("PhysicalRecursiveCteRecursiveChild",
+                "cteName", cteName);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1);
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, getLogicalProperties(),
+                children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalRecursiveCteRecursiveChild(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return ImmutableList.of();
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, getLogicalProperties(), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        Preconditions.checkArgument(children.size() == 1);
+        return new PhysicalRecursiveCteRecursiveChild<>(cteName, 
groupExpression, logicalProperties.get(), child());
+    }
+
+    @Override
+    public void computeUnique(DataTrait.Builder builder) {
+
+    }
+
+    @Override
+    public void computeUniform(DataTrait.Builder builder) {
+
+    }
+
+    @Override
+    public void computeEqualSet(DataTrait.Builder builder) {
+

Review Comment:
   i'm confused if this node do not impl any data traits functions, why 
PhysicalRecursiveCte need to implement these functions?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java:
##########
@@ -876,6 +883,12 @@ public Statistics visitLogicalEsScan(LogicalEsScan esScan, 
Void context) {
         return computeCatalogRelation(esScan);
     }
 
+    @Override
+    public Statistics visitLogicalRecursiveCteScan(LogicalRecursiveCteScan 
recursiveCteScan, Void context) {
+        recursiveCteScan.getExpressions();
+        return computeCatalogRelation(recursiveCteScan);

Review Comment:
   why reuse compute catalog relation? it does not make any sense



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java:
##########
@@ -113,4 +128,19 @@ public Plan visitLogicalCTEConsumer(LogicalCTEConsumer 
cteConsumer, LogicalCTEPr
         }
         return cteConsumer;
     }
+
+    private void collectMustInlineCteConsumers(Plan planNode, boolean 
needCollect,
+            Set<LogicalCTEConsumer> cteConsumers) {
+        if (planNode instanceof LogicalCTEConsumer) {
+            if (needCollect) {
+                cteConsumers.add((LogicalCTEConsumer) planNode);
+            }
+        } else if (planNode instanceof LogicalRecursiveCteRecursiveChild) {
+            collectMustInlineCteConsumers(planNode.child(0), true, 
cteConsumers);

Review Comment:
   why need this collector? i think we could add a recursive flag on CteAnchor 
or somewhere else to process it easily



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle
+ */
+public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_TYPE> {

Review Comment:
   should implement hashCode and equals



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());

Review Comment:
   cte scan node read data from where? union output? recursive cte node output?



##########
fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java:
##########
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+// Full scan of recursive cte temp table
+public class RecursiveCteScanNode extends ScanNode {

Review Comment:
   i don't think it should impl scannode



##########
fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java:
##########
@@ -576,6 +595,130 @@ private static void 
filterInstancesWhichReceiveDataFromRemote(
         }
     }
 
+    private static Set<Integer> 
setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans,

Review Comment:
   this function is too long, please add some comment to it to explain how it 
is work



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCte is basically like LogicalUnion
+ */
+public class LogicalRecursiveCte extends AbstractLogicalPlan implements 
RecursiveCte, OutputPrunable {
+    private final String cteName;
+    private final List<NamedExpression> outputs;
+    private final List<List<SlotReference>> regularChildrenOutputs;
+    private final boolean isUnionAll;
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(String cteName, boolean isUnionAll, List<Plan> 
children) {
+        this(cteName, isUnionAll, ImmutableList.of(), ImmutableList.of(), 
children);
+    }
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(String cteName, boolean isUnionAll, 
List<NamedExpression> outputs,
+            List<List<SlotReference>> childrenOutputs, List<Plan> children) {
+        this(cteName, isUnionAll, outputs, childrenOutputs, Optional.empty(),
+                Optional.empty(),
+                children);
+    }
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(String cteName, boolean isUnionAll, 
List<NamedExpression> outputs,
+            List<List<SlotReference>> childrenOutputs,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
+            List<Plan> children) {
+        super(PlanType.LOGICAL_RECURSIVE_CTE, groupExpression, 
logicalProperties, children);
+        this.cteName = cteName;
+        this.isUnionAll = isUnionAll;
+        this.outputs = ImmutableList.copyOf(outputs);
+        this.regularChildrenOutputs = ImmutableList.copyOf(childrenOutputs);
+    }
+
+    @Override
+    public boolean isUnionAll() {
+        return isUnionAll;
+    }
+
+    public String getCteName() {
+        return cteName;
+    }
+
+    @Override
+    public List<SlotReference> getRegularChildOutput(int i) {
+        return regularChildrenOutputs.get(i);
+    }
+
+    @Override
+    public List<List<SlotReference>> getRegularChildrenOutputs() {
+        return regularChildrenOutputs;
+    }
+
+    public List<List<NamedExpression>> collectChildrenProjections() {
+        return castCommonDataTypeOutputs();
+    }
+
+    private List<List<NamedExpression>> castCommonDataTypeOutputs() {
+        int childOutputSize = child(0).getOutput().size();
+        ImmutableList.Builder<NamedExpression> newLeftOutputs = 
ImmutableList.builderWithExpectedSize(
+                childOutputSize);
+        ImmutableList.Builder<NamedExpression> newRightOutputs = 
ImmutableList.builderWithExpectedSize(
+                childOutputSize
+        );
+        // Ensure that the output types of the left and right children are 
consistent and expand upward.
+        for (int i = 0; i < childOutputSize; ++i) {
+            Slot left = child(0).getOutput().get(i);
+            Slot right = child(1).getOutput().get(i);
+            DataType compatibleType;
+            try {
+                compatibleType = 
LogicalSetOperation.getAssignmentCompatibleType(left.getDataType(),
+                        right.getDataType());
+            } catch (Exception e) {
+                throw new AnalysisException(
+                        "Can not find compatible type for " + left + " and " + 
right + ", " + e.getMessage());
+            }
+            Expression newLeft = 
TypeCoercionUtils.castIfNotSameTypeStrict(left, compatibleType);
+            Expression newRight = 
TypeCoercionUtils.castIfNotSameTypeStrict(right, compatibleType);
+            if (newLeft instanceof Cast) {
+                newLeft = new Alias(newLeft, left.getName());
+            }
+            if (newRight instanceof Cast) {
+                newRight = new Alias(newRight, right.getName());
+            }
+            newLeftOutputs.add((NamedExpression) newLeft);
+            newRightOutputs.add((NamedExpression) newRight);
+        }
+
+        return ImmutableList.of(newLeftOutputs.build(), 
newRightOutputs.build());
+    }
+
+    /**
+     * Generate new output for Recursive Cte.
+     */
+    public List<NamedExpression> buildNewOutputs() {
+        List<Slot> slots = resetNullableForLeftOutputs();
+        ImmutableList.Builder<NamedExpression> newOutputs = 
ImmutableList.builderWithExpectedSize(slots.size());
+
+        for (int i = 0; i < slots.size(); i++) {
+            Slot slot = slots.get(i);
+            ExprId exprId = i < outputs.size() ? outputs.get(i).getExprId() : 
StatementScopeIdGenerator.newExprId();
+            newOutputs.add(
+                    new SlotReference(exprId, slot.toSql(), 
slot.getDataType(), slot.nullable(), ImmutableList.of())
+            );
+        }
+        return newOutputs.build();
+    }
+
+    // If the right child is nullable, need to ensure that the left child is 
also nullable
+    private List<Slot> resetNullableForLeftOutputs() {
+        int rightChildOutputSize = child(1).getOutput().size();
+        ImmutableList.Builder<Slot> resetNullableForLeftOutputs
+                = ImmutableList.builderWithExpectedSize(rightChildOutputSize);
+        for (int i = 0; i < rightChildOutputSize; ++i) {
+            if (child(1).getOutput().get(i).nullable() && 
!child(0).getOutput().get(i).nullable()) {
+                
resetNullableForLeftOutputs.add(child(0).getOutput().get(i).withNullable(true));
+            } else {
+                resetNullableForLeftOutputs.add(child(0).getOutput().get(i));
+            }
+        }
+        return resetNullableForLeftOutputs.build();
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlStringSkipNull("LogicalRecursiveCte",
+                "cteName", cteName,
+                "isUnionAll", isUnionAll,
+                "outputs", outputs,
+                "regularChildrenOutputs", regularChildrenOutputs,
+                "stats", statistics);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LogicalRecursiveCte that = (LogicalRecursiveCte) o;
+        return cteName.equals(that.cteName) && isUnionAll == that.isUnionAll 
&& Objects.equals(outputs, that.outputs)
+                && Objects.equals(regularChildrenOutputs, 
that.regularChildrenOutputs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cteName, isUnionAll, outputs, 
regularChildrenOutputs);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalRecursiveCte(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return 
regularChildrenOutputs.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return outputs.stream()
+                .map(NamedExpression::toSlot)
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public LogicalRecursiveCte withChildren(List<Plan> children) {
+        return new LogicalRecursiveCte(cteName, isUnionAll, outputs, 
regularChildrenOutputs, children);
+    }
+
+    public LogicalRecursiveCte withChildrenAndTheirOutputs(List<Plan> children,
+            List<List<SlotReference>> childrenOutputs) {
+        Preconditions.checkArgument(children.size() == childrenOutputs.size(),
+                "children size %s is not equals with children outputs size %s",
+                children.size(), childrenOutputs.size());
+        return new LogicalRecursiveCte(cteName, isUnionAll, outputs, 
childrenOutputs, children);
+    }
+
+    @Override
+    public LogicalRecursiveCte withGroupExpression(Optional<GroupExpression> 
groupExpression) {
+        return new LogicalRecursiveCte(cteName, isUnionAll, outputs, 
regularChildrenOutputs,
+                groupExpression, Optional.of(getLogicalProperties()), 
children);
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new LogicalRecursiveCte(cteName, isUnionAll, outputs, 
regularChildrenOutputs,
+                groupExpression, logicalProperties, children);
+    }
+
+    public LogicalRecursiveCte withNewOutputs(List<NamedExpression> 
newOutputs) {
+        return new LogicalRecursiveCte(cteName, isUnionAll, newOutputs, 
regularChildrenOutputs,
+                Optional.empty(), Optional.empty(), children);
+    }
+
+    public LogicalRecursiveCte withNewOutputsAndChildren(List<NamedExpression> 
newOutputs,
+                                                         List<Plan> children,
+                                                         
List<List<SlotReference>> childrenOutputs) {
+        return new LogicalRecursiveCte(cteName, isUnionAll, newOutputs, 
childrenOutputs,
+                Optional.empty(), Optional.empty(), children);
+    }
+
+    @Override
+    public List<NamedExpression> getOutputs() {
+        return outputs;
+    }
+
+    @Override
+    public LogicalRecursiveCte pruneOutputs(List<NamedExpression> 
prunedOutputs) {
+        return withNewOutputs(prunedOutputs);
+    }
+
+    @Override
+    public void computeUnique(DataTrait.Builder builder) {
+    }

Review Comment:
   it is not same with Physical one, so which one is right?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -318,6 +319,16 @@ public Void visitPhysicalUnion(PhysicalUnion union, 
PlanContext context) {
         return null;
     }
 
+    @Override
+    public Void visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, 
PlanContext context) {

Review Comment:
   recursiveCte should implement LogicalBinary and generate a request list with 
two element directly



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -150,6 +153,11 @@ public PhysicalProperties 
visitPhysicalFileScan(PhysicalFileScan fileScan, PlanC
         return PhysicalProperties.STORAGE_ANY;
     }
 
+    @Override
+    public PhysicalProperties 
visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan cteScan, PlanContext 
context) {
+        return PhysicalProperties.ANY;

Review Comment:
   if this is any, we should not do compute scan range for CteScanNode when 
translator, and let coordinator choose a random BE



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2179,6 +2208,78 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));

Review Comment:
   should generate distributeExpr before generateTupleDesc



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2179,6 +2208,78 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
+        recursiveCteNode.setNereidsId(recursiveCte.getId());
+        List<List<Expression>> resultExpressionLists = Lists.newArrayList();
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
+        for (List<SlotReference> regularChildrenOutput : 
recursiveCte.getRegularChildrenOutputs()) {
+            resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
+        }
+
+        for (PlanFragment childFragment : childrenFragments) {
+            recursiveCteNode.addChild(childFragment.getPlanRoot());
+        }
+
+        List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
+        for (int i = 0; i < resultExpressionLists.size(); ++i) {
+            List<Expression> resultExpressionList = 
resultExpressionLists.get(i);
+            List<Expr> exprList = Lists.newArrayList();
+            Preconditions.checkState(resultExpressionList.size() == 
outputSlotDescs.size());
+            for (int j = 0; j < resultExpressionList.size(); ++j) {
+                
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), 
context));
+                // TODO: reconsider this, we may change nullable info in 
previous nereids rules not here.
+                outputSlotDescs.get(j)
+                        .setIsNullable(outputSlotDescs.get(j).getIsNullable() 
|| exprList.get(j).isNullable());
+            }
+            materializedResultExprLists.add(exprList);
+        }
+        
recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists);
+        
Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size()
+                == recursiveCteNode.getChildren().size());
+
+        PlanFragment recursiveCteFragment;
+        if (childrenFragments.isEmpty()) {
+            recursiveCteFragment = createPlanFragment(recursiveCteNode,
+                    DataPartition.UNPARTITIONED, recursiveCte);
+            context.addPlanFragment(recursiveCteFragment);
+        } else {
+            int childrenSize = childrenFragments.size();
+            recursiveCteFragment = childrenFragments.get(childrenSize - 1);
+            for (int i = childrenSize - 2; i >= 0; i--) {
+                context.mergePlanFragment(childrenFragments.get(i), 
recursiveCteFragment);

Review Comment:
   why need merge two children? i think only anchor side could be merged?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2179,6 +2208,78 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
+        recursiveCteNode.setNereidsId(recursiveCte.getId());
+        List<List<Expression>> resultExpressionLists = Lists.newArrayList();
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
+        for (List<SlotReference> regularChildrenOutput : 
recursiveCte.getRegularChildrenOutputs()) {
+            resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
+        }
+
+        for (PlanFragment childFragment : childrenFragments) {
+            recursiveCteNode.addChild(childFragment.getPlanRoot());
+        }
+
+        List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
+        for (int i = 0; i < resultExpressionLists.size(); ++i) {
+            List<Expression> resultExpressionList = 
resultExpressionLists.get(i);
+            List<Expr> exprList = Lists.newArrayList();
+            Preconditions.checkState(resultExpressionList.size() == 
outputSlotDescs.size());
+            for (int j = 0; j < resultExpressionList.size(); ++j) {
+                
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), 
context));
+                // TODO: reconsider this, we may change nullable info in 
previous nereids rules not here.
+                outputSlotDescs.get(j)
+                        .setIsNullable(outputSlotDescs.get(j).getIsNullable() 
|| exprList.get(j).isNullable());

Review Comment:
   must move to nereids, i think if u impl computeOutput in LogicalRecursiveCte 
in right way, the nullable should be correct when generate output tuple



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2179,6 +2208,78 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject<? extends Plan> project
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.getCteName(), recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
+        recursiveCteNode.setNereidsId(recursiveCte.getId());
+        List<List<Expression>> resultExpressionLists = Lists.newArrayList();
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
+        for (List<SlotReference> regularChildrenOutput : 
recursiveCte.getRegularChildrenOutputs()) {
+            resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
+        }
+
+        for (PlanFragment childFragment : childrenFragments) {
+            recursiveCteNode.addChild(childFragment.getPlanRoot());
+        }
+
+        List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
+        for (int i = 0; i < resultExpressionLists.size(); ++i) {
+            List<Expression> resultExpressionList = 
resultExpressionLists.get(i);
+            List<Expr> exprList = Lists.newArrayList();
+            Preconditions.checkState(resultExpressionList.size() == 
outputSlotDescs.size());

Review Comment:
   if use Preconditions.checkXXX, please add error message



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        List<LogicalRecursiveCteScan> recursiveCteScanList = 
analyzedRecursiveChild
+                .collectToList(LogicalRecursiveCteScan.class::isInstance);
+        if (recursiveCteScanList.size() > 1) {
+            throw new AnalysisException(String.format("recursive reference to 
query %s must not appear more than once",
+                    aliasQuery.getAlias()));
+        }
+        List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput();
+        List<DataType> anchorChildOutputTypes = new 
ArrayList<>(anchorChildOutputs.size());
+        for (Slot slot : anchorChildOutputs) {
+            anchorChildOutputTypes.add(slot.getDataType());
+        }
+        List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput();
+        for (int i = 0; i < recursiveChildOutputs.size(); ++i) {
+            if 
(!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i)))
 {

Review Comment:
   think about two struct type, that have same field number and field type, 
only have different field name. another case is variant type, check variant 
type's equals function, it has some attributes, we should allow these 
attributes different because struct's fields' name and variant's attribute are 
may be generated by planner itself and could be changed between different 
version



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java:
##########
@@ -289,6 +290,53 @@ public Plan visitLogicalRepeat(LogicalRepeat<? extends 
Plan> repeat, Map<ExprId,
         return repeat.withGroupSetsAndOutput(repeat.getGroupingSets(), 
newOutputs).recomputeLogicalProperties();
     }
 
+    @Override
+    public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
Map<ExprId, Slot> replaceMap) {
+        recursiveCte = (LogicalRecursiveCte) super.visit(recursiveCte, 
replaceMap);
+        ImmutableList.Builder<List<SlotReference>> newChildrenOutputs = 
ImmutableList.builder();
+        List<Boolean> inputNullable = null;
+        if (!recursiveCte.children().isEmpty()) {

Review Comment:
   why recursiveCte's children could be empty? just copy from set operation and 
not optimize for recursiveCte?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java:
##########
@@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> 
analyzeCte(
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException(String.format("recursive cte must be 
union, don't support %s",
+                    parsedCtePlan.getClass().getSimpleName()));
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild
+                .collect(LogicalRecursiveCteScan.class::isInstance);
+        for (LogicalRecursiveCteScan cteScan : recursiveCteScans) {
+            if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) {
+                throw new AnalysisException(
+                        String.format("recursive reference to query %s must 
not appear within its non-recursive term",
+                                aliasQuery.getAlias()));
+            }
+        }
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+        // make all output nullable
+        analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild,
+                aliasQuery.getColumnAliases().orElse(ImmutableList.of()));
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        List<LogicalRecursiveCteScan> recursiveCteScanList = 
analyzedRecursiveChild
+                .collectToList(LogicalRecursiveCteScan.class::isInstance);
+        if (recursiveCteScanList.size() > 1) {
+            throw new AnalysisException(String.format("recursive reference to 
query %s must not appear more than once",
+                    aliasQuery.getAlias()));
+        }
+        List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput();
+        List<DataType> anchorChildOutputTypes = new 
ArrayList<>(anchorChildOutputs.size());
+        for (Slot slot : anchorChildOutputs) {
+            anchorChildOutputTypes.add(slot.getDataType());
+        }
+        List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput();
+        for (int i = 0; i < recursiveChildOutputs.size(); ++i) {
+            if 
(!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i)))
 {
+                throw new AnalysisException(String.format("%s recursive 
child's %d column's datatype in select list %s "
+                                + "is different from anchor child's output 
datatype %s, please add cast manually "
+                                + "to get expect datatype", 
aliasQuery.getAlias(), i + 1,
+                        recursiveChildOutputs.get(i).getDataType(), 
anchorChildOutputTypes.get(i)));
+            }
+        }
+        analyzedRecursiveChild = new 
LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(),
+                forceOutputNullable(analyzedRecursiveChild, 
ImmutableList.of()));

Review Comment:
   why also changed recursive side's output name?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java:
##########
@@ -179,6 +182,10 @@ private void collectFromUnboundRelation(CascadesContext 
cascadesContext,
             List<String> nameParts, TableFrom tableFrom, 
Optional<UnboundRelation> unboundRelation) {
         if (nameParts.size() == 1) {
             String tableName = nameParts.get(0);
+            if (cascadesContext.getCurrentRecursiveCteName().isPresent()
+                    && 
tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().get())) 
{
+                return;
+            }

Review Comment:
   should reuse findCteContext too



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java:
##########
@@ -213,6 +214,12 @@ public Plan visitLogicalProject(LogicalProject<? extends 
Plan> project, PruneCon
         return pruneChildren(plan, new RoaringBitmap());
     }
 
+    @Override
+    public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
PruneContext context) {
+        // keep LogicalRecursiveCte's output unchanged
+        return skipPruneThis(recursiveCte);

Review Comment:
   i think we could safely prune it output if we do not let it impl 
OutputPrunable



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCte is basically like LogicalUnion
+ */
+public class LogicalRecursiveCte extends AbstractLogicalPlan implements 
RecursiveCte, OutputPrunable {

Review Comment:
   should not implement OutputPrunable



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java:
##########
@@ -289,6 +290,53 @@ public Plan visitLogicalRepeat(LogicalRepeat<? extends 
Plan> repeat, Map<ExprId,
         return repeat.withGroupSetsAndOutput(repeat.getGroupingSets(), 
newOutputs).recomputeLogicalProperties();
     }
 
+    @Override
+    public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
Map<ExprId, Slot> replaceMap) {
+        recursiveCte = (LogicalRecursiveCte) super.visit(recursiveCte, 
replaceMap);
+        ImmutableList.Builder<List<SlotReference>> newChildrenOutputs = 
ImmutableList.builder();
+        List<Boolean> inputNullable = null;
+        if (!recursiveCte.children().isEmpty()) {
+            inputNullable = 
Lists.newArrayListWithCapacity(recursiveCte.getOutputs().size());
+            for (int i = 0; i < recursiveCte.getOutputs().size(); i++) {
+                inputNullable.add(false);

Review Comment:
   these code block let me confused, in which scenario, LogicalRecursiveCte' 
children's output could be not null? or it is just defensive programming



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to