This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13d24297a7 [fix](Nereids) type check could not work when root node is 
table or file sink (#22902)
13d24297a7 is described below

commit 13d24297a716c6994d3a3924d3bb73ca650acb88
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Tue Aug 15 11:45:16 2023 +0800

    [fix](Nereids) type check could not work when root node is table or file 
sink (#22902)
    
    Type checking could not work because the plan contained no expressions:
    sink and scan nodes have no expressions at all, so their types could not be checked.
    This PR adds output expressions to the logical sinks so that type checking works.
---
 .../nereids/analyzer/UnboundOlapTableSink.java     |   3 +-
 .../doris/nereids/analyzer/UnboundResultSink.java  |   5 +-
 .../doris/nereids/jobs/executor/Analyzer.java      |   4 +-
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   2 +-
 .../org/apache/doris/nereids/rules/RuleType.java   |   1 +
 .../rules/analysis/BindInsertTargetTable.java      | 209 -------------------
 .../doris/nereids/rules/analysis/BindSink.java     | 227 +++++++++++++++++++++
 .../logical/LogicalDeferMaterializeResultSink.java |   9 +-
 .../trees/plans/logical/LogicalFileSink.java       |  43 ++--
 .../trees/plans/logical/LogicalOlapTableSink.java  |  57 +++---
 .../trees/plans/logical/LogicalResultSink.java     |  45 +---
 .../nereids/trees/plans/logical/LogicalSink.java   |  51 ++++-
 12 files changed, 337 insertions(+), 319 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java
index 484ab16b50..b7306fd44e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java
@@ -31,6 +31,7 @@ import 
org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 import java.util.Objects;
@@ -63,7 +64,7 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> 
extends LogicalSink<C
     public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, 
List<String> hints,
             List<String> partitions, boolean isPartialUpdate, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
-        super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, 
logicalProperties, child);
+        super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), 
groupExpression, logicalProperties, child);
         this.nameParts = Utils.copyRequiredList(nameParts);
         this.colNames = Utils.copyRequiredList(colNames);
         this.hints = Utils.copyRequiredList(hints);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java
index 9743d0fe8f..3c1519d6f8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java
@@ -30,6 +30,7 @@ import 
org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 import java.util.Optional;
@@ -40,12 +41,12 @@ import java.util.Optional;
 public class UnboundResultSink<CHILD_TYPE extends Plan> extends 
LogicalSink<CHILD_TYPE> implements Unbound, Sink {
 
     public UnboundResultSink(CHILD_TYPE child) {
-        super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, child);
+        super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), child);
     }
 
     public UnboundResultSink(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
-        super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, groupExpression, 
logicalProperties, child);
+        super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), 
groupExpression, logicalProperties, child);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java
index 96afa6b24a..a620d6b5b1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java
@@ -22,9 +22,9 @@ import org.apache.doris.nereids.jobs.rewrite.RewriteJob;
 import 
org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet;
 import org.apache.doris.nereids.rules.analysis.AnalyzeCTE;
 import org.apache.doris.nereids.rules.analysis.BindExpression;
-import org.apache.doris.nereids.rules.analysis.BindInsertTargetTable;
 import org.apache.doris.nereids.rules.analysis.BindRelation;
 import 
org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
+import org.apache.doris.nereids.rules.analysis.BindSink;
 import org.apache.doris.nereids.rules.analysis.CheckAnalysis;
 import org.apache.doris.nereids.rules.analysis.CheckBound;
 import org.apache.doris.nereids.rules.analysis.CheckPolicy;
@@ -86,7 +86,7 @@ public class Analyzer extends AbstractBatchJobExecutor {
                 new UserAuthentication(),
                 new BindExpression()
             ),
-            topDown(new BindInsertTargetTable()),
+            topDown(new BindSink()),
             bottomUp(new CheckBound()),
             bottomUp(
                 new ProjectToGlobalAggregate(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index a23825a842..c73ca04a2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1496,7 +1496,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             properties.put(key, value);
         }
         Literal filePath = (Literal) visit(ctx.filePath);
-        return new LogicalFileSink<>(filePath.getStringValue(), format, 
properties, plan);
+        return new LogicalFileSink<>(filePath.getStringValue(), format, 
properties, ImmutableList.of(), plan);
     }
 
     private LogicalPlan withQueryOrganization(LogicalPlan inputPlan, 
QueryOrganizationContext ctx) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index fbefe06456..fb35699164 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -57,6 +57,7 @@ public enum RuleType {
     BINDING_SET_OPERATION_SLOT(RuleTypeClass.REWRITE),
     BINDING_GENERATE_FUNCTION(RuleTypeClass.REWRITE),
     BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
+    BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
 
     REPLACE_SORT_EXPRESSION_BY_CHILD_OUTPUT(RuleTypeClass.REWRITE),
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java
deleted file mode 100644
index 750413b919..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java
+++ /dev/null
@@ -1,209 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.analysis;
-
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.CascadesContext;
-import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
-import org.apache.doris.nereids.analyzer.UnboundSlot;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.parser.NereidsParser;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.expressions.Alias;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.NamedExpression;
-import org.apache.doris.nereids.trees.expressions.literal.Literal;
-import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
-import 
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
-import org.apache.doris.nereids.types.DataType;
-import org.apache.doris.nereids.util.RelationUtil;
-import org.apache.doris.nereids.util.TypeCoercionUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * bind an unbound logicalOlapTableSink represent the target table of an 
insert command
- */
-public class BindInsertTargetTable extends OneAnalysisRuleFactory {
-    @Override
-    public Rule build() {
-        return unboundOlapTableSink()
-                .thenApply(ctx -> {
-                    UnboundOlapTableSink<?> sink = ctx.root;
-                    Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, 
sink);
-                    Database database = pair.first;
-                    OlapTable table = pair.second;
-
-                    LogicalPlan child = ((LogicalPlan) sink.child());
-
-                    LogicalOlapTableSink<?> boundSink = new 
LogicalOlapTableSink<>(
-                            database,
-                            table,
-                            bindTargetColumns(table, sink.getColNames()),
-                            bindPartitionIds(table, sink.getPartitions()),
-                            sink.isPartialUpdate(),
-                            sink.child());
-
-                    // we need to insert all the columns of the target table 
although some columns are not mentions.
-                    // so we add a projects to supply the default value.
-
-                    if (boundSink.getCols().size() != 
child.getOutput().size()) {
-                        throw new AnalysisException("insert into cols should 
be corresponding to the query output");
-                    }
-
-                    Map<Column, NamedExpression> columnToChildOutput = 
Maps.newHashMap();
-                    for (int i = 0; i < boundSink.getCols().size(); ++i) {
-                        columnToChildOutput.put(boundSink.getCols().get(i), 
child.getOutput().get(i));
-                    }
-
-                    Map<String, NamedExpression> columnToOutput = 
Maps.newLinkedHashMap();
-                    NereidsParser expressionParser = new NereidsParser();
-
-                    // generate slots not mentioned in sql, mv slots and 
shaded slots.
-                    for (Column column : 
boundSink.getTargetTable().getFullSchema()) {
-                        if (column.isMaterializedViewColumn()) {
-                            List<SlotRef> refs = column.getRefColumns();
-                            // now we have to replace the column to slots.
-                            Preconditions.checkArgument(refs != null,
-                                    "mv column's ref column cannot be null");
-                            Expression parsedExpression = 
expressionParser.parseExpression(
-                                    column.getDefineExpr().toSql());
-                            Expression boundExpression = SlotReplacer.INSTANCE
-                                    .replace(parsedExpression, columnToOutput);
-
-                            NamedExpression slot = boundExpression instanceof 
NamedExpression
-                                    ? ((NamedExpression) boundExpression)
-                                    : new Alias(boundExpression, 
boundExpression.toSql());
-
-                            columnToOutput.put(column.getName(), slot);
-                        } else if (columnToChildOutput.containsKey(column)) {
-                            columnToOutput.put(column.getName(), 
columnToChildOutput.get(column));
-                        } else {
-                            if (table.hasSequenceCol()
-                                    && 
column.getName().equals(Column.SEQUENCE_COL)
-                                    && table.getSequenceMapCol() != null) {
-                                Column seqCol = table.getFullSchema().stream()
-                                        .filter(col -> 
col.getName().equals(table.getSequenceMapCol()))
-                                        .findFirst().get();
-                                columnToOutput.put(column.getName(), 
columnToOutput.get(seqCol.getName()));
-                            } else if (column.getDefaultValue() == null) {
-                                columnToOutput.put(column.getName(), new Alias(
-                                        new 
NullLiteral(DataType.fromCatalogType(column.getType())),
-                                        column.getName()
-                                ));
-                            } else {
-                                columnToOutput.put(column.getName(), new 
Alias(Literal.of(column.getDefaultValue())
-                                        
.checkedCastTo(DataType.fromCatalogType(column.getType())), column.getName()));
-                            }
-                        }
-                    }
-                    List<NamedExpression> fullOutputExprs = 
ImmutableList.copyOf(columnToOutput.values());
-
-                    LogicalProject<?> fullOutputProject = new 
LogicalProject<>(fullOutputExprs, boundSink.child());
-
-                    // add cast project
-                    List<NamedExpression> castExprs = Lists.newArrayList();
-                    for (int i = 0; i < table.getFullSchema().size(); ++i) {
-                        Expression castExpr = 
TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i),
-                                
DataType.fromCatalogType(table.getFullSchema().get(i).getType()));
-                        if (castExpr instanceof NamedExpression) {
-                            castExprs.add(((NamedExpression) castExpr));
-                        } else {
-                            castExprs.add(new Alias(castExpr, 
castExpr.toSql()));
-                        }
-                    }
-                    if (!castExprs.equals(fullOutputExprs)) {
-                        fullOutputProject = new 
LogicalProject<Plan>(castExprs, fullOutputProject);
-                    }
-
-                    return boundSink.withChildren(fullOutputProject);
-
-                }).toRule(RuleType.BINDING_INSERT_TARGET_TABLE);
-    }
-
-    private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, 
UnboundOlapTableSink<? extends Plan> sink) {
-        List<String> tableQualifier = 
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
-                sink.getNameParts());
-        Pair<DatabaseIf, TableIf> pair = 
RelationUtil.getDbAndTable(tableQualifier,
-                cascadesContext.getConnectContext().getEnv());
-        if (!(pair.second instanceof OlapTable)) {
-            throw new AnalysisException("the target table of insert into is 
not an OLAP table");
-        }
-        return Pair.of(((Database) pair.first), (OlapTable) pair.second);
-    }
-
-    private List<Long> bindPartitionIds(OlapTable table, List<String> 
partitions) {
-        return partitions.isEmpty()
-                ? ImmutableList.of()
-                : partitions.stream().map(pn -> {
-                    Partition partition = table.getPartition(pn);
-                    if (partition == null) {
-                        throw new AnalysisException(String.format("partition 
%s is not found in table %s",
-                                pn, table.getName()));
-                    }
-                    return partition.getId();
-                }).collect(Collectors.toList());
-    }
-
-    private List<Column> bindTargetColumns(OlapTable table, List<String> 
colsName) {
-        return colsName.isEmpty()
-                ? table.getFullSchema().stream().filter(column -> 
column.isVisible()
-                        && !column.isMaterializedViewColumn())
-                .collect(Collectors.toList())
-                : colsName.stream().map(cn -> {
-                    Column column = table.getColumn(cn);
-                    if (column == null) {
-                        throw new AnalysisException(String.format("column %s 
is not found in table %s",
-                                cn, table.getName()));
-                    }
-                    return column;
-                }).collect(Collectors.toList());
-    }
-
-    private static class SlotReplacer extends 
DefaultExpressionRewriter<Map<String, NamedExpression>> {
-        public static final SlotReplacer INSTANCE = new SlotReplacer();
-
-        public Expression replace(Expression e, Map<String, NamedExpression> 
replaceMap) {
-            return e.accept(this, replaceMap);
-        }
-
-        @Override
-        public Expression visitUnboundSlot(UnboundSlot unboundSlot, 
Map<String, NamedExpression> replaceMap) {
-            return replaceMap.get(unboundSlot.getName());
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
new file mode 100644
index 0000000000..69896e44ee
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import 
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * bind an unbound logicalOlapTableSink represent the target table of an 
insert command
+ */
+public class BindSink implements AnalysisRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.BINDING_INSERT_TARGET_TABLE.build(
+                        unboundOlapTableSink().thenApply(ctx -> {
+                            UnboundOlapTableSink<?> sink = ctx.root;
+                            Pair<Database, OlapTable> pair = 
bind(ctx.cascadesContext, sink);
+                            Database database = pair.first;
+                            OlapTable table = pair.second;
+
+                            LogicalPlan child = ((LogicalPlan) sink.child());
+
+                            LogicalOlapTableSink<?> boundSink = new 
LogicalOlapTableSink<>(
+                                    database,
+                                    table,
+                                    bindTargetColumns(table, 
sink.getColNames()),
+                                    bindPartitionIds(table, 
sink.getPartitions()),
+                                    child.getOutput().stream()
+                                            .map(NamedExpression.class::cast)
+                                            
.collect(ImmutableList.toImmutableList()),
+                                    sink.isPartialUpdate(),
+                                    sink.child());
+
+                            // we need to insert all the columns of the target 
table
+                            // although some columns are not mentions.
+                            // so we add a projects to supply the default 
value.
+
+                            if (boundSink.getCols().size() != 
child.getOutput().size()) {
+                                throw new AnalysisException(
+                                        "insert into cols should be 
corresponding to the query output");
+                            }
+
+                            Map<Column, NamedExpression> columnToChildOutput = 
Maps.newHashMap();
+                            for (int i = 0; i < boundSink.getCols().size(); 
++i) {
+                                
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
+                            }
+
+                            Map<String, NamedExpression> columnToOutput = 
Maps.newLinkedHashMap();
+                            NereidsParser expressionParser = new 
NereidsParser();
+
+                            // generate slots not mentioned in sql, mv slots 
and shaded slots.
+                            for (Column column : 
boundSink.getTargetTable().getFullSchema()) {
+                                if (column.isMaterializedViewColumn()) {
+                                    List<SlotRef> refs = 
column.getRefColumns();
+                                    // now we have to replace the column to 
slots.
+                                    Preconditions.checkArgument(refs != null,
+                                            "mv column's ref column cannot be 
null");
+                                    Expression parsedExpression = 
expressionParser.parseExpression(
+                                            column.getDefineExpr().toSql());
+                                    Expression boundExpression = 
SlotReplacer.INSTANCE
+                                            .replace(parsedExpression, 
columnToOutput);
+
+                                    NamedExpression slot = boundExpression 
instanceof NamedExpression
+                                            ? ((NamedExpression) 
boundExpression)
+                                            : new Alias(boundExpression, 
boundExpression.toSql());
+
+                                    columnToOutput.put(column.getName(), slot);
+                                } else if 
(columnToChildOutput.containsKey(column)) {
+                                    columnToOutput.put(column.getName(), 
columnToChildOutput.get(column));
+                                } else {
+                                    if (table.hasSequenceCol()
+                                            && 
column.getName().equals(Column.SEQUENCE_COL)
+                                            && table.getSequenceMapCol() != 
null) {
+                                        Column seqCol = 
table.getFullSchema().stream()
+                                                .filter(col -> 
col.getName().equals(table.getSequenceMapCol()))
+                                                .findFirst().get();
+                                        columnToOutput.put(column.getName(), 
columnToOutput.get(seqCol.getName()));
+                                    } else if (column.getDefaultValue() == 
null) {
+                                        columnToOutput.put(column.getName(), 
new Alias(
+                                                new 
NullLiteral(DataType.fromCatalogType(column.getType())),
+                                                column.getName()
+                                        ));
+                                    } else {
+                                        columnToOutput.put(column.getName(),
+                                                new 
Alias(Literal.of(column.getDefaultValue())
+                                                        
.checkedCastTo(DataType.fromCatalogType(column.getType())),
+                                                        column.getName()));
+                                    }
+                                }
+                            }
+                            List<NamedExpression> fullOutputExprs = 
ImmutableList.copyOf(columnToOutput.values());
+
+                            LogicalProject<?> fullOutputProject = new 
LogicalProject<>(fullOutputExprs,
+                                    boundSink.child());
+
+                            // add cast project
+                            List<NamedExpression> castExprs = 
Lists.newArrayList();
+                            for (int i = 0; i < table.getFullSchema().size(); 
++i) {
+                                Expression castExpr = 
TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i),
+                                        
DataType.fromCatalogType(table.getFullSchema().get(i).getType()));
+                                if (castExpr instanceof NamedExpression) {
+                                    castExprs.add(((NamedExpression) 
castExpr));
+                                } else {
+                                    castExprs.add(new Alias(castExpr, 
castExpr.toSql()));
+                                }
+                            }
+                            if (!castExprs.equals(fullOutputExprs)) {
+                                fullOutputProject = new 
LogicalProject<Plan>(castExprs, fullOutputProject);
+                            }
+
+                            return 
boundSink.withChildAndUpdateOutput(fullOutputProject);
+
+                        })),
+                RuleType.BINDING_INSERT_FILE.build(
+                        logicalFileSink().when(s -> 
s.getOutputExprs().isEmpty())
+                                .then(fileSink -> fileSink.withOutputExprs(
+                                        fileSink.child().getOutput().stream()
+                                                
.map(NamedExpression.class::cast)
+                                                
.collect(ImmutableList.toImmutableList())))
+                )
+        );
+    }
+
+    private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, 
UnboundOlapTableSink<? extends Plan> sink) {
+        List<String> tableQualifier = 
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
+                sink.getNameParts());
+        Pair<DatabaseIf, TableIf> pair = 
RelationUtil.getDbAndTable(tableQualifier,
+                cascadesContext.getConnectContext().getEnv());
+        if (!(pair.second instanceof OlapTable)) {
+            throw new AnalysisException("the target table of insert into is 
not an OLAP table");
+        }
+        return Pair.of(((Database) pair.first), (OlapTable) pair.second);
+    }
+
+    private List<Long> bindPartitionIds(OlapTable table, List<String> 
partitions) {
+        return partitions.isEmpty()
+                ? ImmutableList.of()
+                : partitions.stream().map(pn -> {
+                    Partition partition = table.getPartition(pn);
+                    if (partition == null) {
+                        throw new AnalysisException(String.format("partition 
%s is not found in table %s",
+                                pn, table.getName()));
+                    }
+                    return partition.getId();
+                }).collect(Collectors.toList());
+    }
+
+    private List<Column> bindTargetColumns(OlapTable table, List<String> 
colsName) {
+        return colsName.isEmpty()
+                ? table.getFullSchema().stream().filter(column -> 
column.isVisible()
+                        && !column.isMaterializedViewColumn())
+                .collect(Collectors.toList())
+                : colsName.stream().map(cn -> {
+                    Column column = table.getColumn(cn);
+                    if (column == null) {
+                        throw new AnalysisException(String.format("column %s 
is not found in table %s",
+                                cn, table.getName()));
+                    }
+                    return column;
+                }).collect(Collectors.toList());
+    }
+
+    private static class SlotReplacer extends 
DefaultExpressionRewriter<Map<String, NamedExpression>> {
+        public static final SlotReplacer INSTANCE = new SlotReplacer();
+
+        public Expression replace(Expression e, Map<String, NamedExpression> 
replaceMap) {
+            return e.accept(this, replaceMap);
+        }
+
+        @Override
+        public Expression visitUnboundSlot(UnboundSlot unboundSlot, 
Map<String, NamedExpression> replaceMap) {
+            return replaceMap.get(unboundSlot.getName());
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
index 5dfbd4eba9..277d0dc903 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -54,7 +53,8 @@ public class LogicalDeferMaterializeResultSink<CHILD_TYPE 
extends Plan>
             OlapTable olapTable, long selectedIndexId,
             Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
             CHILD_TYPE child) {
-        super(logicalResultSink.getType(), groupExpression, logicalProperties, 
child);
+        super(logicalResultSink.getType(), logicalResultSink.getOutputExprs(),
+                groupExpression, logicalProperties, child);
         this.logicalResultSink = logicalResultSink;
         this.olapTable = olapTable;
         this.selectedIndexId = selectedIndexId;
@@ -109,11 +109,6 @@ public class LogicalDeferMaterializeResultSink<CHILD_TYPE 
extends Plan>
                 olapTable, selectedIndexId, groupExpression, 
logicalProperties, children.get(0));
     }
 
-    @Override
-    public List<Slot> computeOutput() {
-        return child().getOutput();
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java
index 89918d2317..0dda497d28 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java
@@ -19,15 +19,13 @@ package org.apache.doris.nereids.trees.plans.logical;
 
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.List;
@@ -44,23 +42,29 @@ public class LogicalFileSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHILD_
     private final String format;
     private final Map<String, String> properties;
 
-    public LogicalFileSink(String filePath, String format, Map<String, String> 
properties, CHILD_TYPE child) {
-        this(filePath, format, properties, Optional.empty(), Optional.empty(), 
child);
+    public LogicalFileSink(String filePath, String format,
+            Map<String, String> properties, List<NamedExpression> outputExprs, 
CHILD_TYPE child) {
+        this(filePath, format, properties, outputExprs, Optional.empty(), 
Optional.empty(), child);
     }
 
     public LogicalFileSink(String filePath, String format, Map<String, String> 
properties,
+            List<NamedExpression> outputExprs,
             Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.LOGICAL_FILE_SINK, groupExpression, logicalProperties, 
child);
+        super(PlanType.LOGICAL_FILE_SINK, outputExprs, groupExpression, 
logicalProperties, child);
         this.filePath = Objects.requireNonNull(filePath);
         this.format = Objects.requireNonNull(format);
         this.properties = 
ImmutableMap.copyOf(Objects.requireNonNull(properties));
     }
 
+    public Plan withOutputExprs(List<NamedExpression> outputExprs) {
+        return new LogicalFileSink<>(filePath, format, properties, 
outputExprs, child());
+    }
+
     @Override
     public Plan withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new LogicalFileSink<>(filePath, format, properties, 
children.get(0));
+        return new LogicalFileSink<>(filePath, format, properties, 
outputExprs, children.get(0));
     }
 
     @Override
@@ -68,11 +72,6 @@ public class LogicalFileSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHILD_
         return visitor.visitLogicalFileSink(this, context);
     }
 
-    @Override
-    public List<? extends Expression> getExpressions() {
-        return ImmutableList.of();
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -81,33 +80,31 @@ public class LogicalFileSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHILD_
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
+        if (!super.equals(o)) {
+            return false;
+        }
         LogicalFileSink<?> that = (LogicalFileSink<?>) o;
-        return Objects.equals(filePath, that.filePath)
-                && Objects.equals(format, that.format)
+        return Objects.equals(filePath, that.filePath) && 
Objects.equals(format, that.format)
                 && Objects.equals(properties, that.properties);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(filePath, format, properties);
+        return Objects.hash(super.hashCode(), filePath, format, properties);
     }
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new LogicalFileSink<>(filePath, format, properties, 
groupExpression,
-                Optional.of(getLogicalProperties()), child());
+        return new LogicalFileSink<>(filePath, format, properties, outputExprs,
+                groupExpression, Optional.of(getLogicalProperties()), child());
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new LogicalFileSink<>(filePath, format, properties, 
groupExpression, logicalProperties, children.get(0));
-    }
-
-    @Override
-    public List<Slot> computeOutput() {
-        return child().getOutput();
+        return new LogicalFileSink<>(filePath, format, properties, outputExprs,
+                groupExpression, logicalProperties, children.get(0));
     }
 
     public String getFilePath() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
index 631b6b6ab3..3f59872d36 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
@@ -22,8 +22,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
@@ -49,18 +48,19 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> 
extends LogicalSink<C
     private final boolean isPartialUpdate;
 
     public LogicalOlapTableSink(Database database, OlapTable targetTable, 
List<Column> cols, List<Long> partitionIds,
-            boolean isPartialUpdate, CHILD_TYPE child) {
-        this(database, targetTable, cols, partitionIds, isPartialUpdate, 
Optional.empty(), Optional.empty(), child);
+            List<NamedExpression> outputExprs, boolean isPartialUpdate, 
CHILD_TYPE child) {
+        this(database, targetTable, cols, partitionIds, outputExprs, 
isPartialUpdate,
+                Optional.empty(), Optional.empty(), child);
     }
 
     /**
      * constructor
      */
     public LogicalOlapTableSink(Database database, OlapTable targetTable, 
List<Column> cols,
-            List<Long> partitionIds, boolean isPartialUpdate, 
Optional<GroupExpression> groupExpression,
-            Optional<LogicalProperties> logicalProperties,
+            List<Long> partitionIds, List<NamedExpression> outputExprs, 
boolean isPartialUpdate,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.LOGICAL_OLAP_TABLE_SINK, groupExpression, 
logicalProperties, child);
+        super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, 
logicalProperties, child);
         this.database = Objects.requireNonNull(database, "database != null in 
LogicalOlapTableSink");
         this.targetTable = Objects.requireNonNull(targetTable, "targetTable != 
null in LogicalOlapTableSink");
         this.cols = Utils.copyRequiredList(cols);
@@ -68,10 +68,18 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> 
extends LogicalSink<C
         this.partitionIds = Utils.copyRequiredList(partitionIds);
     }
 
+    public Plan withChildAndUpdateOutput(Plan child) {
+        List<NamedExpression> output = child.getOutput().stream()
+                .map(NamedExpression.class::cast)
+                .collect(ImmutableList.toImmutableList());
+        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, output, isPartialUpdate,
+                Optional.empty(), Optional.empty(), child);
+    }
+
     @Override
     public Plan withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1, 
"LogicalOlapTableSink only accepts one child");
-        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, isPartialUpdate,
+        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs, isPartialUpdate,
                 Optional.empty(), Optional.empty(), children.get(0));
     }
 
@@ -103,17 +111,18 @@ public class LogicalOlapTableSink<CHILD_TYPE extends 
Plan> extends LogicalSink<C
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        LogicalOlapTableSink<?> sink = (LogicalOlapTableSink<?>) o;
-        return isPartialUpdate == sink.isPartialUpdate()
-                && Objects.equals(database, sink.database)
-                && Objects.equals(targetTable, sink.targetTable)
-                && Objects.equals(partitionIds, sink.partitionIds)
-                && Objects.equals(cols, sink.cols);
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalOlapTableSink<?> that = (LogicalOlapTableSink<?>) o;
+        return isPartialUpdate == that.isPartialUpdate && 
Objects.equals(database, that.database)
+                && Objects.equals(targetTable, that.targetTable) && 
Objects.equals(cols, that.cols)
+                && Objects.equals(partitionIds, that.partitionIds);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(database, targetTable, partitionIds, cols, 
isPartialUpdate);
+        return Objects.hash(super.hashCode(), database, targetTable, cols, 
partitionIds, isPartialUpdate);
     }
 
     @Override
@@ -121,26 +130,16 @@ public class LogicalOlapTableSink<CHILD_TYPE extends 
Plan> extends LogicalSink<C
         return visitor.visitLogicalOlapTableSink(this, context);
     }
 
-    @Override
-    public List<? extends Expression> getExpressions() {
-        return ImmutableList.of();
-    }
-
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, isPartialUpdate, groupExpression,
-                Optional.of(getLogicalProperties()), child());
+        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs, isPartialUpdate,
+                groupExpression, Optional.of(getLogicalProperties()), child());
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, isPartialUpdate, groupExpression,
-                logicalProperties, children.get(0));
-    }
-
-    @Override
-    public List<Slot> computeOutput() {
-        return child().getOutput();
+        return new LogicalOlapTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs, isPartialUpdate,
+                groupExpression, logicalProperties, children.get(0));
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
index cc2eb0cf37..6e754b491a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
@@ -19,9 +19,7 @@ package org.apache.doris.nereids.trees.plans.logical;
 
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
-import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
-import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
@@ -29,10 +27,8 @@ import 
org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -40,22 +36,14 @@ import java.util.Optional;
  */
 public class LogicalResultSink<CHILD_TYPE extends Plan> extends 
LogicalSink<CHILD_TYPE> implements Sink {
 
-    private final List<NamedExpression> outputExprs;
-
     public LogicalResultSink(List<NamedExpression> outputExprs, CHILD_TYPE 
child) {
-        super(PlanType.LOGICAL_RESULT_SINK, child);
-        this.outputExprs = outputExprs;
+        super(PlanType.LOGICAL_RESULT_SINK, outputExprs, child);
     }
 
     public LogicalResultSink(List<NamedExpression> outputExprs,
             Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
-        super(PlanType.LOGICAL_RESULT_SINK, groupExpression, 
logicalProperties, child);
-        this.outputExprs = outputExprs;
-    }
-
-    public List<NamedExpression> getOutputExprs() {
-        return outputExprs;
+        super(PlanType.LOGICAL_RESULT_SINK, outputExprs, groupExpression, 
logicalProperties, child);
     }
 
     @Override
@@ -70,11 +58,6 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHIL
         return visitor.visitLogicalResultSink(this, context);
     }
 
-    @Override
-    public List<? extends Expression> getExpressions() {
-        return outputExprs;
-    }
-
     @Override
     public LogicalResultSink<Plan> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
         return new LogicalResultSink<>(outputExprs, groupExpression, 
Optional.of(getLogicalProperties()), child());
@@ -87,33 +70,9 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHIL
         return new LogicalResultSink<>(outputExprs, groupExpression, 
logicalProperties, children.get(0));
     }
 
-    @Override
-    public List<Slot> computeOutput() {
-        return outputExprs.stream()
-                .map(NamedExpression::toSlot)
-                .collect(ImmutableList.toImmutableList());
-    }
-
     @Override
     public String toString() {
         return Utils.toSqlString("LogicalResultSink[" + id.asInt() + "]",
                 "outputExprs", outputExprs);
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        LogicalResultSink<?> that = (LogicalResultSink<?>) o;
-        return Objects.equals(outputExprs, that.outputExprs);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(outputExprs);
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java
index fd98c29a05..3d10c639b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java
@@ -19,21 +19,68 @@ package org.apache.doris.nereids.trees.plans.logical;
 
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 
 /** abstract logical sink */
 public abstract class LogicalSink<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> {
 
-    public LogicalSink(PlanType type, CHILD_TYPE child) {
+    protected final List<NamedExpression> outputExprs;
+
+    public LogicalSink(PlanType type, List<NamedExpression> outputExprs, 
CHILD_TYPE child) {
         super(type, child);
+        this.outputExprs = 
ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should 
not null"));
     }
 
-    public LogicalSink(PlanType type,
+    public LogicalSink(PlanType type, List<NamedExpression> outputExprs,
             Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
         super(type, groupExpression, logicalProperties, child);
+        this.outputExprs = 
ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should 
not null"));
+    }
+
+    public List<NamedExpression> getOutputExprs() {
+        return outputExprs;
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return outputExprs;
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return outputExprs.stream()
+                .map(NamedExpression::toSlot)
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalSink<?> that = (LogicalSink<?>) o;
+        return Objects.equals(outputExprs, that.outputExprs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), outputExprs);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to