This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 13d24297a7 [fix](Nereids) type check could not work when root node is table or file sink (#22902) 13d24297a7 is described below commit 13d24297a716c6994d3a3924d3bb73ca650acb88 Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Tue Aug 15 11:45:16 2023 +0800 [fix](Nereids) type check could not work when root node is table or file sink (#22902) Type check could not work because there was no expression in the plan: sink and scan nodes have no expressions at all, so types cannot be checked. This PR adds expressions to the logical sink so that type check works well. --- .../nereids/analyzer/UnboundOlapTableSink.java | 3 +- .../doris/nereids/analyzer/UnboundResultSink.java | 5 +- .../doris/nereids/jobs/executor/Analyzer.java | 4 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 2 +- .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../rules/analysis/BindInsertTargetTable.java | 209 ------------------- .../doris/nereids/rules/analysis/BindSink.java | 227 +++++++++++++++++++++ .../logical/LogicalDeferMaterializeResultSink.java | 9 +- .../trees/plans/logical/LogicalFileSink.java | 43 ++-- .../trees/plans/logical/LogicalOlapTableSink.java | 57 +++--- .../trees/plans/logical/LogicalResultSink.java | 45 +--- .../nereids/trees/plans/logical/LogicalSink.java | 51 ++++- 12 files changed, 337 insertions(+), 319 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java index 484ab16b50..b7306fd44e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; +import 
com.google.common.collect.ImmutableList; import java.util.List; import java.util.Objects; @@ -63,7 +64,7 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints, List<String> partitions, boolean isPartialUpdate, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child); this.nameParts = Utils.copyRequiredList(nameParts); this.colNames = Utils.copyRequiredList(colNames); this.hints = Utils.copyRequiredList(hints); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java index 9743d0fe8f..3c1519d6f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Optional; @@ -40,12 +41,12 @@ import java.util.Optional; public class UnboundResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> implements Unbound, Sink { public UnboundResultSink(CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, child); + super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), child); } public UnboundResultSink(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, groupExpression, 
logicalProperties, child); + super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), groupExpression, logicalProperties, child); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java index 96afa6b24a..a620d6b5b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java @@ -22,9 +22,9 @@ import org.apache.doris.nereids.jobs.rewrite.RewriteJob; import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet; import org.apache.doris.nereids.rules.analysis.AnalyzeCTE; import org.apache.doris.nereids.rules.analysis.BindExpression; -import org.apache.doris.nereids.rules.analysis.BindInsertTargetTable; import org.apache.doris.nereids.rules.analysis.BindRelation; import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver; +import org.apache.doris.nereids.rules.analysis.BindSink; import org.apache.doris.nereids.rules.analysis.CheckAnalysis; import org.apache.doris.nereids.rules.analysis.CheckBound; import org.apache.doris.nereids.rules.analysis.CheckPolicy; @@ -86,7 +86,7 @@ public class Analyzer extends AbstractBatchJobExecutor { new UserAuthentication(), new BindExpression() ), - topDown(new BindInsertTargetTable()), + topDown(new BindSink()), bottomUp(new CheckBound()), bottomUp( new ProjectToGlobalAggregate(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index a23825a842..c73ca04a2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1496,7 +1496,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { 
properties.put(key, value); } Literal filePath = (Literal) visit(ctx.filePath); - return new LogicalFileSink<>(filePath.getStringValue(), format, properties, plan); + return new LogicalFileSink<>(filePath.getStringValue(), format, properties, ImmutableList.of(), plan); } private LogicalPlan withQueryOrganization(LogicalPlan inputPlan, QueryOrganizationContext ctx) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index fbefe06456..fb35699164 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -57,6 +57,7 @@ public enum RuleType { BINDING_SET_OPERATION_SLOT(RuleTypeClass.REWRITE), BINDING_GENERATE_FUNCTION(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_FILE(RuleTypeClass.REWRITE), REPLACE_SORT_EXPRESSION_BY_CHILD_OUTPUT(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java deleted file mode 100644 index 750413b919..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java +++ /dev/null @@ -1,209 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.rules.analysis; - -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.Pair; -import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; -import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.parser.NereidsParser; -import org.apache.doris.nereids.rules.Rule; -import org.apache.doris.nereids.rules.RuleType; -import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.literal.Literal; -import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; -import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.logical.LogicalProject; -import org.apache.doris.nereids.types.DataType; -import org.apache.doris.nereids.util.RelationUtil; -import 
org.apache.doris.nereids.util.TypeCoercionUtils; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * bind an unbound logicalOlapTableSink represent the target table of an insert command - */ -public class BindInsertTargetTable extends OneAnalysisRuleFactory { - @Override - public Rule build() { - return unboundOlapTableSink() - .thenApply(ctx -> { - UnboundOlapTableSink<?> sink = ctx.root; - Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink); - Database database = pair.first; - OlapTable table = pair.second; - - LogicalPlan child = ((LogicalPlan) sink.child()); - - LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>( - database, - table, - bindTargetColumns(table, sink.getColNames()), - bindPartitionIds(table, sink.getPartitions()), - sink.isPartialUpdate(), - sink.child()); - - // we need to insert all the columns of the target table although some columns are not mentions. - // so we add a projects to supply the default value. - - if (boundSink.getCols().size() != child.getOutput().size()) { - throw new AnalysisException("insert into cols should be corresponding to the query output"); - } - - Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap(); - for (int i = 0; i < boundSink.getCols().size(); ++i) { - columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); - } - - Map<String, NamedExpression> columnToOutput = Maps.newLinkedHashMap(); - NereidsParser expressionParser = new NereidsParser(); - - // generate slots not mentioned in sql, mv slots and shaded slots. - for (Column column : boundSink.getTargetTable().getFullSchema()) { - if (column.isMaterializedViewColumn()) { - List<SlotRef> refs = column.getRefColumns(); - // now we have to replace the column to slots. 
- Preconditions.checkArgument(refs != null, - "mv column's ref column cannot be null"); - Expression parsedExpression = expressionParser.parseExpression( - column.getDefineExpr().toSql()); - Expression boundExpression = SlotReplacer.INSTANCE - .replace(parsedExpression, columnToOutput); - - NamedExpression slot = boundExpression instanceof NamedExpression - ? ((NamedExpression) boundExpression) - : new Alias(boundExpression, boundExpression.toSql()); - - columnToOutput.put(column.getName(), slot); - } else if (columnToChildOutput.containsKey(column)) { - columnToOutput.put(column.getName(), columnToChildOutput.get(column)); - } else { - if (table.hasSequenceCol() - && column.getName().equals(Column.SEQUENCE_COL) - && table.getSequenceMapCol() != null) { - Column seqCol = table.getFullSchema().stream() - .filter(col -> col.getName().equals(table.getSequenceMapCol())) - .findFirst().get(); - columnToOutput.put(column.getName(), columnToOutput.get(seqCol.getName())); - } else if (column.getDefaultValue() == null) { - columnToOutput.put(column.getName(), new Alias( - new NullLiteral(DataType.fromCatalogType(column.getType())), - column.getName() - )); - } else { - columnToOutput.put(column.getName(), new Alias(Literal.of(column.getDefaultValue()) - .checkedCastTo(DataType.fromCatalogType(column.getType())), column.getName())); - } - } - } - List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); - - LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, boundSink.child()); - - // add cast project - List<NamedExpression> castExprs = Lists.newArrayList(); - for (int i = 0; i < table.getFullSchema().size(); ++i) { - Expression castExpr = TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i), - DataType.fromCatalogType(table.getFullSchema().get(i).getType())); - if (castExpr instanceof NamedExpression) { - castExprs.add(((NamedExpression) castExpr)); - } else { - castExprs.add(new Alias(castExpr, castExpr.toSql())); 
- } - } - if (!castExprs.equals(fullOutputExprs)) { - fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject); - } - - return boundSink.withChildren(fullOutputProject); - - }).toRule(RuleType.BINDING_INSERT_TARGET_TABLE); - } - - private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundOlapTableSink<? extends Plan> sink) { - List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), - sink.getNameParts()); - Pair<DatabaseIf, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, - cascadesContext.getConnectContext().getEnv()); - if (!(pair.second instanceof OlapTable)) { - throw new AnalysisException("the target table of insert into is not an OLAP table"); - } - return Pair.of(((Database) pair.first), (OlapTable) pair.second); - } - - private List<Long> bindPartitionIds(OlapTable table, List<String> partitions) { - return partitions.isEmpty() - ? ImmutableList.of() - : partitions.stream().map(pn -> { - Partition partition = table.getPartition(pn); - if (partition == null) { - throw new AnalysisException(String.format("partition %s is not found in table %s", - pn, table.getName())); - } - return partition.getId(); - }).collect(Collectors.toList()); - } - - private List<Column> bindTargetColumns(OlapTable table, List<String> colsName) { - return colsName.isEmpty() - ? 
table.getFullSchema().stream().filter(column -> column.isVisible() - && !column.isMaterializedViewColumn()) - .collect(Collectors.toList()) - : colsName.stream().map(cn -> { - Column column = table.getColumn(cn); - if (column == null) { - throw new AnalysisException(String.format("column %s is not found in table %s", - cn, table.getName())); - } - return column; - }).collect(Collectors.toList()); - } - - private static class SlotReplacer extends DefaultExpressionRewriter<Map<String, NamedExpression>> { - public static final SlotReplacer INSTANCE = new SlotReplacer(); - - public Expression replace(Expression e, Map<String, NamedExpression> replaceMap) { - return e.accept(this, replaceMap); - } - - @Override - public Expression visitUnboundSlot(UnboundSlot unboundSlot, Map<String, NamedExpression> replaceMap) { - return replaceMap.get(unboundSlot.getName()); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java new file mode 100644 index 0000000000..69896e44ee --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.analysis; + +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.nereids.util.TypeCoercionUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * bind an 
unbound logicalOlapTableSink represent the target table of an insert command + */ +public class BindSink implements AnalysisRuleFactory { + + @Override + public List<Rule> buildRules() { + return ImmutableList.of( + RuleType.BINDING_INSERT_TARGET_TABLE.build( + unboundOlapTableSink().thenApply(ctx -> { + UnboundOlapTableSink<?> sink = ctx.root; + Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink); + Database database = pair.first; + OlapTable table = pair.second; + + LogicalPlan child = ((LogicalPlan) sink.child()); + + LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>( + database, + table, + bindTargetColumns(table, sink.getColNames()), + bindPartitionIds(table, sink.getPartitions()), + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.isPartialUpdate(), + sink.child()); + + // we need to insert all the columns of the target table + // although some columns are not mentions. + // so we add a projects to supply the default value. + + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException( + "insert into cols should be corresponding to the query output"); + } + + Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap(); + for (int i = 0; i < boundSink.getCols().size(); ++i) { + columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); + } + + Map<String, NamedExpression> columnToOutput = Maps.newLinkedHashMap(); + NereidsParser expressionParser = new NereidsParser(); + + // generate slots not mentioned in sql, mv slots and shaded slots. + for (Column column : boundSink.getTargetTable().getFullSchema()) { + if (column.isMaterializedViewColumn()) { + List<SlotRef> refs = column.getRefColumns(); + // now we have to replace the column to slots. 
+ Preconditions.checkArgument(refs != null, + "mv column's ref column cannot be null"); + Expression parsedExpression = expressionParser.parseExpression( + column.getDefineExpr().toSql()); + Expression boundExpression = SlotReplacer.INSTANCE + .replace(parsedExpression, columnToOutput); + + NamedExpression slot = boundExpression instanceof NamedExpression + ? ((NamedExpression) boundExpression) + : new Alias(boundExpression, boundExpression.toSql()); + + columnToOutput.put(column.getName(), slot); + } else if (columnToChildOutput.containsKey(column)) { + columnToOutput.put(column.getName(), columnToChildOutput.get(column)); + } else { + if (table.hasSequenceCol() + && column.getName().equals(Column.SEQUENCE_COL) + && table.getSequenceMapCol() != null) { + Column seqCol = table.getFullSchema().stream() + .filter(col -> col.getName().equals(table.getSequenceMapCol())) + .findFirst().get(); + columnToOutput.put(column.getName(), columnToOutput.get(seqCol.getName())); + } else if (column.getDefaultValue() == null) { + columnToOutput.put(column.getName(), new Alias( + new NullLiteral(DataType.fromCatalogType(column.getType())), + column.getName() + )); + } else { + columnToOutput.put(column.getName(), + new Alias(Literal.of(column.getDefaultValue()) + .checkedCastTo(DataType.fromCatalogType(column.getType())), + column.getName())); + } + } + } + List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); + + LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, + boundSink.child()); + + // add cast project + List<NamedExpression> castExprs = Lists.newArrayList(); + for (int i = 0; i < table.getFullSchema().size(); ++i) { + Expression castExpr = TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i), + DataType.fromCatalogType(table.getFullSchema().get(i).getType())); + if (castExpr instanceof NamedExpression) { + castExprs.add(((NamedExpression) castExpr)); + } else { + castExprs.add(new Alias(castExpr, 
castExpr.toSql())); + } + } + if (!castExprs.equals(fullOutputExprs)) { + fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject); + } + + return boundSink.withChildAndUpdateOutput(fullOutputProject); + + })), + RuleType.BINDING_INSERT_FILE.build( + logicalFileSink().when(s -> s.getOutputExprs().isEmpty()) + .then(fileSink -> fileSink.withOutputExprs( + fileSink.child().getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()))) + ) + ); + } + + private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundOlapTableSink<? extends Plan> sink) { + List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair<DatabaseIf, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (!(pair.second instanceof OlapTable)) { + throw new AnalysisException("the target table of insert into is not an OLAP table"); + } + return Pair.of(((Database) pair.first), (OlapTable) pair.second); + } + + private List<Long> bindPartitionIds(OlapTable table, List<String> partitions) { + return partitions.isEmpty() + ? ImmutableList.of() + : partitions.stream().map(pn -> { + Partition partition = table.getPartition(pn); + if (partition == null) { + throw new AnalysisException(String.format("partition %s is not found in table %s", + pn, table.getName())); + } + return partition.getId(); + }).collect(Collectors.toList()); + } + + private List<Column> bindTargetColumns(OlapTable table, List<String> colsName) { + return colsName.isEmpty() + ? 
table.getFullSchema().stream().filter(column -> column.isVisible() + && !column.isMaterializedViewColumn()) + .collect(Collectors.toList()) + : colsName.stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(Collectors.toList()); + } + + private static class SlotReplacer extends DefaultExpressionRewriter<Map<String, NamedExpression>> { + public static final SlotReplacer INSTANCE = new SlotReplacer(); + + public Expression replace(Expression e, Map<String, NamedExpression> replaceMap) { + return e.accept(this, replaceMap); + } + + @Override + public Expression visitUnboundSlot(UnboundSlot unboundSlot, Map<String, NamedExpression> replaceMap) { + return replaceMap.get(unboundSlot.getName()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java index 5dfbd4eba9..277d0dc903 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -54,7 +53,8 @@ public class LogicalDeferMaterializeResultSink<CHILD_TYPE extends Plan> OlapTable olapTable, long selectedIndexId, 
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(logicalResultSink.getType(), groupExpression, logicalProperties, child); + super(logicalResultSink.getType(), logicalResultSink.getOutputExprs(), + groupExpression, logicalProperties, child); this.logicalResultSink = logicalResultSink; this.olapTable = olapTable; this.selectedIndexId = selectedIndexId; @@ -109,11 +109,6 @@ public class LogicalDeferMaterializeResultSink<CHILD_TYPE extends Plan> olapTable, selectedIndexId, groupExpression, logicalProperties, children.get(0)); } - @Override - public List<Slot> computeOutput() { - return child().getOutput(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java index 89918d2317..0dda497d28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java @@ -19,15 +19,13 @@ package org.apache.doris.nereids.trees.plans.logical; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.List; @@ -44,23 +42,29 @@ public class 
LogicalFileSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_ private final String format; private final Map<String, String> properties; - public LogicalFileSink(String filePath, String format, Map<String, String> properties, CHILD_TYPE child) { - this(filePath, format, properties, Optional.empty(), Optional.empty(), child); + public LogicalFileSink(String filePath, String format, + Map<String, String> properties, List<NamedExpression> outputExprs, CHILD_TYPE child) { + this(filePath, format, properties, outputExprs, Optional.empty(), Optional.empty(), child); } public LogicalFileSink(String filePath, String format, Map<String, String> properties, + List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_FILE_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_FILE_SINK, outputExprs, groupExpression, logicalProperties, child); this.filePath = Objects.requireNonNull(filePath); this.format = Objects.requireNonNull(format); this.properties = ImmutableMap.copyOf(Objects.requireNonNull(properties)); } + public Plan withOutputExprs(List<NamedExpression> outputExprs) { + return new LogicalFileSink<>(filePath, format, properties, outputExprs, child()); + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalFileSink<>(filePath, format, properties, children.get(0)); + return new LogicalFileSink<>(filePath, format, properties, outputExprs, children.get(0)); } @Override @@ -68,11 +72,6 @@ public class LogicalFileSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_ return visitor.visitLogicalFileSink(this, context); } - @Override - public List<? 
extends Expression> getExpressions() { - return ImmutableList.of(); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -81,33 +80,31 @@ public class LogicalFileSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_ if (o == null || getClass() != o.getClass()) { return false; } + if (!super.equals(o)) { + return false; + } LogicalFileSink<?> that = (LogicalFileSink<?>) o; - return Objects.equals(filePath, that.filePath) - && Objects.equals(format, that.format) + return Objects.equals(filePath, that.filePath) && Objects.equals(format, that.format) && Objects.equals(properties, that.properties); } @Override public int hashCode() { - return Objects.hash(filePath, format, properties); + return Objects.hash(super.hashCode(), filePath, format, properties); } @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalFileSink<>(filePath, format, properties, groupExpression, - Optional.of(getLogicalProperties()), child()); + return new LogicalFileSink<>(filePath, format, properties, outputExprs, + groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalFileSink<>(filePath, format, properties, groupExpression, logicalProperties, children.get(0)); - } - - @Override - public List<Slot> computeOutput() { - return child().getOutput(); + return new LogicalFileSink<>(filePath, format, properties, outputExprs, + groupExpression, logicalProperties, children.get(0)); } public String getFilePath() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index 631b6b6ab3..3f59872d36 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -22,8 +22,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; @@ -49,18 +48,19 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C private final boolean isPartialUpdate; public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, List<Long> partitionIds, - boolean isPartialUpdate, CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, isPartialUpdate, Optional.empty(), Optional.empty(), child); + List<NamedExpression> outputExprs, boolean isPartialUpdate, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + Optional.empty(), Optional.empty(), child); } /** * constructor */ public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, - List<Long> partitionIds, boolean isPartialUpdate, Optional<GroupExpression> groupExpression, - Optional<LogicalProperties> logicalProperties, + List<Long> partitionIds, List<NamedExpression> outputExprs, boolean isPartialUpdate, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, 
groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.cols = Utils.copyRequiredList(cols); @@ -68,10 +68,18 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C this.partitionIds = Utils.copyRequiredList(partitionIds); } + public Plan withChildAndUpdateOutput(Plan child) { + List<NamedExpression> output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, isPartialUpdate, + Optional.empty(), Optional.empty(), child); + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, Optional.empty(), Optional.empty(), children.get(0)); } @@ -103,17 +111,18 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C if (o == null || getClass() != o.getClass()) { return false; } - LogicalOlapTableSink<?> sink = (LogicalOlapTableSink<?>) o; - return isPartialUpdate == sink.isPartialUpdate() - && Objects.equals(database, sink.database) - && Objects.equals(targetTable, sink.targetTable) - && Objects.equals(partitionIds, sink.partitionIds) - && Objects.equals(cols, sink.cols); + if (!super.equals(o)) { + return false; + } + LogicalOlapTableSink<?> that = (LogicalOlapTableSink<?>) o; + return isPartialUpdate == that.isPartialUpdate && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) + && Objects.equals(partitionIds, 
that.partitionIds); } @Override public int hashCode() { - return Objects.hash(database, targetTable, partitionIds, cols, isPartialUpdate); + return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, isPartialUpdate); } @Override @@ -121,26 +130,16 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C return visitor.visitLogicalOlapTableSink(this, context); } - @Override - public List<? extends Expression> getExpressions() { - return ImmutableList.of(); - } - @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, - Optional.of(getLogicalProperties()), child()); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, - logicalProperties, children.get(0)); - } - - @Override - public List<Slot> computeOutput() { - return child().getOutput(); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java index cc2eb0cf37..6e754b491a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java @@ -19,9 +19,7 @@ package 
org.apache.doris.nereids.trees.plans.logical; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; @@ -29,10 +27,8 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import java.util.List; -import java.util.Objects; import java.util.Optional; /** @@ -40,22 +36,14 @@ import java.util.Optional; */ public class LogicalResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> implements Sink { - private final List<NamedExpression> outputExprs; - public LogicalResultSink(List<NamedExpression> outputExprs, CHILD_TYPE child) { - super(PlanType.LOGICAL_RESULT_SINK, child); - this.outputExprs = outputExprs; + super(PlanType.LOGICAL_RESULT_SINK, outputExprs, child); } public LogicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_RESULT_SINK, groupExpression, logicalProperties, child); - this.outputExprs = outputExprs; - } - - public List<NamedExpression> getOutputExprs() { - return outputExprs; + super(PlanType.LOGICAL_RESULT_SINK, outputExprs, groupExpression, logicalProperties, child); } @Override @@ -70,11 +58,6 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHIL return visitor.visitLogicalResultSink(this, context); } - @Override - public List<? 
extends Expression> getExpressions() { - return outputExprs; - } - @Override public LogicalResultSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) { return new LogicalResultSink<>(outputExprs, groupExpression, Optional.of(getLogicalProperties()), child()); @@ -87,33 +70,9 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHIL return new LogicalResultSink<>(outputExprs, groupExpression, logicalProperties, children.get(0)); } - @Override - public List<Slot> computeOutput() { - return outputExprs.stream() - .map(NamedExpression::toSlot) - .collect(ImmutableList.toImmutableList()); - } - @Override public String toString() { return Utils.toSqlString("LogicalResultSink[" + id.asInt() + "]", "outputExprs", outputExprs); } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LogicalResultSink<?> that = (LogicalResultSink<?>) o; - return Objects.equals(outputExprs, that.outputExprs); - } - - @Override - public int hashCode() { - return Objects.hash(outputExprs); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java index fd98c29a05..3d10c639b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java @@ -19,21 +19,68 @@ package org.apache.doris.nereids.trees.plans.logical; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import 
org.apache.doris.nereids.trees.plans.PlanType; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; import java.util.Optional; /** abstract logical sink */ public abstract class LogicalSink<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> { - public LogicalSink(PlanType type, CHILD_TYPE child) { + protected final List<NamedExpression> outputExprs; + + public LogicalSink(PlanType type, List<NamedExpression> outputExprs, CHILD_TYPE child) { super(type, child); + this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not null")); } - public LogicalSink(PlanType type, + public LogicalSink(PlanType type, List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(type, groupExpression, logicalProperties, child); + this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not null")); + } + + public List<NamedExpression> getOutputExprs() { + return outputExprs; + } + + @Override + public List<? extends Expression> getExpressions() { + return outputExprs; + } + + @Override + public List<Slot> computeOutput() { + return outputExprs.stream() + .map(NamedExpression::toSlot) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalSink<?> that = (LogicalSink<?>) o; + return Objects.equals(outputExprs, that.outputExprs); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), outputExprs); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org