This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch dev_rec
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c94e85b20c91f38f5063d1384429937fa85cbfbe
Author: lichi <[email protected]>
AuthorDate: Sat Oct 11 16:55:51 2025 +0800

    fix some bug in fe
---
 .../org/apache/doris/analysis/DescriptorTable.java |   5 +
 .../doris/catalog/RecursiveCteTempTable.java       |   4 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  77 ++++++-
 .../processor/post/RuntimeFilterPruner.java        |   9 +
 .../properties/ChildOutputPropertyDeriver.java     |   2 +-
 .../doris/nereids/rules/analysis/AnalyzeCTE.java   |   9 +-
 .../LogicalRecursiveCteToPhysicalRecursiveCte.java |   3 +-
 .../nereids/rules/rewrite/AdjustNullable.java      |  63 ++++--
 .../doris/nereids/rules/rewrite/ColumnPruning.java |  22 +-
 .../doris/nereids/stats/StatsCalculator.java       |  70 ++++++-
 .../trees/copier/LogicalPlanDeepCopier.java        |   8 +-
 .../trees/plans/algebra/RecursiveCte.java}         |  20 +-
 .../commands/CreateMaterializedViewCommand.java    |   2 +-
 .../distribute/worker/job/AssignedJobBuilder.java  |  93 ---------
 .../worker/job/UnassignedRecursiveCteScanJob.java  |   5 +-
 .../trees/plans/logical/LogicalRecursiveCte.java   | 223 ++++++++++++---------
 .../trees/plans/physical/PhysicalRecursiveCte.java | 113 +++++++----
 .../nereids/trees/plans/visitor/PlanVisitor.java   |   2 +-
 .../apache/doris/planner/RecursiveCteScanNode.java |  18 +-
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  96 +++++++++
 20 files changed, 547 insertions(+), 297 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index fb6cc7df0a8..197293ac69a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -20,6 +20,7 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.RecursiveCteTempTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.IdGenerator;
@@ -212,6 +213,10 @@ public class DescriptorTable {
         }
 
         for (TableIf tbl : referencedTbls.values()) {
+            if (tbl instanceof RecursiveCteTempTable) {
+                // skip recursive cte temp table
+                continue;
+            }
             result.addToTableDescriptors(tbl.toThrift());
         }
         thriftDescTable = result;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
index 9f36b04dfc4..ee924466fbb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
@@ -17,10 +17,12 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.common.SystemIdGenerator;
+
 import java.util.List;
 
 public class RecursiveCteTempTable extends Table {
     public RecursiveCteTempTable(String tableName, List<Column> fullSchema) {
-        super(-1, tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema);
+        super(SystemIdGenerator.getNextId(), tableName, 
TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 21f9bbf2126..a27268a5af7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -116,7 +116,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PreAggStatus;
 import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
 import org.apache.doris.nereids.trees.plans.algebra.Relation;
-import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
@@ -2269,6 +2268,79 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanTranslatorContext context) {
+        List<PlanFragment> childrenFragments = new ArrayList<>();
+        for (Plan plan : recursiveCte.children()) {
+            childrenFragments.add(plan.accept(this, context));
+        }
+
+        TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
+        List<SlotDescriptor> outputSlotDescs = new 
ArrayList<>(setTuple.getSlots());
+
+        RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
+                    recursiveCte.isUnionAll());
+        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
+        recursiveCteNode.setNereidsId(recursiveCte.getId());
+        List<List<Expression>> resultExpressionLists = Lists.newArrayList();
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
+        for (List<SlotReference> regularChildrenOutput : 
recursiveCte.getRegularChildrenOutputs()) {
+            resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
+        }
+
+        for (PlanFragment childFragment : childrenFragments) {
+            recursiveCteNode.addChild(childFragment.getPlanRoot());
+        }
+
+        List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
+        for (int i = 0; i < resultExpressionLists.size(); ++i) {
+            List<Expression> resultExpressionList = 
resultExpressionLists.get(i);
+            List<Expr> exprList = Lists.newArrayList();
+            Preconditions.checkState(resultExpressionList.size() == 
outputSlotDescs.size());
+            for (int j = 0; j < resultExpressionList.size(); ++j) {
+                if (outputSlotDescs.get(j).isMaterialized()) {
+                    
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), 
context));
+                    // TODO: reconsider this, we may change nullable info in 
previous nereids rules not here.
+                    outputSlotDescs.get(j)
+                            
.setIsNullable(outputSlotDescs.get(j).getIsNullable() || 
exprList.get(j).isNullable());
+                }
+            }
+            materializedResultExprLists.add(exprList);
+        }
+        
recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists);
+        
Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size()
+                == recursiveCteNode.getChildren().size());
+
+        PlanFragment recursiveCteFragment;
+        if (childrenFragments.isEmpty()) {
+            recursiveCteFragment = createPlanFragment(recursiveCteNode,
+                    DataPartition.UNPARTITIONED, recursiveCte);
+            context.addPlanFragment(recursiveCteFragment);
+        } else {
+            int childrenSize = childrenFragments.size();
+            recursiveCteFragment = childrenFragments.get(childrenSize - 1);
+            for (int i = childrenSize - 2; i >= 0; i--) {
+                context.mergePlanFragment(childrenFragments.get(i), 
recursiveCteFragment);
+                for (PlanFragment child : 
childrenFragments.get(i).getChildren()) {
+                    recursiveCteFragment.addChild(child);
+                }
+            }
+            setPlanRoot(recursiveCteFragment, recursiveCteNode, recursiveCte);
+        }
+
+        // in pipeline engine, we use parallel scan by default, but it breaks 
the rule of data distribution
+        // we need to turn off parallel scan to ensure we get the correct result.
+        // TODO: nereids forbids all parallel scan under PhysicalSetOperation 
temporarily
+        if 
(!recursiveCte.getPhysicalProperties().equals(PhysicalProperties.ANY)
+                && 
findOlapScanNodesByPassExchangeAndJoinNode(recursiveCteFragment.getPlanRoot())) 
{
+            recursiveCteFragment.setHasColocatePlanNode(true);
+            recursiveCteNode.setColocate(true);
+        }
+
+        return recursiveCteFragment;
+    }
+
     /**
      * Returns a new fragment with a UnionNode as its root. The data partition 
of the
      * returned fragment and how the data of the child fragments is consumed 
depends on the
@@ -2300,9 +2372,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             setOperationNode = new ExceptNode(context.nextPlanNodeId(), 
setTuple.getId());
         } else if (setOperation instanceof PhysicalIntersect) {
             setOperationNode = new IntersectNode(context.nextPlanNodeId(), 
setTuple.getId());
-        } else if (setOperation instanceof PhysicalRecursiveCte) {
-            setOperationNode = new RecursiveCteNode(context.nextPlanNodeId(), 
setTuple.getId(),
-                    
setOperation.getQualifier().equals(SetOperation.Qualifier.ALL));
         } else {
             throw new RuntimeException("not support set operation type " + 
setOperation);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
index 51a9003b78f..d7e4e5b67cb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
@@ -33,6 +33,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
@@ -76,6 +77,14 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
         return plan;
     }
 
+    @Override
+    public PhysicalRecursiveCte visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, CascadesContext context) {
+        for (Plan child : recursiveCte.children()) {
+            child.accept(this, context);
+        }
+        return recursiveCte;
+    }
+
     @Override
     public PhysicalSetOperation visitPhysicalSetOperation(PhysicalSetOperation 
setOperation, CascadesContext context) {
         for (Plan child : setOperation.children()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index d3592388dae..0751f14b715 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -151,7 +151,7 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
 
     @Override
     public PhysicalProperties 
visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan cteScan, PlanContext 
context) {
-        return PhysicalProperties.GATHER;
+        return PhysicalProperties.MUST_SHUFFLE;
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
index 8ac4234e846..a6259f0b24c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
@@ -159,9 +160,10 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory {
         LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
         LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan;
 
-        // manually bind LogicalRecursiveCte, see bindSetOperation in 
BindExpression.java
+        // create LogicalRecursiveCte
         LogicalRecursiveCte analyzedCtePlan = new LogicalRecursiveCte(
-                logicalUnion.getQualifier(), 
ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild));
+                logicalUnion.getQualifier() == SetOperation.Qualifier.ALL,
+                ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild));
         List<List<NamedExpression>> childrenProjections = 
analyzedCtePlan.collectChildrenProjections();
         int childrenProjectionSize = childrenProjections.size();
         ImmutableList.Builder<List<SlotReference>> childrenOutputs = 
ImmutableList
@@ -180,8 +182,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory {
             newChildren.add(newChild);
             childrenOutputs.add((List<SlotReference>) (List) 
newChild.getOutput());
         }
-        analyzedCtePlan = (LogicalRecursiveCte) 
analyzedCtePlan.withChildrenAndTheirOutputs(newChildren.build(),
-                childrenOutputs.build());
+        analyzedCtePlan = 
analyzedCtePlan.withChildrenAndTheirOutputs(newChildren.build(), 
childrenOutputs.build());
         List<NamedExpression> newOutputs = analyzedCtePlan.buildNewOutputs();
         analyzedCtePlan = analyzedCtePlan.withNewOutputs(newOutputs);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
index c8960aa9e51..467092895a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
@@ -28,10 +28,9 @@ public class LogicalRecursiveCteToPhysicalRecursiveCte 
extends OneImplementation
     @Override
     public Rule build() {
         return logicalRecursiveCte().then(recursiveCte ->
-            new PhysicalRecursiveCte(recursiveCte.getQualifier(),
+            new PhysicalRecursiveCte(recursiveCte.isUnionAll(),
                     recursiveCte.getOutputs(),
                     recursiveCte.getRegularChildrenOutputs(),
-                    recursiveCte.getConstantExprsList(),
                     recursiveCte.getLogicalProperties(),
                     recursiveCte.children())
         ).toRule(RuleType.LOGICAL_RECURSIVE_CTE_TO_PHYSICAL_RECURSIVE_CTE);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
index 63d63f32b6f..204a018fbc7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
@@ -290,6 +290,53 @@ public class AdjustNullable extends 
DefaultPlanRewriter<Map<ExprId, Slot>> imple
         return repeat.withGroupSetsAndOutput(repeat.getGroupingSets(), 
newOutputs).recomputeLogicalProperties();
     }
 
+    @Override
+    public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
Map<ExprId, Slot> replaceMap) {
+        recursiveCte = (LogicalRecursiveCte) super.visit(recursiveCte, 
replaceMap);
+        ImmutableList.Builder<List<SlotReference>> newChildrenOutputs = 
ImmutableList.builder();
+        List<Boolean> inputNullable = null;
+        if (!recursiveCte.children().isEmpty()) {
+            inputNullable = 
Lists.newArrayListWithCapacity(recursiveCte.getOutputs().size());
+            for (int i = 0; i < recursiveCte.getOutputs().size(); i++) {
+                inputNullable.add(false);
+            }
+            for (int i = 0; i < recursiveCte.arity(); i++) {
+                List<Slot> childOutput = recursiveCte.child(i).getOutput();
+                List<SlotReference> setChildOutput = 
recursiveCte.getRegularChildOutput(i);
+                ImmutableList.Builder<SlotReference> newChildOutputs = 
ImmutableList.builder();
+                for (int j = 0; j < setChildOutput.size(); j++) {
+                    for (Slot slot : childOutput) {
+                        if 
(slot.getExprId().equals(setChildOutput.get(j).getExprId())) {
+                            inputNullable.set(j, slot.nullable() || 
inputNullable.get(j));
+                            newChildOutputs.add((SlotReference) slot);
+                            break;
+                        }
+                    }
+                }
+                newChildrenOutputs.add(newChildOutputs.build());
+            }
+        }
+        if (inputNullable == null) {
+            // this is a fail-safe
+            // means the recursive cte has no children
+            // no way to update the nullable flag, so just do nothing
+            return recursiveCte;
+        }
+        List<NamedExpression> outputs = recursiveCte.getOutputs();
+        List<NamedExpression> newOutputs = 
Lists.newArrayListWithCapacity(outputs.size());
+        for (int i = 0; i < inputNullable.size(); i++) {
+            NamedExpression ne = outputs.get(i);
+            Slot slot = ne instanceof Alias ? (Slot) ((Alias) ne).child() : 
(Slot) ne;
+            slot = slot.withNullable(inputNullable.get(i));
+            NamedExpression newOutput = ne instanceof Alias ? 
(NamedExpression) ne.withChildren(slot) : slot;
+            newOutputs.add(newOutput);
+            replaceMap.put(newOutput.getExprId(), newOutput.toSlot());
+        }
+        return recursiveCte.withNewOutputs(newOutputs)
+                .withChildrenAndTheirOutputs(recursiveCte.children(), 
newChildrenOutputs.build())
+                .recomputeLogicalProperties();
+    }
+
     @Override
     public Plan visitLogicalSetOperation(LogicalSetOperation setOperation, 
Map<ExprId, Slot> replaceMap) {
         setOperation = (LogicalSetOperation) super.visit(setOperation, 
replaceMap);
@@ -331,22 +378,6 @@ public class AdjustNullable extends 
DefaultPlanRewriter<Map<ExprId, Slot>> imple
                     inputNullable.set(j, inputNullable.get(j) || 
constantExprs.get(j).nullable());
                 }
             }
-        } else if (setOperation instanceof LogicalRecursiveCte) {
-            // LogicalRecursiveCte is basically like LogicalUnion, so just do 
same as LogicalUnion
-            LogicalRecursiveCte logicalRecursiveCte = (LogicalRecursiveCte) 
setOperation;
-            if (!logicalRecursiveCte.getConstantExprsList().isEmpty() && 
setOperation.children().isEmpty()) {
-                int outputSize = 
logicalRecursiveCte.getConstantExprsList().get(0).size();
-                // create the inputNullable list and fill it with all FALSE 
values
-                inputNullable = Lists.newArrayListWithCapacity(outputSize);
-                for (int i = 0; i < outputSize; i++) {
-                    inputNullable.add(false);
-                }
-            }
-            for (List<NamedExpression> constantExprs : 
logicalRecursiveCte.getConstantExprsList()) {
-                for (int j = 0; j < constantExprs.size(); j++) {
-                    inputNullable.set(j, inputNullable.get(j) || 
constantExprs.get(j).nullable());
-                }
-            }
         }
         if (inputNullable == null) {
             // this is a fail-safe
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
index 96daceedfe4..30a00314fbf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
@@ -217,7 +217,7 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
     @Override
     public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
PruneContext context) {
         // LogicalRecursiveCte is basically like LogicalUnion, so just do same 
as LogicalUnion
-        if (recursiveCte.getQualifier() == Qualifier.DISTINCT) {
+        if (!recursiveCte.isUnionAll()) {
             return skipPruneThisAndFirstLevelChildren(recursiveCte);
         }
         LogicalRecursiveCte prunedOutputRecursiveCte = 
pruneRecursiveCteOutput(recursiveCte, context);
@@ -451,7 +451,6 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
             return recursiveCte;
         }
         List<NamedExpression> prunedOutputs = Lists.newArrayList();
-        List<List<NamedExpression>> constantExprsList = 
recursiveCte.getConstantExprsList();
         List<List<SlotReference>> regularChildrenOutputs = 
recursiveCte.getRegularChildrenOutputs();
         List<Plan> children = recursiveCte.children();
         List<Integer> extractColumnIndex = Lists.newArrayList();
@@ -463,8 +462,6 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
             }
         }
 
-        ImmutableList.Builder<List<NamedExpression>> prunedConstantExprsList
-                = 
ImmutableList.builderWithExpectedSize(constantExprsList.size());
         if (prunedOutputs.isEmpty()) {
             // process prune all columns
             NamedExpression originSlot = originOutput.get(0);
@@ -472,7 +469,7 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
                     TinyIntType.INSTANCE, false, originSlot.getQualifier()));
             regularChildrenOutputs = 
Lists.newArrayListWithCapacity(regularChildrenOutputs.size());
             children = Lists.newArrayListWithCapacity(children.size());
-            for (int i = 0; i < recursiveCte.getArity(); i++) {
+            for (int i = 0; i < recursiveCte.children().size(); i++) {
                 Plan child = recursiveCte.child(i);
                 List<NamedExpression> newProjectOutput = ImmutableList.of(new 
Alias(new TinyIntLiteral((byte) 1)));
                 LogicalProject<?> project;
@@ -487,25 +484,12 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
                 regularChildrenOutputs.add((List) project.getOutput());
                 children.add(project);
             }
-            for (int i = 0; i < constantExprsList.size(); i++) {
-                prunedConstantExprsList.add(ImmutableList.of(new Alias(new 
TinyIntLiteral((byte) 1))));
-            }
-        } else {
-            int len = extractColumnIndex.size();
-            for (List<NamedExpression> row : constantExprsList) {
-                ImmutableList.Builder<NamedExpression> newRow = 
ImmutableList.builderWithExpectedSize(len);
-                for (int idx : extractColumnIndex) {
-                    newRow.add(row.get(idx));
-                }
-                prunedConstantExprsList.add(newRow.build());
-            }
         }
 
         if (prunedOutputs.equals(originOutput) && 
!context.requiredSlotsIds.isEmpty()) {
             return recursiveCte;
         } else {
-            return 
recursiveCte.withNewOutputsChildrenAndConstExprsList(prunedOutputs, children,
-                    regularChildrenOutputs, prunedConstantExprsList.build());
+            return recursiveCte.withNewOutputsAndChildren(prunedOutputs, 
children, regularChildrenOutputs);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index a14c2dfc9cb..9ddcc7e2e40 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -60,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Limit;
 import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
 import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
 import org.apache.doris.nereids.trees.plans.algebra.Relation;
 import org.apache.doris.nereids.trees.plans.algebra.Repeat;
 import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
@@ -925,7 +926,7 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
     @Override
     public Statistics visitLogicalRecursiveCte(
             LogicalRecursiveCte recursiveCte, Void context) {
-        return computeUnion(recursiveCte,
+        return computeRecursiveCte(recursiveCte,
                 groupExpression.children()
                         
.stream().map(Group::getStatistics).collect(Collectors.toList()));
     }
@@ -1108,7 +1109,7 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
 
     @Override
     public Statistics visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, Void context) {
-        return computeUnion(recursiveCte, groupExpression.children()
+        return computeRecursiveCte(recursiveCte, groupExpression.children()
                 
.stream().map(Group::getStatistics).collect(Collectors.toList()));
     }
 
@@ -1493,6 +1494,71 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return new Statistics(rowCount, 1, columnStatsMap);
     }
 
+    /**
+     * computeRecursiveCte
+     */
+    public Statistics computeRecursiveCte(RecursiveCte recursiveCte, 
List<Statistics> childStats) {
+        // TODO: refactor this for one row relation
+        List<SlotReference> head;
+        Statistics headStats;
+        List<List<SlotReference>> childOutputs = 
Lists.newArrayList(recursiveCte.getRegularChildrenOutputs());
+
+        head = childOutputs.get(0);
+        headStats = new StatisticsBuilder(childStats.get(0)).build();
+
+        StatisticsBuilder statisticsBuilder = new StatisticsBuilder();
+        List<NamedExpression> unionOutput = recursiveCte.getOutputs();
+        double unionRowCount = 
childStats.stream().mapToDouble(Statistics::getRowCount).sum();
+        statisticsBuilder.setRowCount(unionRowCount);
+
+        for (int i = 0; i < head.size(); i++) {
+            Slot headSlot = head.get(i);
+            ColumnStatisticBuilder colStatsBuilder = new 
ColumnStatisticBuilder(
+                    headStats.findColumnStatistics(headSlot));
+            for (int j = 1; j < childOutputs.size(); j++) {
+                Slot slot = childOutputs.get(j).get(i);
+                ColumnStatistic rightStatistic = 
childStats.get(j).findColumnStatistics(slot);
+                double rightRowCount = childStats.get(j).getRowCount();
+                colStatsBuilder = unionColumn(colStatsBuilder,
+                        headStats.getRowCount(), rightStatistic, 
rightRowCount, headSlot.getDataType());
+            }
+
+            //update hot values
+            Map<Literal, Float> unionHotValues = new HashMap<>();
+            for (int j = 0; j < childOutputs.size(); j++) {
+                Slot slot = childOutputs.get(j).get(i);
+                ColumnStatistic slotStats = 
childStats.get(j).findColumnStatistics(slot);
+                if (slotStats.getHotValues() != null) {
+                    for (Map.Entry<Literal, Float> entry : 
slotStats.getHotValues().entrySet()) {
+                        Float value = unionHotValues.get(entry.getKey());
+                        if (value == null) {
+                            unionHotValues.put(entry.getKey(),
+                                    (float) (entry.getValue() * 
childStats.get(j).getRowCount()));
+                        } else {
+                            unionHotValues.put(entry.getKey(),
+                                    (float) (value + entry.getValue() * 
childStats.get(j).getRowCount()));
+                        }
+                    }
+                }
+            }
+
+            Map<Literal, Float> resultHotValues = new LinkedHashMap<>();
+            for (Literal hot : unionHotValues.keySet()) {
+                float ratio = (float) (unionHotValues.get(hot) / 
unionRowCount);
+                if (ratio * colStatsBuilder.getNdv() >= 
SessionVariable.getSkewValueThreshold()
+                        || ratio >= SessionVariable.getHotValueThreshold()) {
+                    resultHotValues.put(hot, ratio);
+                }
+            }
+            if (!resultHotValues.isEmpty()) {
+                colStatsBuilder.setHotValues(resultHotValues);
+            }
+            statisticsBuilder.putColumnStatistics(unionOutput.get(i), 
colStatsBuilder.build());
+        }
+
+        return statisticsBuilder.setWidthInJoinCluster(1).build();
+    }
+
     /**
      * computeUnion
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
index c9cf877f709..739047f4a77 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
@@ -360,11 +360,6 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
         List<Plan> children = recursiveCte.children().stream()
                 .map(c -> c.accept(this, context))
                 .collect(ImmutableList.toImmutableList());
-        List<List<NamedExpression>> constantExprsList = 
recursiveCte.getConstantExprsList().stream()
-                .map(l -> l.stream()
-                        .map(e -> (NamedExpression) 
ExpressionDeepCopier.INSTANCE.deepCopy(e, context))
-                        .collect(ImmutableList.toImmutableList()))
-                .collect(ImmutableList.toImmutableList());
         List<NamedExpression> outputs = recursiveCte.getOutputs().stream()
                 .map(o -> (NamedExpression) 
ExpressionDeepCopier.INSTANCE.deepCopy(o, context))
                 .collect(ImmutableList.toImmutableList());
@@ -373,8 +368,7 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
                         .map(o -> (SlotReference) 
ExpressionDeepCopier.INSTANCE.deepCopy(o, context))
                         .collect(ImmutableList.toImmutableList()))
                 .collect(ImmutableList.toImmutableList());
-        return new LogicalRecursiveCte(recursiveCte.getQualifier(), outputs, 
childrenOutputs,
-                constantExprsList, recursiveCte.hasPushedFilter(), children);
+        return new LogicalRecursiveCte(recursiveCte.isUnionAll(), outputs, 
childrenOutputs, children);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/RecursiveCte.java
similarity index 64%
copy from 
fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/RecursiveCte.java
index 9f36b04dfc4..d7fd8f985a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/RecursiveCte.java
@@ -15,12 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.catalog;
+package org.apache.doris.nereids.trees.plans.algebra;
+
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 
 import java.util.List;
 
-public class RecursiveCteTempTable extends Table {
-    public RecursiveCteTempTable(String tableName, List<Column> fullSchema) {
-        super(-1, tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema);
-    }
+/**
+ * Common interface for logical/physical recursive cte.
+ *
+ * <p>Accessors shared by the logical and physical plan nodes:
+ * <ul>
+ *   <li>{@link #isUnionAll()}: flag that replaces the former set-operation qualifier; the logical node
+ *       only reports its output as unique when this is {@code false}.</li>
+ *   <li>{@link #getRegularChildOutput(int)}: the i-th entry of {@link #getRegularChildrenOutputs()}.</li>
+ *   <li>{@link #getOutputs()}: the named expressions that make up the output of the recursive cte node.</li>
+ *   <li>{@link #getRegularChildrenOutputs()}: one list of slot references per child, in child order
+ *       (presumably the slots each child contributes to the output, position by position;
+ *       TODO confirm against the translator).</li>
+ * </ul>
+ */
+public interface RecursiveCte {
+    boolean isUnionAll();
+
+    List<SlotReference> getRegularChildOutput(int i);
+
+    List<NamedExpression> getOutputs();
+
+    List<List<SlotReference>> getRegularChildrenOutputs();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
index e8387d675dc..a7f99e101ae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
@@ -326,7 +326,7 @@ public class CreateMaterializedViewCommand extends Command 
implements ForwardWit
                 }
                 try {
                     Expr defineExpr = translateToLegacyExpr(predicate, 
context.planTranslatorContext);
-                    context.filterItem = new MVColumnItem(predicate.toSql(), 
defineExpr);
+                    context.filterItem = new 
MVColumnItem(defineExpr.toSqlWithoutTbl(), defineExpr);
                 } catch (Exception ex) {
                     throw new AnalysisException(ex.getMessage());
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
index 4291ba32732..9369acd8d6f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
@@ -17,33 +17,19 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.BackendDistributedPlanWorkerManager;
-import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.RecursiveCteNode;
-import org.apache.doris.planner.RecursiveCteScanNode;
 import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TExpr;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TRecCTENode;
-import org.apache.doris.thrift.TRecCTEResetInfo;
-import org.apache.doris.thrift.TRecCTETarget;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 
 /** AssignedJobBuilder */
 public class AssignedJobBuilder {
@@ -53,8 +39,6 @@ public class AssignedJobBuilder {
             boolean isLoadJob) {
         DistributeContext distributeContext = new 
DistributeContext(workerManager, isLoadJob);
         ListMultimap<PlanFragmentId, AssignedJob> allAssignedJobs = 
ArrayListMultimap.create();
-        Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new 
TreeMap<>();
-        Map<PlanFragmentId, Set<TNetworkAddress>> 
fragmentIdToNetworkAddressMap = new TreeMap<>();
         for (Entry<PlanFragmentId, UnassignedJob> kv : 
unassignedJobs.entrySet()) {
             PlanFragmentId fragmentId = kv.getKey();
             UnassignedJob unassignedJob = kv.getValue();
@@ -71,83 +55,6 @@ public class AssignedJobBuilder {
                         + ", fragment: " + 
unassignedJob.getFragment().getExplainString(TExplainLevel.VERBOSE));
             }
             allAssignedJobs.putAll(fragmentId, fragmentAssignedJobs);
-
-            Set<TNetworkAddress> networkAddresses = new TreeSet<>();
-            for (AssignedJob assignedJob : fragmentAssignedJobs) {
-                DistributedPlanWorker distributedPlanWorker = 
assignedJob.getAssignedWorker();
-                networkAddresses.add(new 
TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port()));
-            }
-            fragmentIdToNetworkAddressMap.put(fragmentId, networkAddresses);
-
-            PlanFragment planFragment = unassignedJob.getFragment();
-            List<RecursiveCteScanNode> recursiveCteScanNodes = 
planFragment.getPlanRoot()
-                    
.collectInCurrentFragment(RecursiveCteScanNode.class::isInstance);
-            if (!recursiveCteScanNodes.isEmpty()) {
-                if (recursiveCteScanNodes.size() != 1) {
-                    throw new IllegalStateException(
-                            String.format("one fragment can only have 1 
recursive cte scan node, but there is %d",
-                                    recursiveCteScanNodes.size()));
-                }
-                if (fragmentAssignedJobs.size() != 1) {
-                    throw new IllegalStateException(String.format(
-                            "fragmentAssignedJobs's size must be 1 for 
recursive cte scan node, but it is %d",
-                            fragmentAssignedJobs.size()));
-                }
-                TRecCTETarget tRecCTETarget = new TRecCTETarget();
-                DistributedPlanWorker distributedPlanWorker = 
fragmentAssignedJobs.get(0).getAssignedWorker();
-                tRecCTETarget.setAddr(new 
TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port()));
-                
tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId());
-                
tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt());
-                fragmentIdToRecCteTargetMap.put(fragmentId, tRecCTETarget);
-            }
-
-            List<RecursiveCteNode> recursiveCteNodes = 
planFragment.getPlanRoot()
-                    
.collectInCurrentFragment(RecursiveCteNode.class::isInstance);
-            if (!recursiveCteNodes.isEmpty()) {
-                if (recursiveCteNodes.size() != 1) {
-                    throw new IllegalStateException(
-                            String.format("one fragment can only have 1 
recursive cte node, but there is %d",
-                                    recursiveCteNodes.size()));
-                }
-
-                List<TRecCTETarget> targets = new ArrayList<>();
-                List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>();
-                // PhysicalPlanTranslator will swap recursiveCteNodes's child 
fragment,
-                // so we get recursive one by 1st child
-                List<PlanFragment> childFragments = new ArrayList<>();
-                
planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, 
childFragments);
-                for (PlanFragment child : childFragments) {
-                    PlanFragmentId childFragmentId = child.getFragmentId();
-                    TRecCTETarget tRecCTETarget = 
fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null);
-                    if (tRecCTETarget != null) {
-                        targets.add(tRecCTETarget);
-                    }
-                    Set<TNetworkAddress> tNetworkAddresses = 
fragmentIdToNetworkAddressMap.get(childFragmentId);
-                    if (tNetworkAddresses == null) {
-                        throw new IllegalStateException(
-                                String.format("can't find TNetworkAddress for 
fragment %d", childFragmentId));
-                    }
-                    for (TNetworkAddress address : tNetworkAddresses) {
-                        TRecCTEResetInfo tRecCTEResetInfo = new 
TRecCTEResetInfo();
-                        
tRecCTEResetInfo.setFragmentId(childFragmentId.asInt());
-                        tRecCTEResetInfo.setAddr(address);
-                        fragmentsToReset.add(tRecCTEResetInfo);
-                    }
-                }
-
-                RecursiveCteNode recursiveCteNode = recursiveCteNodes.get(0);
-                List<List<Expr>> materializedResultExprLists = 
recursiveCteNode.getMaterializedResultExprLists();
-                List<List<TExpr>> texprLists = new 
ArrayList<>(materializedResultExprLists.size());
-                for (List<Expr> exprList : materializedResultExprLists) {
-                    texprLists.add(Expr.treesToThrift(exprList));
-                }
-                TRecCTENode tRecCTENode = new TRecCTENode();
-                tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll());
-                tRecCTENode.setTargets(targets);
-                tRecCTENode.setFragmentsToReset(fragmentsToReset);
-                tRecCTENode.setResultExprLists(texprLists);
-                recursiveCteNode.settRecCTENode(tRecCTENode);
-            }
         }
         return allAssignedJobs;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
index 7fbb3f88e21..cfd3ebe7b2b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
@@ -26,6 +26,7 @@ import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.ScanNode;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ListMultimap;
 
@@ -57,6 +58,8 @@ public class UnassignedRecursiveCteScanJob extends 
AbstractUnassignedScanJob {
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
             DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
-        return fillUpSingleEmptyInstance(workerManager);
+        Preconditions.checkArgument(!assignedJobs.isEmpty(),
+                "assignedJobs is empty for UnassignedRecursiveCteScanJob");
+        return assignedJobs;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
index cfdf317c620..3958d5b4df1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
@@ -19,19 +19,24 @@ package org.apache.doris.nereids.trees.plans.logical;
 
 import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.DataTrait;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.algebra.Union;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.util.ExpressionUtils;
@@ -56,68 +61,126 @@ import java.util.Set;
 /**
  * LogicalRecursiveCte is basically like LogicalUnion
  */
-public class LogicalRecursiveCte extends LogicalSetOperation implements Union, 
OutputPrunable {
-
-    // in doris, we use union node to present one row relation
-    private final List<List<NamedExpression>> constantExprsList;
-    // When there is an agg on the union and there is a filter on the agg,
-    // it is necessary to keep the filter on the agg and push the filter down 
to each child of the union.
-    private final boolean hasPushedFilter;
+public class LogicalRecursiveCte extends AbstractLogicalPlan implements 
RecursiveCte, OutputPrunable {
+    protected final List<NamedExpression> outputs;
+    protected final List<List<SlotReference>> regularChildrenOutputs;
+    private final boolean isUnionAll;
 
     /** LogicalRecursiveCte */
-    public LogicalRecursiveCte(Qualifier qualifier, List<Plan> children) {
-        this(qualifier, ImmutableList.of(), children);
+    public LogicalRecursiveCte(boolean isUnionAll, List<Plan> children) {
+        this(isUnionAll, ImmutableList.of(), ImmutableList.of(), children);
     }
 
     /** LogicalRecursiveCte */
-    public LogicalRecursiveCte(Qualifier qualifier, 
List<List<NamedExpression>> constantExprsList,
-            List<Plan> children) {
-        this(qualifier, ImmutableList.of(), ImmutableList.of(), 
constantExprsList, false, children);
-    }
-
-    /** LogicalRecursiveCte */
-    public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> 
outputs,
-            List<List<SlotReference>> childrenOutputs,
-            List<List<NamedExpression>> constantExprsList, boolean 
hasPushedFilter, List<Plan> children) {
-        this(qualifier, outputs, childrenOutputs, constantExprsList, 
hasPushedFilter, Optional.empty(),
+    public LogicalRecursiveCte(boolean isUnionAll, List<NamedExpression> 
outputs,
+            List<List<SlotReference>> childrenOutputs, List<Plan> children) {
+        this(isUnionAll, outputs, childrenOutputs, Optional.empty(),
                 Optional.empty(),
                 children);
     }
 
     /** LogicalRecursiveCte */
-    public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> 
outputs,
+    public LogicalRecursiveCte(boolean isUnionAll, List<NamedExpression> 
outputs,
             List<List<SlotReference>> childrenOutputs,
-            List<List<NamedExpression>> constantExprsList, boolean 
hasPushedFilter,
             Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
             List<Plan> children) {
-        super(PlanType.LOGICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs,
-                groupExpression, logicalProperties, children);
-        this.hasPushedFilter = hasPushedFilter;
-        this.constantExprsList = Utils.fastToImmutableList(
-                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+        super(PlanType.LOGICAL_RECURSIVE_CTE, groupExpression, 
logicalProperties, children);
+        this.isUnionAll = isUnionAll;
+        this.outputs = ImmutableList.copyOf(outputs);
+        this.regularChildrenOutputs = ImmutableList.copyOf(childrenOutputs);
     }
 
-    public boolean hasPushedFilter() {
-        return hasPushedFilter;
+    @Override
+    public boolean isUnionAll() {
+        return isUnionAll;
     }
 
-    public List<List<NamedExpression>> getConstantExprsList() {
-        return constantExprsList;
+    @Override
+    public List<SlotReference> getRegularChildOutput(int i) {
+        return regularChildrenOutputs.get(i);
     }
 
     @Override
-    public List<? extends Expression> getExpressions() {
-        return 
constantExprsList.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList());
+    public List<List<SlotReference>> getRegularChildrenOutputs() {
+        return regularChildrenOutputs;
+    }
+
+    /**
+     * Build the projection list for each of the two children so that both sides produce
+     * the same data type in every output position.
+     *
+     * @return a two-element list: projections for child 0 followed by projections for child 1
+     * @throws AnalysisException if no assignment-compatible type exists for some column pair
+     */
+    public List<List<NamedExpression>> collectChildrenProjections() {
+        return castCommonDataTypeOutputs();
+    }
+
+    /**
+     * For every output position, compute the assignment-compatible type of the left (child 0)
+     * and right (child 1) slot and cast each side to it when its type differs. A slot that had
+     * to be cast is wrapped in an {@link Alias} carrying the original slot name; a slot that
+     * already has the target type is passed through unchanged.
+     *
+     * @return a two-element list: new left outputs followed by new right outputs
+     * @throws AnalysisException if no compatible type can be found; the original failure is kept as the cause
+     */
+    private List<List<NamedExpression>> castCommonDataTypeOutputs() {
+        // fetch each child's output once instead of on every loop iteration
+        List<Slot> leftOutput = child(0).getOutput();
+        List<Slot> rightOutput = child(1).getOutput();
+        int childOutputSize = leftOutput.size();
+        ImmutableList.Builder<NamedExpression> newLeftOutputs = ImmutableList.builderWithExpectedSize(
+                childOutputSize);
+        ImmutableList.Builder<NamedExpression> newRightOutputs = ImmutableList.builderWithExpectedSize(
+                childOutputSize);
+        // Ensure that the output types of the left and right children are consistent and expand upward.
+        for (int i = 0; i < childOutputSize; ++i) {
+            Slot left = leftOutput.get(i);
+            Slot right = rightOutput.get(i);
+            DataType compatibleType;
+            try {
+                compatibleType = LogicalSetOperation.getAssignmentCompatibleType(left.getDataType(),
+                        right.getDataType());
+            } catch (Exception e) {
+                // pass the original exception as the cause so its stack trace is not lost
+                throw new AnalysisException(
+                        "Can not find compatible type for " + left + " and " + right + ", " + e.getMessage(), e);
+            }
+            Expression newLeft = TypeCoercionUtils.castIfNotSameTypeStrict(left, compatibleType);
+            Expression newRight = TypeCoercionUtils.castIfNotSameTypeStrict(right, compatibleType);
+            // a bare Cast is not a NamedExpression, so give it the name of the slot it replaces
+            if (newLeft instanceof Cast) {
+                newLeft = new Alias(newLeft, left.getName());
+            }
+            if (newRight instanceof Cast) {
+                newRight = new Alias(newRight, right.getName());
+            }
+            newLeftOutputs.add((NamedExpression) newLeft);
+            newRightOutputs.add((NamedExpression) newRight);
+        }
+
+        return ImmutableList.of(newLeftOutputs.build(), newRightOutputs.build());
+    }
+
+    /**
+     * Generate new output for Recursive Cte.
+     *
+     * <p>One {@link SlotReference} is produced for each slot returned by
+     * {@code resetNullableForLeftOutputs()}, i.e. for each output slot of child 0 after its
+     * nullability has been widened to cover child 1. The slot at position {@code i} reuses the
+     * {@link ExprId} of the existing {@code outputs.get(i)} when such an entry exists; otherwise a
+     * fresh id is taken from {@link StatementScopeIdGenerator}. The new slot takes its name from
+     * {@code slot.toSql()}, copies the data type and nullability of the source slot, and is
+     * created with an empty list as the last constructor argument (presumably the qualifier;
+     * TODO confirm against the SlotReference constructor).
+     *
+     * @return the newly built output expressions, in the order of child 0's output
+     */
+    public List<NamedExpression> buildNewOutputs() {
+        List<Slot> slots = resetNullableForLeftOutputs();
+        ImmutableList.Builder<NamedExpression> newOutputs = 
ImmutableList.builderWithExpectedSize(slots.size());
+
+        for (int i = 0; i < slots.size(); i++) {
+            Slot slot = slots.get(i);
+            ExprId exprId = i < outputs.size() ? outputs.get(i).getExprId() : 
StatementScopeIdGenerator.newExprId();
+            newOutputs.add(
+                    new SlotReference(exprId, slot.toSql(), 
slot.getDataType(), slot.nullable(), ImmutableList.of())
+            );
+        }
+        return newOutputs.build();
+    }
+
+    // Returns child 0's output slots, one per output position of child 1; a left slot is
+    // switched to nullable when the right slot at the same position is nullable and it is not.
+    private List<Slot> resetNullableForLeftOutputs() {
+        List<Slot> leftSlots = child(0).getOutput();
+        List<Slot> rightSlots = child(1).getOutput();
+        int rightSize = rightSlots.size();
+        ImmutableList.Builder<Slot> adjusted = ImmutableList.builderWithExpectedSize(rightSize);
+        for (int idx = 0; idx < rightSize; ++idx) {
+            Slot leftSlot = leftSlots.get(idx);
+            boolean widen = rightSlots.get(idx).nullable() && !leftSlot.nullable();
+            adjusted.add(widen ? leftSlot.withNullable(true) : leftSlot);
+        }
+        return adjusted.build();
     }
 
     @Override
     public String toString() {
         return Utils.toSqlStringSkipNull("LogicalRecursiveCte",
-                "qualifier", qualifier,
+                "isUnionAll", isUnionAll,
                 "outputs", outputs,
                 "regularChildrenOutputs", regularChildrenOutputs,
-                "constantExprsList", constantExprsList,
-                "hasPushedFilter", hasPushedFilter,
                 "stats", statistics);
     }
 
@@ -130,13 +193,13 @@ public class LogicalRecursiveCte extends 
LogicalSetOperation implements Union, O
             return false;
         }
         LogicalRecursiveCte that = (LogicalRecursiveCte) o;
-        return super.equals(that) && hasPushedFilter == that.hasPushedFilter
-                && Objects.equals(constantExprsList, that.constantExprsList);
+        return isUnionAll == that.isUnionAll && Objects.equals(outputs, 
that.outputs)
+                && Objects.equals(regularChildrenOutputs, 
that.regularChildrenOutputs);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), hasPushedFilter, 
constantExprsList);
+        return Objects.hash(isUnionAll, outputs, regularChildrenOutputs);
     }
 
     @Override
@@ -145,64 +208,58 @@ public class LogicalRecursiveCte extends 
LogicalSetOperation implements Union, O
     }
 
     @Override
-    public LogicalRecursiveCte withChildren(List<Plan> children) {
-        return new LogicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs,
-                constantExprsList, hasPushedFilter, children);
+    public List<? extends Expression> getExpressions() {
+        return 
regularChildrenOutputs.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList());
     }
 
     @Override
-    public LogicalSetOperation withChildrenAndTheirOutputs(List<Plan> children,
+    public List<Slot> computeOutput() {
+        return outputs.stream()
+                .map(NamedExpression::toSlot)
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public LogicalRecursiveCte withChildren(List<Plan> children) {
+        return new LogicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs, children);
+    }
+
+    public LogicalRecursiveCte withChildrenAndTheirOutputs(List<Plan> children,
             List<List<SlotReference>> childrenOutputs) {
         Preconditions.checkArgument(children.size() == childrenOutputs.size(),
                 "children size %s is not equals with children outputs size %s",
                 children.size(), childrenOutputs.size());
-        return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, 
constantExprsList, hasPushedFilter,
-                children);
+        return new LogicalRecursiveCte(isUnionAll, outputs, childrenOutputs, 
children);
     }
 
     @Override
     public LogicalRecursiveCte withGroupExpression(Optional<GroupExpression> 
groupExpression) {
-        return new LogicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList, hasPushedFilter,
+        return new LogicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs,
                 groupExpression, Optional.of(getLogicalProperties()), 
children);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        return new LogicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList, hasPushedFilter,
+        return new LogicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs,
                 groupExpression, logicalProperties, children);
     }
 
-    @Override
     public LogicalRecursiveCte withNewOutputs(List<NamedExpression> 
newOutputs) {
-        return new LogicalRecursiveCte(qualifier, newOutputs, 
regularChildrenOutputs, constantExprsList,
-                hasPushedFilter, Optional.empty(), Optional.empty(), children);
-    }
-
-    public LogicalRecursiveCte 
withNewOutputsAndConstExprsList(List<NamedExpression> newOutputs,
-            List<List<NamedExpression>> constantExprsList) {
-        return new LogicalRecursiveCte(qualifier, newOutputs, 
regularChildrenOutputs, constantExprsList,
-                hasPushedFilter, Optional.empty(), Optional.empty(), children);
-    }
-
-    public LogicalRecursiveCte withChildrenAndConstExprsList(List<Plan> 
children,
-            List<List<SlotReference>> childrenOutputs, 
List<List<NamedExpression>> constantExprsList) {
-        return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, 
constantExprsList, hasPushedFilter,
-                children);
+        return new LogicalRecursiveCte(isUnionAll, newOutputs, 
regularChildrenOutputs,
+                Optional.empty(), Optional.empty(), children);
     }
 
-    public LogicalRecursiveCte 
withNewOutputsChildrenAndConstExprsList(List<NamedExpression> newOutputs,
-            List<Plan> children,
-            List<List<SlotReference>> childrenOutputs,
-            List<List<NamedExpression>> constantExprsList) {
-        return new LogicalRecursiveCte(qualifier, newOutputs, childrenOutputs, 
constantExprsList,
-                hasPushedFilter, Optional.empty(), Optional.empty(), children);
+    public LogicalRecursiveCte withNewOutputsAndChildren(List<NamedExpression> 
newOutputs,
+                                                         List<Plan> children,
+                                                         
List<List<SlotReference>> childrenOutputs) {
+        return new LogicalRecursiveCte(isUnionAll, newOutputs, childrenOutputs,
+                Optional.empty(), Optional.empty(), children);
     }
 
-    public LogicalRecursiveCte withAllQualifier() {
-        return new LogicalRecursiveCte(Qualifier.ALL, outputs, 
regularChildrenOutputs, constantExprsList,
-                hasPushedFilter,
-                Optional.empty(), Optional.empty(), children);
+    @Override
+    public List<NamedExpression> getOutputs() {
+        return outputs;
     }
 
     @Override
@@ -212,7 +269,7 @@ public class LogicalRecursiveCte extends 
LogicalSetOperation implements Union, O
 
     @Override
     public void computeUnique(DataTrait.Builder builder) {
-        if (qualifier == Qualifier.DISTINCT) {
+        if (!isUnionAll) {
             builder.addUniqueSlot(ImmutableSet.copyOf(getOutput()));
         }
     }
@@ -224,19 +281,6 @@ public class LogicalRecursiveCte extends 
LogicalSetOperation implements Union, O
                         ConnectContext.get().getStatementContext(), this, 
PhysicalProperties.ANY)));
         for (int i = 0; i < getOutputs().size(); i++) {
             Optional<Literal> value = Optional.empty();
-            if (!constantExprsList.isEmpty()) {
-                value = 
ExpressionUtils.checkConstantExpr(constantExprsList.get(0).get(i), context);
-                if (!value.isPresent()) {
-                    continue;
-                }
-                final int fi = i;
-                Literal literal = value.get();
-                if (constantExprsList.stream()
-                        .map(exprs -> 
ExpressionUtils.checkConstantExpr(exprs.get(fi), context))
-                        .anyMatch(val -> !val.isPresent() || 
!val.get().equals(literal))) {
-                    continue;
-                }
-            }
             for (int childIdx = 0; childIdx < children.size(); childIdx++) {
                 // TODO: use originOutputs = child(childIdx).getOutput() ?
                 List<? extends Slot> originOutputs = 
regularChildrenOutputs.get(childIdx);
@@ -267,10 +311,7 @@ public class LogicalRecursiveCte extends 
LogicalSetOperation implements Union, O
 
     @Override
     public boolean hasUnboundExpression() {
-        if (!constantExprsList.isEmpty() && children.isEmpty()) {
-            return false;
-        }
-        return super.hasUnboundExpression();
+        return outputs.isEmpty();
     }
 
     private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> 
equalSlotsList) {
@@ -384,7 +425,7 @@ public class LogicalRecursiveCte extends 
LogicalSetOperation implements Union, O
             Expression otherConstant = namedExpression.child(0);
             nullable |= otherConstant.nullable();
             DataType otherDataType = otherConstant.getDataType();
-            commonDataType = getAssignmentCompatibleType(commonDataType, 
otherDataType);
+            commonDataType = 
LogicalSetOperation.getAssignmentCompatibleType(commonDataType, otherDataType);
         }
         return Pair.of(commonDataType, nullable);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
index ef0cc98de0a..c0a7f0222d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
@@ -27,7 +27,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.algebra.Union;
+import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.qe.ConnectContext;
@@ -44,55 +44,91 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * PhysicalRecursiveCte is basically like PhysicalUnion
  */
-public class PhysicalRecursiveCte extends PhysicalSetOperation implements 
Union {
+public class PhysicalRecursiveCte extends AbstractPhysicalPlan implements 
RecursiveCte {
 
-    // in doris, we use union node to present one row relation
-    private final List<List<NamedExpression>> constantExprsList;
+    protected final List<NamedExpression> outputs;
+    protected final List<List<SlotReference>> regularChildrenOutputs;
+    private final boolean isUnionAll;
 
     /** PhysicalRecursiveCte */
-    public PhysicalRecursiveCte(Qualifier qualifier,
+    public PhysicalRecursiveCte(boolean isUnionAll,
             List<NamedExpression> outputs,
             List<List<SlotReference>> childrenOutputs,
-            List<List<NamedExpression>> constantExprsList,
             LogicalProperties logicalProperties,
             List<Plan> children) {
-        super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs, logicalProperties, children);
-        this.constantExprsList = ImmutableList.copyOf(
-                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+        this(isUnionAll, outputs, childrenOutputs, Optional.empty(), 
logicalProperties, children);
     }
 
     /** PhysicalRecursiveCte */
-    public PhysicalRecursiveCte(Qualifier qualifier,
+    public PhysicalRecursiveCte(boolean isUnionAll,
             List<NamedExpression> outputs,
             List<List<SlotReference>> childrenOutputs,
-            List<List<NamedExpression>> constantExprsList,
             Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties,
             List<Plan> children) {
-        super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs,
-                groupExpression, logicalProperties, children);
-        this.constantExprsList = ImmutableList.copyOf(
-                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+        this(isUnionAll, outputs, childrenOutputs, groupExpression, 
logicalProperties,
+                PhysicalProperties.ANY, null, children);
     }
 
     /** PhysicalRecursiveCte */
-    public PhysicalRecursiveCte(Qualifier qualifier, List<NamedExpression> 
outputs,
-            List<List<SlotReference>> childrenOutputs, 
List<List<NamedExpression>> constantExprsList,
+    public PhysicalRecursiveCte(boolean isUnionAll, List<NamedExpression> 
outputs,
+            List<List<SlotReference>> childrenOutputs,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
-            PhysicalProperties physicalProperties, Statistics statistics, 
List<Plan> inputs) {
-        super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs,
-                groupExpression, logicalProperties, physicalProperties, 
statistics, inputs);
-        this.constantExprsList = ImmutableList.copyOf(
-                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+            PhysicalProperties physicalProperties, Statistics statistics, 
List<Plan> children) {
+        super(PlanType.PHYSICAL_RECURSIVE_CTE, groupExpression, 
logicalProperties, physicalProperties,
+                statistics, children.toArray(new Plan[0]));
+        this.isUnionAll = isUnionAll;
+        this.outputs = ImmutableList.copyOf(outputs);
+        this.regularChildrenOutputs = ImmutableList.copyOf(childrenOutputs);
     }
 
-    public List<List<NamedExpression>> getConstantExprsList() {
-        return constantExprsList;
+    @Override
+    public boolean isUnionAll() {
+        return isUnionAll;
+    }
+
+    @Override
+    public List<SlotReference> getRegularChildOutput(int i) {
+        return regularChildrenOutputs.get(i);
+    }
+
+    @Override
+    public List<NamedExpression> getOutputs() {
+        return outputs;
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return outputs.stream()
+                .map(NamedExpression::toSlot)
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public List<List<SlotReference>> getRegularChildrenOutputs() {
+        return regularChildrenOutputs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PhysicalRecursiveCte that = (PhysicalRecursiveCte) o;
+        return isUnionAll == that.isUnionAll && Objects.equals(outputs, 
that.outputs) && Objects.equals(
+                regularChildrenOutputs, that.regularChildrenOutputs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isUnionAll, outputs, regularChildrenOutputs);
     }
 
     @Override
@@ -100,14 +136,18 @@ public class PhysicalRecursiveCte extends 
PhysicalSetOperation implements Union
         return visitor.visitPhysicalRecursiveCte(this, context);
     }
 
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return 
regularChildrenOutputs.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList());
+    }
+
     @Override
     public String toString() {
         return Utils.toSqlString("PhysicalRecursiveCte" + "[" + id.asInt() + 
"]" + getGroupIdWithPrefix(),
                 "stats", statistics,
-                "qualifier", qualifier,
+                "isUnionAll", isUnionAll,
                 "outputs", outputs,
-                "regularChildrenOutputs", regularChildrenOutputs,
-                "constantExprsList", constantExprsList);
+                "regularChildrenOutputs", regularChildrenOutputs);
     }
 
     @Override
@@ -117,11 +157,6 @@ public class PhysicalRecursiveCte extends 
PhysicalSetOperation implements Union
                 && 
context.getSessionVariable().getDetailShapePlanNodesSet().contains(getClass().getSimpleName()))
 {
             StringBuilder builder = new StringBuilder();
             builder.append(getClass().getSimpleName());
-            builder.append("(constantExprsList=");
-            builder.append(constantExprsList.stream()
-                    .map(exprs -> exprs.stream().map(Expression::shapeInfo)
-                            .collect(Collectors.joining(", ", "[", "]")))
-                    .collect(Collectors.joining(", ", "[", "]")));
             builder.append(")");
             return builder.toString();
         } else {
@@ -131,39 +166,39 @@ public class PhysicalRecursiveCte extends 
PhysicalSetOperation implements Union
 
     @Override
     public PhysicalRecursiveCte withChildren(List<Plan> children) {
-        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList, groupExpression,
+        return new PhysicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs, groupExpression,
                 getLogicalProperties(), children);
     }
 
     @Override
     public PhysicalRecursiveCte withGroupExpression(Optional<GroupExpression> 
groupExpression) {
-        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+        return new PhysicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs,
                 groupExpression, getLogicalProperties(), children);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+        return new PhysicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs,
                 groupExpression, logicalProperties.get(), children);
     }
 
     @Override
     public PhysicalRecursiveCte withPhysicalPropertiesAndStats(
             PhysicalProperties physicalProperties, Statistics statistics) {
-        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+        return new PhysicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs,
                 groupExpression, getLogicalProperties(), physicalProperties, 
statistics, children);
     }
 
     @Override
     public PhysicalRecursiveCte resetLogicalProperties() {
-        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+        return new PhysicalRecursiveCte(isUnionAll, outputs, 
regularChildrenOutputs,
                 Optional.empty(), null, physicalProperties, statistics, 
children);
     }
 
     @Override
     public void computeUnique(DataTrait.Builder builder) {
-        if (qualifier == Qualifier.DISTINCT) {
+        if (!isUnionAll) {
             builder.addUniqueSlot(ImmutableSet.copyOf(getOutput()));
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index d34de256608..66918845e00 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -383,7 +383,7 @@ public abstract class PlanVisitor<R, C> implements 
CommandVisitor<R, C>, Relatio
     }
 
     public R visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, C 
context) {
-        return visitPhysicalSetOperation(recursiveCte, context);
+        return visit(recursiveCte, context);
     }
 
     public R visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> 
sort, C context) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java
index 103abd8b790..35065314bab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java
@@ -17,12 +17,12 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.TDataGenScanRange;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanNode;
@@ -63,18 +63,13 @@ public class RecursiveCteScanNode extends ScanNode {
         Collections.shuffle(backendList);
         Backend selectedBackend = backendList.get(0);
 
-        // create a dummy scan range
-        TScanRange scanRange = new TScanRange();
-        TDataGenScanRange dataGenScanRange = new TDataGenScanRange();
-        scanRange.setDataGenScanRange(dataGenScanRange);
-
         // create scan range locations
-        TScanRangeLocations locations = new TScanRangeLocations();
         TScanRangeLocation location = new TScanRangeLocation();
         location.setBackendId(selectedBackend.getId());
         location.setServer(new TNetworkAddress(selectedBackend.getHost(), 
selectedBackend.getBePort()));
+        TScanRangeLocations locations = new TScanRangeLocations();
         locations.addToLocations(location);
-        locations.setScanRange(scanRange);
+        locations.setScanRange(new TScanRange());
         scanRangeLocations.add(locations);
     }
 
@@ -96,8 +91,11 @@ public class RecursiveCteScanNode extends ScanNode {
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         StringBuilder output = new StringBuilder();
-        output.append(prefix).append("Recursive Cte: 
").append(getTableIf().getName());
-        output.append("\n");
+        output.append(prefix).append("Recursive Cte: 
").append(getTableIf().getName()).append("\n");
+        if (!conjuncts.isEmpty()) {
+            Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
+            output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
+        }
         return output.toString();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 5fcd14fcb79..f6f24aae153 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe.runtime;
 
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.catalog.AIResource;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Resource;
@@ -41,6 +42,9 @@ import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.RecursiveCteNode;
+import org.apache.doris.planner.RecursiveCteScanNode;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SortNode;
 import org.apache.doris.qe.ConnectContext;
@@ -48,6 +52,7 @@ import org.apache.doris.qe.CoordinatorContext;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TAIResource;
 import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -56,6 +61,9 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TPlanFragmentDestination;
 import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TRecCTENode;
+import org.apache.doris.thrift.TRecCTEResetInfo;
+import org.apache.doris.thrift.TRecCTETarget;
 import org.apache.doris.thrift.TRuntimeFilterInfo;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -79,6 +87,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
@@ -89,6 +99,7 @@ public class ThriftPlansBuilder {
             CoordinatorContext coordinatorContext) {
 
         List<PipelineDistributedPlan> distributedPlans = 
coordinatorContext.distributedPlans;
+        setParamsForRecursiveCteNode(distributedPlans);
 
         // we should set runtime predicate first, then we can use heap sort 
and to thrift
         setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
@@ -571,6 +582,107 @@ public class ThriftPlansBuilder {
         }
     }
 
+    /**
+     * Fills in the {@link TRecCTENode} of every fragment that contains a {@link RecursiveCteNode}.
+     *
+     * <p>For each plan, the brpc addresses of all assigned workers are recorded per fragment id. A fragment
+     * holding a {@link RecursiveCteScanNode} additionally registers a {@link TRecCTETarget} built from its
+     * first instance job. A fragment holding a {@link RecursiveCteNode} then looks up, for every fragment
+     * collected from its first child, the registered target (if any) and the addresses to reset.
+     *
+     * <p>NOTE(review): the lookups only see fragments visited earlier in {@code distributedPlans}, so this
+     * presumably relies on child fragments preceding their parents in the list; confirm against the caller.
+     *
+     * @param distributedPlans distributed plans of the query
+     * @throws IllegalStateException if a fragment holds more than one recursive cte (scan) node, a recursive
+     *         cte scan fragment has no assigned job, or a child fragment has no recorded addresses
+     */
+    private static void setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans) {
+        Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>();
+        Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>();
+        for (PipelineDistributedPlan plan : distributedPlans) {
+            List<AssignedJob> fragmentAssignedJobs = plan.getInstanceJobs();
+            Set<TNetworkAddress> networkAddresses = new TreeSet<>();
+            for (AssignedJob assignedJob : fragmentAssignedJobs) {
+                DistributedPlanWorker distributedPlanWorker = assignedJob.getAssignedWorker();
+                networkAddresses.add(new TNetworkAddress(distributedPlanWorker.host(),
+                        distributedPlanWorker.brpcPort()));
+            }
+            PlanFragment planFragment = plan.getFragmentJob().getFragment();
+            fragmentIdToNetworkAddressMap.put(planFragment.getFragmentId(), networkAddresses);
+
+            List<RecursiveCteScanNode> recursiveCteScanNodes = planFragment.getPlanRoot()
+                    .collectInCurrentFragment(RecursiveCteScanNode.class::isInstance);
+            if (!recursiveCteScanNodes.isEmpty()) {
+                if (recursiveCteScanNodes.size() != 1) {
+                    throw new IllegalStateException(
+                            String.format("one fragment can only have 1 recursive cte scan node, but there is %d",
+                                    recursiveCteScanNodes.size()));
+                }
+                if (fragmentAssignedJobs.isEmpty()) {
+                    throw new IllegalStateException(
+                            "fragmentAssignedJobs is empty for recursive cte scan node");
+                }
+                TRecCTETarget tRecCTETarget = new TRecCTETarget();
+                DistributedPlanWorker distributedPlanWorker = fragmentAssignedJobs.get(0).getAssignedWorker();
+                tRecCTETarget.setAddr(new TNetworkAddress(distributedPlanWorker.host(),
+                        distributedPlanWorker.brpcPort()));
+                tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId());
+                tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt());
+                fragmentIdToRecCteTargetMap.put(planFragment.getFragmentId(), tRecCTETarget);
+            }
+
+            List<RecursiveCteNode> recursiveCteNodes = planFragment.getPlanRoot()
+                    .collectInCurrentFragment(RecursiveCteNode.class::isInstance);
+            if (!recursiveCteNodes.isEmpty()) {
+                if (recursiveCteNodes.size() != 1) {
+                    throw new IllegalStateException(
+                            String.format("one fragment can only have 1 recursive cte node, but there is %d",
+                                    recursiveCteNodes.size()));
+                }
+
+                List<TRecCTETarget> targets = new ArrayList<>();
+                List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>();
+                // PhysicalPlanTranslator will swap recursiveCteNodes's child fragment,
+                // so we get recursive one by 1st child
+                List<PlanFragment> childFragments = new ArrayList<>();
+                planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, childFragments);
+                for (PlanFragment child : childFragments) {
+                    PlanFragmentId childFragmentId = child.getFragmentId();
+                    // only fragments containing a RecursiveCteScanNode registered a target
+                    TRecCTETarget tRecCTETarget = fragmentIdToRecCteTargetMap.get(childFragmentId);
+                    if (tRecCTETarget != null) {
+                        targets.add(tRecCTETarget);
+                    }
+                    Set<TNetworkAddress> tNetworkAddresses = fragmentIdToNetworkAddressMap.get(childFragmentId);
+                    if (tNetworkAddresses == null) {
+                        throw new IllegalStateException(
+                                String.format("can't find TNetworkAddress for fragment %d", childFragmentId.asInt()));
+                    }
+                    for (TNetworkAddress address : tNetworkAddresses) {
+                        TRecCTEResetInfo tRecCTEResetInfo = new TRecCTEResetInfo();
+                        tRecCTEResetInfo.setFragmentId(childFragmentId.asInt());
+                        tRecCTEResetInfo.setAddr(address);
+                        fragmentsToReset.add(tRecCTEResetInfo);
+                    }
+                }
+
+                RecursiveCteNode recursiveCteNode = recursiveCteNodes.get(0);
+                List<List<Expr>> materializedResultExprLists = recursiveCteNode.getMaterializedResultExprLists();
+                List<List<TExpr>> texprLists = new ArrayList<>(materializedResultExprLists.size());
+                for (List<Expr> exprList : materializedResultExprLists) {
+                    texprLists.add(Expr.treesToThrift(exprList));
+                }
+                TRecCTENode tRecCTENode = new TRecCTENode();
+                tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll());
+                tRecCTENode.setTargets(targets);
+                tRecCTENode.setFragmentsToReset(fragmentsToReset);
+                tRecCTENode.setResultExprLists(texprLists);
+                recursiveCteNode.settRecCTENode(tRecCTENode);
+            }
+        }
+    }
+
     private static class PerNodeScanParams {
         Map<Integer, List<TScanRangeParams>> perNodeScanRanges;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to