morrySnow commented on code in PR #58916: URL: https://github.com/apache/doris/pull/58916#discussion_r2667932953
########## fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java: ########## @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.common.SystemIdGenerator; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +public class RecursiveCteTempTable extends Table { Review Comment: why extends from Table? why need this? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java: ########## @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * LogicalRecursiveCte is basically like LogicalUnion + */ +public class LogicalRecursiveCte extends AbstractLogicalPlan implements RecursiveCte, OutputPrunable { Review Comment: The "CTE" should 
be capitalized to be consistent with "logicalCTE". ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java: ########## @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * LogicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle + */ +public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> { Review Comment: we should give it a better name, for example: recursiveUnionProducer ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java: ########## @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * LogicalRecursiveCte is basically like LogicalUnion + */ +public class LogicalRecursiveCte extends AbstractLogicalPlan implements RecursiveCte, OutputPrunable { Review Comment: in pg, it is named `Recursive Union` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java: ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import java.util.List; +import java.util.Optional; + +/** + * LogicalRecursiveCteScan. + */ +public class LogicalRecursiveCteScan extends LogicalCatalogRelation { Review Comment: in pg, it is called WorkTableReference ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java: ########## @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * LogicalRecursiveCte is basically like LogicalUnion + */ +public class LogicalRecursiveCte extends AbstractLogicalPlan implements RecursiveCte, OutputPrunable { + private final String cteName; + 
private final List<NamedExpression> outputs; + private final List<List<SlotReference>> regularChildrenOutputs; + private final boolean isUnionAll; Review Comment: isUnionAll -> reuse Qualifier in set operation ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java: ########## @@ -78,6 +82,7 @@ public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> colum super(PlanType.LOGICAL_SUBQUERY_ALIAS, groupExpression, logicalProperties, child); this.qualifier = ImmutableList.copyOf(Objects.requireNonNull(qualifier, "qualifier is null")); this.columnAliases = columnAliases; + this.isRecursiveCte = computeIsRecursiveCte(); Review Comment: why do we need isRecursiveCte in this class? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -1037,6 +1042,28 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT return planFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan recursiveCteScan, + PlanTranslatorContext context) { + TableIf table = recursiveCteScan.getTable(); + List<Slot> slots = ImmutableList.copyOf(recursiveCteScan.getOutput()); + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); + + RecursiveCteScanNode scanNode = new RecursiveCteScanNode(table != null ? table.getName() : "", + context.nextPlanNodeId(), tupleDescriptor); + scanNode.setNereidsId(recursiveCteScan.getId()); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCteScan.getId(), scanNode.getId()); + Utils.execWithUncheckedException(scanNode::initScanRangeLocations); + + translateRuntimeFilter(recursiveCteScan, scanNode, context); + + context.addScanNode(scanNode, recursiveCteScan); Review Comment: i don't think it should be added as a scan node. 
it is not even a real scan node ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTE.java: ########## @@ -41,21 +41,28 @@ public class LogicalCTE<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements PropagateFuncDeps { private final List<LogicalSubQueryAlias<Plan>> aliasQueries; + private final boolean isRecursiveCte; Review Comment: ```suggestion private final boolean isRecursive; ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java: ########## @@ -1127,6 +1127,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { private final Map<Integer, ParserRuleContext> selectHintMap; + private boolean isInRecursiveCteContext = false; Review Comment: add a comment to explain why this flag is needed ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2179,6 +2208,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? 
extends Plan> project return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.getCteName(), recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); + recursiveCteNode.setNereidsId(recursiveCte.getId()); + List<List<Expression>> resultExpressionLists = Lists.newArrayList(); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); + for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) { + resultExpressionLists.add(new ArrayList<>(regularChildrenOutput)); + } + + for (PlanFragment childFragment : childrenFragments) { + recursiveCteNode.addChild(childFragment.getPlanRoot()); + } + + List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); + for (int i = 0; i < resultExpressionLists.size(); ++i) { + List<Expression> resultExpressionList = resultExpressionLists.get(i); + List<Expr> exprList = Lists.newArrayList(); + Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size()); + for (int j = 0; j < resultExpressionList.size(); ++j) { + exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context)); + // TODO: reconsider this, we may change nullable info in previous nereids rules not here. 
+ outputSlotDescs.get(j) + .setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable()); + } + materializedResultExprLists.add(exprList); + } + recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists); + Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size() + == recursiveCteNode.getChildren().size()); + + PlanFragment recursiveCteFragment; + if (childrenFragments.isEmpty()) { Review Comment: why childrenFragments could be empty here? i think it could not happen ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2179,6 +2208,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.getCteName(), recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); + recursiveCteNode.setNereidsId(recursiveCte.getId()); + List<List<Expression>> resultExpressionLists = Lists.newArrayList(); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); + for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) { + resultExpressionLists.add(new ArrayList<>(regularChildrenOutput)); + } + + for (PlanFragment 
childFragment : childrenFragments) { + recursiveCteNode.addChild(childFragment.getPlanRoot()); + } + + List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); + for (int i = 0; i < resultExpressionLists.size(); ++i) { + List<Expression> resultExpressionList = resultExpressionLists.get(i); + List<Expr> exprList = Lists.newArrayList(); + Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size()); + for (int j = 0; j < resultExpressionList.size(); ++j) { + exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context)); + // TODO: reconsider this, we may change nullable info in previous nereids rules not here. + outputSlotDescs.get(j) + .setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable()); + } + materializedResultExprLists.add(exprList); + } + recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists); + Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size() + == recursiveCteNode.getChildren().size()); + + PlanFragment recursiveCteFragment; + if (childrenFragments.isEmpty()) { + recursiveCteFragment = createPlanFragment(recursiveCteNode, + DataPartition.UNPARTITIONED, recursiveCte); + context.addPlanFragment(recursiveCteFragment); + } else { + int childrenSize = childrenFragments.size(); + recursiveCteFragment = childrenFragments.get(childrenSize - 1); + for (int i = childrenSize - 2; i >= 0; i--) { + context.mergePlanFragment(childrenFragments.get(i), recursiveCteFragment); + for (PlanFragment child : childrenFragments.get(i).getChildren()) { + recursiveCteFragment.addChild(child); + } Review Comment: if we merged one fragment, we should not add it into children list ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for 
(LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + 
.withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); Review Comment: why need cte name when analyze anchor side? 
########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan 
analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, Review Comment: why change output slot name here? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + 
LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + 
innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable Review Comment: add comment to explain why need forceNullable ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - 
outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); Review Comment: if also check 
children's size, but error message do not mention this check ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + 
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { Review Comment: ```suggestion if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.arity() != 2) { ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be 
recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("recursive reference to query %s must not appear more than once", + aliasQuery.getAlias())); + } Review Comment: Do we not support a nested recursive CTE on the recursive side? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java: ########## @@ -171,16 +173,32 @@ private LogicalPlan bindWithCurrentDb(CascadesContext cascadesContext, UnboundRe return consumer; } } - List<String> tableQualifier = RelationUtil.getQualifierName( - cascadesContext.getConnectContext(), unboundRelation.getNameParts()); - TableIf table = cascadesContext.getStatementContext().getAndCacheTable(tableQualifier, TableFrom.QUERY, - Optional.of(unboundRelation)); + LogicalPlan scan; + if (tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().orElse(""))) { Review Comment: We should not use equalsIgnoreCase here: lower_case_table_names also affects CTE binding, so we should use `org.apache.doris.nereids.CTEContext#findCTEContext` here. Why not reuse the normal CTE binding code? 
########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java: ########## @@ -107,8 +107,10 @@ private CTEContext collectFromCte( for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + // 看起来需要在CascadesContext中添加当前CTE的name,以便判断自引用 Review Comment: Please use English. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java: ########## @@ -403,6 +410,54 @@ public <P extends Plan> P pruneOutput(P plan, List<NamedExpression> originOutput } } + private LogicalRecursiveCte pruneRecursiveCteOutput(LogicalRecursiveCte recursiveCte, PruneContext context) { Review Comment: Remove this function; it is useless. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2165,8 +2192,10 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project if (inputPlanNode instanceof OlapScanNode) { ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet); } - updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, - requiredByProjectSlotIdSet, context); + if (!(inputPlanNode instanceof RecursiveCteScanNode)) { + updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, + requiredByProjectSlotIdSet, context); Review Comment: If RecursiveCteScanNode did not extend ScanNode, we would not need to add this if statement. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - 
CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + 
cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + 
// analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("recursive reference to query %s must not appear more than once", + aliasQuery.getAlias())); + } + List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput(); + List<DataType> anchorChildOutputTypes = new ArrayList<>(anchorChildOutputs.size()); + for (Slot slot : anchorChildOutputs) { + anchorChildOutputTypes.add(slot.getDataType()); + } + List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); + for (int i = 0; i < recursiveChildOutputs.size(); ++i) { Review Comment: could out of index? 
if recursiveChildOutputs.size() < analyzedAnchorChild.size() ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + 
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for 
(LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("recursive reference to query %s must not appear more than once", + aliasQuery.getAlias())); + } + List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput(); + List<DataType> anchorChildOutputTypes = new ArrayList<>(anchorChildOutputs.size()); + for (Slot slot : anchorChildOutputs) { + anchorChildOutputTypes.add(slot.getDataType()); + } + List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); + for (int i = 0; i < recursiveChildOutputs.size(); ++i) { + if (!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i))) { + throw new AnalysisException(String.format("%s recursive 
child's %d column's datatype in select list %s " + + "is different from anchor child's output datatype %s, please add cast manually " + + "to get expect datatype", aliasQuery.getAlias(), i + 1, + recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); + } + } + analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(), + forceOutputNullable(analyzedRecursiveChild, ImmutableList.of())); + + // create LogicalRecursiveCte + LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan; + LogicalRecursiveCte analyzedCtePlan = new LogicalRecursiveCte(aliasQuery.getAlias(), + logicalUnion.getQualifier() == SetOperation.Qualifier.ALL, + ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild)); + List<List<NamedExpression>> childrenProjections = analyzedCtePlan.collectChildrenProjections(); + int childrenProjectionSize = childrenProjections.size(); Review Comment: childrenProjectionSize always be 2? if so, builderWithExpectedSize is unnecessary ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> 
logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new 
AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild 
= (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("recursive reference to query %s must not appear more than once", + aliasQuery.getAlias())); + } + List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput(); + List<DataType> anchorChildOutputTypes = new ArrayList<>(anchorChildOutputs.size()); + for (Slot slot : anchorChildOutputs) { + anchorChildOutputTypes.add(slot.getDataType()); + } + List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); + for (int i = 0; i < recursiveChildOutputs.size(); ++i) { + if (!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i))) { + throw new AnalysisException(String.format("%s recursive child's %d column's datatype in select list %s " + + "is different from anchor child's output datatype %s, please add cast manually " + + "to get expect datatype", aliasQuery.getAlias(), i + 1, + recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); + } + } + analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(), + forceOutputNullable(analyzedRecursiveChild, ImmutableList.of())); + + // create LogicalRecursiveCte + LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan; + LogicalRecursiveCte analyzedCtePlan = new LogicalRecursiveCte(aliasQuery.getAlias(), + logicalUnion.getQualifier() == SetOperation.Qualifier.ALL, + ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild)); + List<List<NamedExpression>> childrenProjections = analyzedCtePlan.collectChildrenProjections(); Review Comment: I don't think the recursive CTE needs this step: it is used to cast before a set operation, but the outputs of all children of a recursive CTE are exactly the same, so we do not need to cast. 
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java: ########## @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.jetbrains.annotations.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * PhysicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle + */ +public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> { + private final 
String cteName; + + public PhysicalRecursiveCteRecursiveChild(String cteName, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(cteName, Optional.empty(), logicalProperties, child); + } + + public PhysicalRecursiveCteRecursiveChild(String cteName, Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, CHILD_TYPE child) { + this(cteName, groupExpression, logicalProperties, PhysicalProperties.ANY, null, child); + } + + public PhysicalRecursiveCteRecursiveChild(String cteName, Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, @Nullable PhysicalProperties physicalProperties, Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD, groupExpression, logicalProperties, physicalProperties, + statistics, child); + this.cteName = cteName; + } + + @Override + public String toString() { + return Utils.toSqlStringSkipNull("PhysicalRecursiveCteRecursiveChild", + "cteName", cteName); + } + + @Override + public Plan withChildren(List<Plan> children) { + Preconditions.checkArgument(children.size() == 1); + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, getLogicalProperties(), + children.get(0)); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalRecursiveCteRecursiveChild(this, context); + } + + @Override + public List<? 
extends Expression> getExpressions() { + return ImmutableList.of(); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + Preconditions.checkArgument(children.size() == 1); + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, logicalProperties.get(), child()); + } + + @Override + public void computeUnique(DataTrait.Builder builder) { + + } + + @Override + public void computeUniform(DataTrait.Builder builder) { + + } + + @Override + public void computeEqualSet(DataTrait.Builder builder) { + Review Comment: I'm confused: if this node does not implement any data trait functions, why does PhysicalRecursiveCte need to implement these functions? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java: ########## @@ -876,6 +883,12 @@ public Statistics visitLogicalEsScan(LogicalEsScan esScan, Void context) { return computeCatalogRelation(esScan); } + @Override + public Statistics visitLogicalRecursiveCteScan(LogicalRecursiveCteScan recursiveCteScan, Void context) { + recursiveCteScan.getExpressions(); + return computeCatalogRelation(recursiveCteScan); Review Comment: why reuse compute catalog relation? 
it does not make any sense ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java: ########## @@ -113,4 +128,19 @@ public Plan visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, LogicalCTEPr } return cteConsumer; } + + private void collectMustInlineCteConsumers(Plan planNode, boolean needCollect, + Set<LogicalCTEConsumer> cteConsumers) { + if (planNode instanceof LogicalCTEConsumer) { + if (needCollect) { + cteConsumers.add((LogicalCTEConsumer) planNode); + } + } else if (planNode instanceof LogicalRecursiveCteRecursiveChild) { + collectMustInlineCteConsumers(planNode.child(0), true, cteConsumers); Review Comment: Why is this collector needed? I think we could add a recursive flag on CteAnchor or somewhere else to process it easily. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java: ########## @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * LogicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle + */ +public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> { Review Comment: should implement hashCode and equals ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - 
outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + 
parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); Review Comment: cte scan node read data from where? union output? recursive cte node output? ########## fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java: ########## @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; + +// Full scan of recursive cte temp table +public class RecursiveCteScanNode extends ScanNode { Review Comment: I don't think it should implement ScanNode ########## fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java: ########## @@ -576,6 +595,130 @@ private static void filterInstancesWhichReceiveDataFromRemote( } } + private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans, Review Comment: This function is too long; please add some comments to explain how it works ##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java: ########## @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import 
org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * LogicalRecursiveCte is basically like LogicalUnion + */ +public class LogicalRecursiveCte extends AbstractLogicalPlan implements RecursiveCte, OutputPrunable { + private final String cteName; + private final List<NamedExpression> outputs; + private final List<List<SlotReference>> regularChildrenOutputs; + private final boolean isUnionAll; + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(String cteName, boolean isUnionAll, List<Plan> children) { + this(cteName, isUnionAll, ImmutableList.of(), ImmutableList.of(), children); + } + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(String cteName, boolean isUnionAll, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, List<Plan> children) { + this(cteName, isUnionAll, outputs, childrenOutputs, Optional.empty(), + Optional.empty(), + children); + } + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(String cteName, boolean isUnionAll, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, + List<Plan> children) { + super(PlanType.LOGICAL_RECURSIVE_CTE, groupExpression, logicalProperties, children); + this.cteName = cteName; + this.isUnionAll = isUnionAll; + this.outputs = ImmutableList.copyOf(outputs); + this.regularChildrenOutputs = ImmutableList.copyOf(childrenOutputs); + } + + @Override + public boolean isUnionAll() { + return isUnionAll; + } + + public String getCteName() { + return cteName; + } + + @Override + public List<SlotReference> getRegularChildOutput(int i) { + return regularChildrenOutputs.get(i); + } + + @Override + 
public List<List<SlotReference>> getRegularChildrenOutputs() { + return regularChildrenOutputs; + } + + public List<List<NamedExpression>> collectChildrenProjections() { + return castCommonDataTypeOutputs(); + } + + private List<List<NamedExpression>> castCommonDataTypeOutputs() { + int childOutputSize = child(0).getOutput().size(); + ImmutableList.Builder<NamedExpression> newLeftOutputs = ImmutableList.builderWithExpectedSize( + childOutputSize); + ImmutableList.Builder<NamedExpression> newRightOutputs = ImmutableList.builderWithExpectedSize( + childOutputSize + ); + // Ensure that the output types of the left and right children are consistent and expand upward. + for (int i = 0; i < childOutputSize; ++i) { + Slot left = child(0).getOutput().get(i); + Slot right = child(1).getOutput().get(i); + DataType compatibleType; + try { + compatibleType = LogicalSetOperation.getAssignmentCompatibleType(left.getDataType(), + right.getDataType()); + } catch (Exception e) { + throw new AnalysisException( + "Can not find compatible type for " + left + " and " + right + ", " + e.getMessage()); + } + Expression newLeft = TypeCoercionUtils.castIfNotSameTypeStrict(left, compatibleType); + Expression newRight = TypeCoercionUtils.castIfNotSameTypeStrict(right, compatibleType); + if (newLeft instanceof Cast) { + newLeft = new Alias(newLeft, left.getName()); + } + if (newRight instanceof Cast) { + newRight = new Alias(newRight, right.getName()); + } + newLeftOutputs.add((NamedExpression) newLeft); + newRightOutputs.add((NamedExpression) newRight); + } + + return ImmutableList.of(newLeftOutputs.build(), newRightOutputs.build()); + } + + /** + * Generate new output for Recursive Cte. 
+ */ + public List<NamedExpression> buildNewOutputs() { + List<Slot> slots = resetNullableForLeftOutputs(); + ImmutableList.Builder<NamedExpression> newOutputs = ImmutableList.builderWithExpectedSize(slots.size()); + + for (int i = 0; i < slots.size(); i++) { + Slot slot = slots.get(i); + ExprId exprId = i < outputs.size() ? outputs.get(i).getExprId() : StatementScopeIdGenerator.newExprId(); + newOutputs.add( + new SlotReference(exprId, slot.toSql(), slot.getDataType(), slot.nullable(), ImmutableList.of()) + ); + } + return newOutputs.build(); + } + + // If the right child is nullable, need to ensure that the left child is also nullable + private List<Slot> resetNullableForLeftOutputs() { + int rightChildOutputSize = child(1).getOutput().size(); + ImmutableList.Builder<Slot> resetNullableForLeftOutputs + = ImmutableList.builderWithExpectedSize(rightChildOutputSize); + for (int i = 0; i < rightChildOutputSize; ++i) { + if (child(1).getOutput().get(i).nullable() && !child(0).getOutput().get(i).nullable()) { + resetNullableForLeftOutputs.add(child(0).getOutput().get(i).withNullable(true)); + } else { + resetNullableForLeftOutputs.add(child(0).getOutput().get(i)); + } + } + return resetNullableForLeftOutputs.build(); + } + + @Override + public String toString() { + return Utils.toSqlStringSkipNull("LogicalRecursiveCte", + "cteName", cteName, + "isUnionAll", isUnionAll, + "outputs", outputs, + "regularChildrenOutputs", regularChildrenOutputs, + "stats", statistics); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LogicalRecursiveCte that = (LogicalRecursiveCte) o; + return cteName.equals(that.cteName) && isUnionAll == that.isUnionAll && Objects.equals(outputs, that.outputs) + && Objects.equals(regularChildrenOutputs, that.regularChildrenOutputs); + } + + @Override + public int hashCode() { + return Objects.hash(cteName, isUnionAll, outputs, 
regularChildrenOutputs); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitLogicalRecursiveCte(this, context); + } + + @Override + public List<? extends Expression> getExpressions() { + return regularChildrenOutputs.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList()); + } + + @Override + public List<Slot> computeOutput() { + return outputs.stream() + .map(NamedExpression::toSlot) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public LogicalRecursiveCte withChildren(List<Plan> children) { + return new LogicalRecursiveCte(cteName, isUnionAll, outputs, regularChildrenOutputs, children); + } + + public LogicalRecursiveCte withChildrenAndTheirOutputs(List<Plan> children, + List<List<SlotReference>> childrenOutputs) { + Preconditions.checkArgument(children.size() == childrenOutputs.size(), + "children size %s is not equals with children outputs size %s", + children.size(), childrenOutputs.size()); + return new LogicalRecursiveCte(cteName, isUnionAll, outputs, childrenOutputs, children); + } + + @Override + public LogicalRecursiveCte withGroupExpression(Optional<GroupExpression> groupExpression) { + return new LogicalRecursiveCte(cteName, isUnionAll, outputs, regularChildrenOutputs, + groupExpression, Optional.of(getLogicalProperties()), children); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new LogicalRecursiveCte(cteName, isUnionAll, outputs, regularChildrenOutputs, + groupExpression, logicalProperties, children); + } + + public LogicalRecursiveCte withNewOutputs(List<NamedExpression> newOutputs) { + return new LogicalRecursiveCte(cteName, isUnionAll, newOutputs, regularChildrenOutputs, + Optional.empty(), Optional.empty(), children); + } + + public LogicalRecursiveCte withNewOutputsAndChildren(List<NamedExpression> newOutputs, + 
List<Plan> children, + List<List<SlotReference>> childrenOutputs) { + return new LogicalRecursiveCte(cteName, isUnionAll, newOutputs, childrenOutputs, + Optional.empty(), Optional.empty(), children); + } + + @Override + public List<NamedExpression> getOutputs() { + return outputs; + } + + @Override + public LogicalRecursiveCte pruneOutputs(List<NamedExpression> prunedOutputs) { + return withNewOutputs(prunedOutputs); + } + + @Override + public void computeUnique(DataTrait.Builder builder) { + } Review Comment: it is not same with Physical one, so which one is right? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java: ########## @@ -318,6 +319,16 @@ public Void visitPhysicalUnion(PhysicalUnion union, PlanContext context) { return null; } + @Override + public Void visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanContext context) { Review Comment: recursiveCte should implement LogicalBinary and generate a request list with two element directly ########## fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java: ########## @@ -150,6 +153,11 @@ public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanC return PhysicalProperties.STORAGE_ANY; } + @Override + public PhysicalProperties visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan cteScan, PlanContext context) { + return PhysicalProperties.ANY; Review Comment: if this is any, we should not do compute scan range for CteScanNode when translator, and let coordinator choose a random BE ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2179,6 +2208,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? 
extends Plan> project return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.getCteName(), recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); Review Comment: should generate distributeExpr before generateTupleDesc ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2179,6 +2208,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? 
extends Plan> project return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.getCteName(), recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); + recursiveCteNode.setNereidsId(recursiveCte.getId()); + List<List<Expression>> resultExpressionLists = Lists.newArrayList(); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); + for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) { + resultExpressionLists.add(new ArrayList<>(regularChildrenOutput)); + } + + for (PlanFragment childFragment : childrenFragments) { + recursiveCteNode.addChild(childFragment.getPlanRoot()); + } + + List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); + for (int i = 0; i < resultExpressionLists.size(); ++i) { + List<Expression> resultExpressionList = resultExpressionLists.get(i); + List<Expr> exprList = Lists.newArrayList(); + Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size()); + for (int j = 0; j < resultExpressionList.size(); ++j) { + exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context)); + // TODO: reconsider this, we may change nullable info in previous nereids rules not here. 
+ outputSlotDescs.get(j) + .setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable()); + } + materializedResultExprLists.add(exprList); + } + recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists); + Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size() + == recursiveCteNode.getChildren().size()); + + PlanFragment recursiveCteFragment; + if (childrenFragments.isEmpty()) { + recursiveCteFragment = createPlanFragment(recursiveCteNode, + DataPartition.UNPARTITIONED, recursiveCte); + context.addPlanFragment(recursiveCteFragment); + } else { + int childrenSize = childrenFragments.size(); + recursiveCteFragment = childrenFragments.get(childrenSize - 1); + for (int i = childrenSize - 2; i >= 0; i--) { + context.mergePlanFragment(childrenFragments.get(i), recursiveCteFragment); Review Comment: why need merge two children? i think only anchor side could be merged? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2179,6 +2208,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? 
extends Plan> project return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.getCteName(), recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); + recursiveCteNode.setNereidsId(recursiveCte.getId()); + List<List<Expression>> resultExpressionLists = Lists.newArrayList(); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); + for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) { + resultExpressionLists.add(new ArrayList<>(regularChildrenOutput)); + } + + for (PlanFragment childFragment : childrenFragments) { + recursiveCteNode.addChild(childFragment.getPlanRoot()); + } + + List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); + for (int i = 0; i < resultExpressionLists.size(); ++i) { + List<Expression> resultExpressionList = resultExpressionLists.get(i); + List<Expr> exprList = Lists.newArrayList(); + Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size()); + for (int j = 0; j < resultExpressionList.size(); ++j) { + exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context)); + // TODO: reconsider this, we may change nullable info in previous nereids rules not here. 
+ outputSlotDescs.get(j) + .setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable()); Review Comment: must move to nereids, i think if u impl computeOutput in LogicalRecursiveCte in right way, the nullable should be correct when generate output tuple ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -2179,6 +2208,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.getCteName(), recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); + recursiveCteNode.setNereidsId(recursiveCte.getId()); + List<List<Expression>> resultExpressionLists = Lists.newArrayList(); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); + for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) { + resultExpressionLists.add(new ArrayList<>(regularChildrenOutput)); + } + + for (PlanFragment childFragment : childrenFragments) { + recursiveCteNode.addChild(childFragment.getPlanRoot()); + } + + List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); + for (int i = 0; i < resultExpressionLists.size(); ++i) { + List<Expression> 
resultExpressionList = resultExpressionLists.get(i); + List<Expr> exprList = Lists.newArrayList(); + Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size()); Review Comment: if use Preconditions.checkXXX, please add error message ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if (cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("recursive reference to query %s must not appear more than once", + aliasQuery.getAlias())); + } + List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput(); + List<DataType> anchorChildOutputTypes = new ArrayList<>(anchorChildOutputs.size()); + for (Slot slot : anchorChildOutputs) { + anchorChildOutputTypes.add(slot.getDataType()); + } + List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); + for (int i = 0; i < 
recursiveChildOutputs.size(); ++i) { + if (!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i))) { Review Comment: Think about two struct types that have the same number of fields and the same field types, and differ only in field names. Another case is the variant type: check the variant type's equals function; the type has some attributes. We should allow these attributes to differ, because struct field names and variant attributes may be generated by the planner itself and could change between versions. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java: ########## @@ -289,6 +290,53 @@ public Plan visitLogicalRepeat(LogicalRepeat<? extends Plan> repeat, Map<ExprId, return repeat.withGroupSetsAndOutput(repeat.getGroupingSets(), newOutputs).recomputeLogicalProperties(); } + @Override + public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, Map<ExprId, Slot> replaceMap) { + recursiveCte = (LogicalRecursiveCte) super.visit(recursiveCte, replaceMap); + ImmutableList.Builder<List<SlotReference>> newChildrenOutputs = ImmutableList.builder(); + List<Boolean> inputNullable = null; + if (!recursiveCte.children().isEmpty()) { Review Comment: Why could recursiveCte's children be empty? Was this just copied from the set operation and not optimized for recursiveCte? 
########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java: ########## @@ -95,25 +109,146 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte( List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan 
analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + Set<LogicalRecursiveCteScan> recursiveCteScans = analyzedAnchorChild + .collect(LogicalRecursiveCteScan.class::isInstance); + for (LogicalRecursiveCteScan cteScan : recursiveCteScans) { + if 
(cteScan.getTable().getName().equalsIgnoreCase(aliasQuery.getAlias())) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } + } + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("recursive reference to query %s must not appear more than once", + aliasQuery.getAlias())); + } + List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput(); + List<DataType> anchorChildOutputTypes = new ArrayList<>(anchorChildOutputs.size()); + for (Slot slot : anchorChildOutputs) { + anchorChildOutputTypes.add(slot.getDataType()); + } + List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); + for (int i = 0; i < recursiveChildOutputs.size(); ++i) { + if (!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i))) { + throw new AnalysisException(String.format("%s recursive child's %d column's datatype in select list %s " + + "is 
different from anchor child's output datatype %s, please add cast manually " + + "to get expect datatype", aliasQuery.getAlias(), i + 1, + recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); + } + } + analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(), + forceOutputNullable(analyzedRecursiveChild, ImmutableList.of())); Review Comment: Why is the recursive side's output name also changed? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java: ########## @@ -179,6 +182,10 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext, List<String> nameParts, TableFrom tableFrom, Optional<UnboundRelation> unboundRelation) { if (nameParts.size() == 1) { String tableName = nameParts.get(0); + if (cascadesContext.getCurrentRecursiveCteName().isPresent() + && tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().get())) { + return; + } Review Comment: This should reuse findCteContext too. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java: ########## @@ -213,6 +214,12 @@ public Plan visitLogicalProject(LogicalProject<? extends Plan> project, PruneCon return pruneChildren(plan, new RoaringBitmap()); } + @Override + public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, PruneContext context) { + // keep LogicalRecursiveCte's output unchanged + return skipPruneThis(recursiveCte); Review Comment: I think we could safely prune its output if we do not let it implement OutputPrunable. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java: ########## @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * LogicalRecursiveCte is 
basically like LogicalUnion + */ +public class LogicalRecursiveCte extends AbstractLogicalPlan implements RecursiveCte, OutputPrunable { Review Comment: This should not implement OutputPrunable. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java: ########## @@ -289,6 +290,53 @@ public Plan visitLogicalRepeat(LogicalRepeat<? extends Plan> repeat, Map<ExprId, return repeat.withGroupSetsAndOutput(repeat.getGroupingSets(), newOutputs).recomputeLogicalProperties(); } + @Override + public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, Map<ExprId, Slot> replaceMap) { + recursiveCte = (LogicalRecursiveCte) super.visit(recursiveCte, replaceMap); + ImmutableList.Builder<List<SlotReference>> newChildrenOutputs = ImmutableList.builder(); + List<Boolean> inputNullable = null; + if (!recursiveCte.children().isEmpty()) { + inputNullable = Lists.newArrayListWithCapacity(recursiveCte.getOutputs().size()); + for (int i = 0; i < recursiveCte.getOutputs().size(); i++) { + inputNullable.add(false); Review Comment: This code block confuses me. In which scenario could the output of LogicalRecursiveCte's children be not null? Or is it just defensive programming? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
