morrySnow commented on code in PR #23485:
URL: https://github.com/apache/doris/pull/23485#discussion_r1322311929


##########
fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4:
##########
@@ -101,6 +99,53 @@ planType
     | ALL // default type
     ;
 
+loadStmt
+    : LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        (withRemoteStorageSystem)?
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        resourceDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    | LOAD mysqlDataDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    ;

Review Comment:
   You could merge them into one line and put it directly under `statement`.



##########
fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4:
##########
@@ -101,6 +99,53 @@ planType
     | ALL // default type
     ;
 
+loadStmt
+    : LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        (withRemoteStorageSystem)?
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        resourceDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    | LOAD mysqlDataDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    ;
+
+dataDesc
+    : ((WITH)? mergeType)? DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL 
(COMMA filePath+=STRING_LITERAL)* RIGHT_PAREN
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+        (LINES TERMINATED BY separator=STRING_LITERAL)?
+        (FORMAT AS format=identifier)?
+        (columns=identifierList)?
+        (columnsFromPath=colFromPath)?
+        (columnMapping=colMappingList)?
+        (preFilter=preFilterClause)?
+        (where=whereClause)?
+        (deleteOn=deleteOnClause)?
+        (sequenceColumn=sequenceColClause)?
+        (PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN)?
+    | ((WITH)? mergeType)? DATA FROM TABLE tableName=multipartIdentifier
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (columnMapping=colMappingList)?
+        (where=whereClause)?
+        (deleteOn=deleteOnClause)?
+        (PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN)?

Review Comment:
   ```suggestion
           (propertyClause)?
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkStorageDesc.java:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.S3ClientBEProperties;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+// Broker descriptor
+//
+// Broker example:
+// WITH S3/HDFS
+// (
+//   "username" = "user0",
+//   "password" = "password0"
+// )
+public class BulkStorageDesc implements Writable {

Review Comment:
   Why does this class need to implement `Writable`?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java:
##########
@@ -533,6 +540,78 @@ public List<Pair<LogicalPlan, StatementContext>> 
visitMultiStatements(MultiState
         return logicalPlans;
     }
 
+    /**
+     * Visit load-statements.
+     */
+    public LogicalPlan visitLoad(LoadContext ctx) {
+
+        BulkStorageDesc bulkDesc = null;
+        DorisParser.LoadStmtContext loadStmt = ctx.loadStmt();
+        if (loadStmt.withRemoteStorageSystem() != null) {
+            Map<String, String> bulkProperties =

Review Comment:
   Why copy the properties into a new map?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java:
##########
@@ -533,6 +540,78 @@ public List<Pair<LogicalPlan, StatementContext>> 
visitMultiStatements(MultiState
         return logicalPlans;
     }
 
+    /**
+     * Visit load-statements.
+     */
+    public LogicalPlan visitLoad(LoadContext ctx) {
+
+        BulkStorageDesc bulkDesc = null;
+        DorisParser.LoadStmtContext loadStmt = ctx.loadStmt();
+        if (loadStmt.withRemoteStorageSystem() != null) {
+            Map<String, String> bulkProperties =
+                    new 
HashMap<>(visitPropertyItemList(loadStmt.withRemoteStorageSystem().brokerProperties));
+            if (loadStmt.withRemoteStorageSystem().S3() != null) {
+                bulkDesc = new BulkStorageDesc("S3", 
BulkStorageDesc.StorageType.S3, bulkProperties);
+            } else if (loadStmt.withRemoteStorageSystem().HDFS() != null) {
+                bulkDesc = new BulkStorageDesc("HDFS", 
BulkStorageDesc.StorageType.HDFS, bulkProperties);
+            } else if (loadStmt.withRemoteStorageSystem().LOCAL() != null) {
+                bulkDesc = new BulkStorageDesc("LOCAL_HDFS", 
BulkStorageDesc.StorageType.LOCAL, bulkProperties);
+            } else if (loadStmt.withRemoteStorageSystem().BROKER() != null
+                    && 
loadStmt.withRemoteStorageSystem().identifierOrText().getText() != null) {
+                bulkDesc = new 
BulkStorageDesc(loadStmt.withRemoteStorageSystem().identifierOrText().getText(),
+                        bulkProperties);
+            }
+        }
+        List<BulkLoadDataDesc> dataDescriptions = new ArrayList<>();
+        for (DorisParser.DataDescContext ddc : ctx.loadStmt().dataDescs) {
+            List<String> tableName = 
RelationUtil.getQualifierName(ConnectContext.get(),
+                    visitMultipartIdentifier(ddc.tableName));
+            List<String> colNames = (ddc.columns == null ? ImmutableList.of() 
: visitIdentifierList(ddc.columns));
+            List<String> columnsFromPath = (ddc.columnsFromPath == null ? 
ImmutableList.of()
+                        : 
visitIdentifierList(ddc.columnsFromPath.identifierList()));
+            List<String> partitions = ddc.partition == null ? 
ImmutableList.of() : visitIdentifierList(ddc.partition);
+            // TODO: multi location
+            List<String> multiFilePaths = new ArrayList<>();
+            for (Token filePath : ddc.filePaths) {
+                multiFilePaths.add(filePath.getText().substring(1, 
filePath.getText().length() - 1));
+            }
+            List<String> filePaths = ddc.filePath == null ? ImmutableList.of() 
: multiFilePaths;
+            List<Expression> colMappingList = (ddc.columnMapping == null ? 
ImmutableList.of()
+                        : visit(ddc.columnMapping.mappingSet, 
Expression.class));
+
+            LoadTask.MergeType mergeType = ddc.mergeType() == null ? 
LoadTask.MergeType.APPEND
+                        : 
LoadTask.MergeType.valueOf(ddc.mergeType().getText());
+
+            String fileFormat = ddc.format == null ? null : 
ddc.format.getText();

Review Comment:
   Nereids uses `Optional` rather than `null` wherever possible, to avoid NPEs,
which are hard to debug.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");

Review Comment:
   Collection fields should be initialized with `ImmutableXXX.copyOf()`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans);
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc 
dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", 
dataDesc.toSql());
+        }

Review Comment:
   ```suggestion
               LOG.debug("nereids load stmt before conversion: {}", () -> 
dataDesc.toSql());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkStorageDesc.java:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.S3ClientBEProperties;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+// Broker descriptor
+//
+// Broker example:
+// WITH S3/HDFS
+// (
+//   "username" = "user0",
+//   "password" = "password0"
+// )

Review Comment:
   Use a Javadoc-format comment here.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans);
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc 
dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", 
dataDesc.toSql());
+        }
+        // build source columns, add select and insert node.
+        List<String> sinkCols = new ArrayList<>();
+        // contains the map of olap column to FileFieldNames, 
ColumnMappingList, ColumnsFromPath.
+        Map<String, NamedExpression> originSinkToSourceMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, 
bulkStorageDesc);
+
+        buildSinkColumns(olapTable, dataDesc, tvfProperties, sinkCols, 
originSinkToSourceMap);
+        // mapping sink colum to mapping expression
+        if (!dataDesc.getColumnMappingList().isEmpty()) {
+            for (Expression mappingExpr : dataDesc.getColumnMappingList()) {
+                if (!(mappingExpr instanceof EqualTo)) {
+                    continue;
+                }
+                EqualTo equalTo = (EqualTo) mappingExpr;
+                if (!(equalTo.left() instanceof UnboundSlot)) {
+                    // if not column, skip
+                    continue;
+                }
+                String mappingColumn = equalTo.left().toSql();

Review Comment:
   Why use `toSql()` as the key? We should use the `Expression` as the key
directly. If we cannot use the expression as the key, we should use a
normalized name to ensure that `equals` works correctly.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));

Review Comment:
   Why issue INSERT INTO multiple times? Can we load more than one table in one 
statement? If so, how is atomicity handled?



##########
fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4:
##########
@@ -101,6 +99,53 @@ planType
     | ALL // default type
     ;
 
+loadStmt
+    : LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        (withRemoteStorageSystem)?
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        resourceDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    | LOAD mysqlDataDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?
+    ;
+
+dataDesc

Review Comment:
   If you do not need a statement alias to distinguish the two different 
syntaxes, you should merge them into one line.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkStorageDesc.java:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.S3ClientBEProperties;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+// Broker descriptor
+//
+// Broker example:
+// WITH S3/HDFS
+// (
+//   "username" = "user0",
+//   "password" = "password0"
+// )
+public class BulkStorageDesc implements Writable {

Review Comment:
   If this class is only used in Nereids, please put it under the nereids package.



##########
fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4:
##########
@@ -422,9 +424,12 @@ REPLACE_IF_NOT_NULL: 'REPLACE_IF_NOT_NULL';
 REPLICA: 'REPLICA';
 REPOSITORIES: 'REPOSITORIES';
 REPOSITORY: 'REPOSITORY';
+RESET: 'RESET';
 RESOURCE: 'RESOURCE';
 RESOURCES: 'RESOURCES';
+RESPECT: 'RESPECT';
 RESTORE: 'RESTORE';
+RESTRICT: 'RESTRICT';

Review Comment:
   Why add these keywords if they are never used?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java:
##########
@@ -533,6 +540,78 @@ public List<Pair<LogicalPlan, StatementContext>> 
visitMultiStatements(MultiState
         return logicalPlans;
     }
 
+    /**
+     * Visit load-statements.
+     */
+    public LogicalPlan visitLoad(LoadContext ctx) {
+
+        BulkStorageDesc bulkDesc = null;
+        DorisParser.LoadStmtContext loadStmt = ctx.loadStmt();
+        if (loadStmt.withRemoteStorageSystem() != null) {
+            Map<String, String> bulkProperties =
+                    new 
HashMap<>(visitPropertyItemList(loadStmt.withRemoteStorageSystem().brokerProperties));
+            if (loadStmt.withRemoteStorageSystem().S3() != null) {
+                bulkDesc = new BulkStorageDesc("S3", 
BulkStorageDesc.StorageType.S3, bulkProperties);
+            } else if (loadStmt.withRemoteStorageSystem().HDFS() != null) {
+                bulkDesc = new BulkStorageDesc("HDFS", 
BulkStorageDesc.StorageType.HDFS, bulkProperties);
+            } else if (loadStmt.withRemoteStorageSystem().LOCAL() != null) {
+                bulkDesc = new BulkStorageDesc("LOCAL_HDFS", 
BulkStorageDesc.StorageType.LOCAL, bulkProperties);
+            } else if (loadStmt.withRemoteStorageSystem().BROKER() != null
+                    && 
loadStmt.withRemoteStorageSystem().identifierOrText().getText() != null) {
+                bulkDesc = new 
BulkStorageDesc(loadStmt.withRemoteStorageSystem().identifierOrText().getText(),
+                        bulkProperties);
+            }
+        }
+        List<BulkLoadDataDesc> dataDescriptions = new ArrayList<>();

Review Comment:
   All collections in Nereids should be immutable as far as possible.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkLoadDataDesc.java:
##########
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+// used to describe data info which is needed to import.
+//
+//      data_desc:
+//          DATA INFILE ('file_path', ...)
+//          [NEGATIVE]
+//          INTO TABLE tbl_name
+//          [PARTITION (p1, p2)]
+//          [COLUMNS TERMINATED BY separator]
+//          [FORMAT AS format]
+//          [(tmp_col1, tmp_col2, col3, ...)]
+//          [COLUMNS FROM PATH AS (col1, ...)]
+//          [SET (k1=f1(xx), k2=f2(xxx))]
+//          [where_clause]
+//
+//          DATA FROM TABLE external_hive_tbl_name
+//          [NEGATIVE]
+//          INTO TABLE tbl_name
+//          [PARTITION (p1, p2)]
+//          [SET (k1=f1(xx), k2=f2(xxx))]
+//          [where_clause]
+
+/**
+ * The transform of columns should be added after the keyword named COLUMNS.
+ * The transform after the keyword named SET is the old ways which only 
supports the hadoop function.
+ * It old way of transform will be removed gradually. It
+ */
+public class BulkLoadDataDesc {

Review Comment:
   If this class is only used in Nereids, please put it under the nereids package.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java:
##########
@@ -122,5 +122,6 @@ public enum PlanType {
     INSERT_INTO_TABLE_COMMAND,
     SELECT_INTO_OUTFILE_COMMAND,
     UPDATE_COMMAND,
+    LOAD_COMMAND,
     EXPORT_COMMAND

Review Comment:
   Add them in lexicographical order.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java:
##########
@@ -52,6 +53,10 @@ default R visitDeleteCommand(DeleteCommand deleteCommand, C 
context) {
         return visitCommand(deleteCommand, context);
     }
 
+    default R visitLoadCommand(LoadCommand loadCommand, C context) {
+        return visitCommand(loadCommand, context);
+    }
+
     default R visitExportCommand(ExportCommand exportCommand, C context) {
         return visitCommand(exportCommand, context);
     }

Review Comment:
   Add them in lexicographical order.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans);
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc 
dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", 
dataDesc.toSql());
+        }
+        // build source columns, add select and insert node.
+        List<String> sinkCols = new ArrayList<>();
+        // contains the map of olap column to FileFieldNames, 
ColumnMappingList, ColumnsFromPath.
+        Map<String, NamedExpression> originSinkToSourceMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, 
bulkStorageDesc);
+
+        buildSinkColumns(olapTable, dataDesc, tvfProperties, sinkCols, 
originSinkToSourceMap);
+        // mapping sink colum to mapping expression
+        if (!dataDesc.getColumnMappingList().isEmpty()) {
+            for (Expression mappingExpr : dataDesc.getColumnMappingList()) {
+                if (!(mappingExpr instanceof EqualTo)) {
+                    continue;
+                }
+                EqualTo equalTo = (EqualTo) mappingExpr;
+                if (!(equalTo.left() instanceof UnboundSlot)) {
+                    // if not column, skip
+                    continue;
+                }
+                String mappingColumn = equalTo.left().toSql();
+                if (originSinkToSourceMap.containsKey(mappingColumn)) {
+                    Expression mappingExprAlias = equalTo.right();
+                    originSinkToSourceMap.put(mappingColumn, new 
UnboundAlias(mappingExprAlias, mappingColumn));
+                }
+            }
+        }
+        // build source columns
+        List<NamedExpression> selectLists = new ArrayList<>();
+
+        if (sinkCols.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        } else {
+            for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+                sinkCols.add(columnFromPath);
+                // columnFromPath will be parsed by BE, put columns as 
placeholder.
+                originSinkToSourceMap.put(columnFromPath, new 
UnboundSlot(columnFromPath));
+            }
+            checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, 
originSinkToSourceMap);
+            selectLists.addAll(originSinkToSourceMap.values());
+        }
+        Set<Expression> conjuncts = buildTvfFilter(dataDesc, 
originSinkToSourceMap);
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) 
&& !conjuncts.isEmpty()) {
+            throw new VerifyException("Required property 'csv_schema' for csv 
file, "
+                    + "when use PRECEDING FILTER, WHERE, ORDER BY AND DELETE 
ON clause.");
+        }
+        LogicalPlan tvfLogicalPlan = new LogicalProject<>(selectLists,         
                   // add select
+                new LogicalFilter<>(conjuncts,                                 
                   // add filter
+                        new 
LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties))));            // add 
relation

Review Comment:
   You could use inline comments to make this comment clearer:
   ```suggestion
           LogicalPlan tvfLogicalPlan = new LogicalProject<>(/*add 
select*/selectLists,
                   /*add filter*/new LogicalFilter<>(conjuncts,
                           /*add relation*/new 
LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties))));            
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan

Review Comment:
   ```suggestion
        * for test print.
        *
        * @param ctx context
        * @return parsed insert into plan
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;

Review Comment:
   ```suggestion
       
       public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
       
       private final String labelName;
       private final BulkStorageDesc bulkStorageDesc;
       private final List<BulkLoadDataDesc> sourceInfos;
       private final Map<String, String> properties;
       private final String comment;
       private final List<LogicalPlan> plans = new ArrayList<>();
       private Profile profile;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);

Review Comment:
   Why is this set here? We should not call `setNereids(true)` inside a command.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;

Review Comment:
   ```suggestion
           return sourceInfos.stream()
                   .map(dataDesc -> this.completeQueryPlan(ctx, dataDesc))
                   .collect(ImmutableList.toImmutableList());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans);
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc 
dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", 
dataDesc.toSql());
+        }
+        // build source columns, add select and insert node.
+        List<String> sinkCols = new ArrayList<>();
+        // contains the map of olap column to FileFieldNames, 
ColumnMappingList, ColumnsFromPath.
+        Map<String, NamedExpression> originSinkToSourceMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, 
bulkStorageDesc);
+
+        buildSinkColumns(olapTable, dataDesc, tvfProperties, sinkCols, 
originSinkToSourceMap);
+        // mapping sink colum to mapping expression
+        if (!dataDesc.getColumnMappingList().isEmpty()) {
+            for (Expression mappingExpr : dataDesc.getColumnMappingList()) {
+                if (!(mappingExpr instanceof EqualTo)) {
+                    continue;
+                }
+                EqualTo equalTo = (EqualTo) mappingExpr;
+                if (!(equalTo.left() instanceof UnboundSlot)) {
+                    // if not column, skip
+                    continue;
+                }
+                String mappingColumn = equalTo.left().toSql();
+                if (originSinkToSourceMap.containsKey(mappingColumn)) {
+                    Expression mappingExprAlias = equalTo.right();
+                    originSinkToSourceMap.put(mappingColumn, new 
UnboundAlias(mappingExprAlias, mappingColumn));
+                }
+            }
+        }
+        // build source columns
+        List<NamedExpression> selectLists = new ArrayList<>();
+
+        if (sinkCols.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        } else {
+            for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+                sinkCols.add(columnFromPath);
+                // columnFromPath will be parsed by BE, put columns as 
placeholder.
+                originSinkToSourceMap.put(columnFromPath, new 
UnboundSlot(columnFromPath));
+            }
+            checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, 
originSinkToSourceMap);
+            selectLists.addAll(originSinkToSourceMap.values());
+        }
+        Set<Expression> conjuncts = buildTvfFilter(dataDesc, 
originSinkToSourceMap);
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) 
&& !conjuncts.isEmpty()) {
+            throw new VerifyException("Required property 'csv_schema' for csv 
file, "
+                    + "when use PRECEDING FILTER, WHERE, ORDER BY AND DELETE 
ON clause.");
+        }
+        LogicalPlan tvfLogicalPlan = new LogicalProject<>(selectLists,         
                   // add select
+                new LogicalFilter<>(conjuncts,                                 
                   // add filter
+                        new 
LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties))));            // add 
relation
+        boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
+                && sinkCols.size() < olapTable.getColumns().size();
+        return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, 
ImmutableList.of(),
+                dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
+    }
+
+    private static boolean isCsvType(Map<String, String> tvfProperties) {
+        return 
tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
+    }
+
+    private static void parseCsvSchemaToSelect(List<Column> olapColumns, 
Map<String, String> sourceProperties,
+                                               List<String> sinkCols,
+                                               Map<String, NamedExpression> 
originSinkToSourceMap)
+            throws AnalysisException {
+        List<Column> csvSchema = new ArrayList<>();
+        ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, 
sourceProperties);
+        int sinkColSize = Math.min(csvSchema.size(), olapColumns.size());
+        for (int i = 0; i < sinkColSize; i++) {
+            Column olapColumn = olapColumns.get(i);
+            Column csvColumn = csvSchema.get(i);
+            sinkCols.add(olapColumn.getName());
+            originSinkToSourceMap.put(olapColumn.getName(), new 
UnboundSlot(csvColumn.getName()));
+        }
+
+    }
+
+    /**
+     * fill all column that need to be loaded to sinkCols.
+     * fill the map with sink columns and generated source columns.
+     * sink columns use for 'INSERT INTO'
+     * generated source columns use for 'SELECT'
+     *
+     * @param olapTable             olapTable
+     * @param dataDesc              dataDesc
+     * @param tvfProperties         generated tvfProperties
+     * @param sinkCols              sinkCols for insert into table
+     * @param originSinkToSourceMap sink column map to source column
+     */
+    private static void buildSinkColumns(OlapTable olapTable,
+                                         BulkLoadDataDesc dataDesc,
+                                         Map<String, String> tvfProperties,
+                                         List<String> sinkCols,
+                                         Map<String, NamedExpression> 
originSinkToSourceMap) throws AnalysisException {
+        List<Column> fullSchema = olapTable.getFullSchema();
+        if (dataDesc.getFileFieldNames().size() > fullSchema.size()) {
+            throw new VerifyException("Source columns size should less than 
sink columns size");
+        }
+        checkDeleteOnConditions(dataDesc.getMergeType(), 
dataDesc.getDeleteCondition());
+        Map<String, String> sourceProperties = dataDesc.getProperties();
+        if (dataDesc.getFileFieldNames().isEmpty() && 
isCsvType(tvfProperties)) {
+            String csvSchemaStr = 
sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA);
+            if (csvSchemaStr != null) {
+                tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, 
csvSchemaStr);
+                parseCsvSchemaToSelect(fullSchema, sourceProperties, sinkCols, 
originSinkToSourceMap);
+            }
+            return;
+        }
+        for (int i = 0; i < dataDesc.getFileFieldNames().size(); i++) {
+            String targetColumn = fullSchema.get(i).getName();
+            String sourceColumn = dataDesc.getFileFieldNames().get(i);
+            if (targetColumn.equalsIgnoreCase(Column.VERSION_COL)) {
+                continue;
+            } else if (olapTable.hasDeleteSign()
+                    && targetColumn.equalsIgnoreCase(Column.DELETE_SIGN)
+                    && dataDesc.getDeleteCondition() != null) {
+                sinkCols.add(targetColumn);
+                WhenClause deleteWhen = new 
WhenClause(dataDesc.getDeleteCondition(), new TinyIntLiteral((byte) 1));
+                CaseWhen deleteCase = new 
CaseWhen(ImmutableList.of(deleteWhen), new TinyIntLiteral((byte) 0));
+                originSinkToSourceMap.put(targetColumn, new 
UnboundAlias(deleteCase, targetColumn));
+                continue;
+            }
+            sinkCols.add(targetColumn);
+            originSinkToSourceMap.put(targetColumn, new 
UnboundSlot(sourceColumn));
+        }
+    }
+
+    private static void checkDeleteOnConditions(LoadTask.MergeType mergeType, 
Expression deleteCondition) {
+        if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
+            throw new 
IllegalArgumentException(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON);

Review Comment:
   Throw an `AnalysisException` here instead of an `IllegalArgumentException`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {

Review Comment:
   I do not see support for explain in the parser; remove `Explainable`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans);
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc 
dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", 
dataDesc.toSql());
+        }
+        // build source columns, add select and insert node.
+        List<String> sinkCols = new ArrayList<>();
+        // contains the map of olap column to FileFieldNames, 
ColumnMappingList, ColumnsFromPath.
+        Map<String, NamedExpression> originSinkToSourceMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, 
bulkStorageDesc);
+
+        buildSinkColumns(olapTable, dataDesc, tvfProperties, sinkCols, 
originSinkToSourceMap);
+        // mapping sink colum to mapping expression
+        if (!dataDesc.getColumnMappingList().isEmpty()) {
+            for (Expression mappingExpr : dataDesc.getColumnMappingList()) {
+                if (!(mappingExpr instanceof EqualTo)) {
+                    continue;
+                }
+                EqualTo equalTo = (EqualTo) mappingExpr;
+                if (!(equalTo.left() instanceof UnboundSlot)) {
+                    // if not column, skip
+                    continue;
+                }
+                String mappingColumn = equalTo.left().toSql();
+                if (originSinkToSourceMap.containsKey(mappingColumn)) {
+                    Expression mappingExprAlias = equalTo.right();
+                    originSinkToSourceMap.put(mappingColumn, new 
UnboundAlias(mappingExprAlias, mappingColumn));
+                }
+            }
+        }
+        // build source columns
+        List<NamedExpression> selectLists = new ArrayList<>();
+
+        if (sinkCols.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        } else {
+            for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+                sinkCols.add(columnFromPath);
+                // columnFromPath will be parsed by BE, put columns as 
placeholder.
+                originSinkToSourceMap.put(columnFromPath, new 
UnboundSlot(columnFromPath));
+            }
+            checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, 
originSinkToSourceMap);
+            selectLists.addAll(originSinkToSourceMap.values());
+        }
+        Set<Expression> conjuncts = buildTvfFilter(dataDesc, 
originSinkToSourceMap);
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) 
&& !conjuncts.isEmpty()) {
+            throw new VerifyException("Required property 'csv_schema' for csv 
file, "
+                    + "when use PRECEDING FILTER, WHERE, ORDER BY AND DELETE 
ON clause.");
+        }
+        LogicalPlan tvfLogicalPlan = new LogicalProject<>(selectLists,         
                   // add select
+                new LogicalFilter<>(conjuncts,                                 
                   // add filter
+                        new 
LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties))));            // add 
relation
+        boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
+                && sinkCols.size() < olapTable.getColumns().size();
+        return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, 
ImmutableList.of(),
+                dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
+    }
+
+    private static boolean isCsvType(Map<String, String> tvfProperties) {
+        return 
tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
+    }
+
+    private static void parseCsvSchemaToSelect(List<Column> olapColumns, 
Map<String, String> sourceProperties,
+                                               List<String> sinkCols,
+                                               Map<String, NamedExpression> 
originSinkToSourceMap)
+            throws AnalysisException {
+        List<Column> csvSchema = new ArrayList<>();
+        ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, 
sourceProperties);
+        int sinkColSize = Math.min(csvSchema.size(), olapColumns.size());
+        for (int i = 0; i < sinkColSize; i++) {
+            Column olapColumn = olapColumns.get(i);
+            Column csvColumn = csvSchema.get(i);
+            sinkCols.add(olapColumn.getName());
+            originSinkToSourceMap.put(olapColumn.getName(), new 
UnboundSlot(csvColumn.getName()));
+        }
+
+    }
+
+    /**
+     * fill all column that need to be loaded to sinkCols.
+     * fill the map with sink columns and generated source columns.
+     * sink columns use for 'INSERT INTO'
+     * generated source columns use for 'SELECT'
+     *
+     * @param olapTable             olapTable
+     * @param dataDesc              dataDesc
+     * @param tvfProperties         generated tvfProperties
+     * @param sinkCols              sinkCols for insert into table
+     * @param originSinkToSourceMap sink column map to source column
+     */
+    private static void buildSinkColumns(OlapTable olapTable,
+                                         BulkLoadDataDesc dataDesc,
+                                         Map<String, String> tvfProperties,
+                                         List<String> sinkCols,
+                                         Map<String, NamedExpression> 
originSinkToSourceMap) throws AnalysisException {
+        List<Column> fullSchema = olapTable.getFullSchema();
+        if (dataDesc.getFileFieldNames().size() > fullSchema.size()) {
+            throw new VerifyException("Source columns size should less than 
sink columns size");

Review Comment:
   Do not use VerifyException; use AnalysisException in Nereids.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.BulkLoadDataDesc;
+import org.apache.doris.analysis.BulkStorageDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * export table
+ */
+public class LoadCommand extends Command implements ForwardWithSync, 
Explainable {
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of ExportCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, 
BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName 
should not null");
+        this.sourceInfos = Objects.requireNonNull(sourceInfos, "sourceInfos 
should not null");
+        this.properties = Objects.requireNonNull(properties, "properties 
should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, 
"bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not 
null");
+    }
+
+    /**
+     * for test print
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws 
AnalysisException {
+        ctx.getState().setNereids(true);
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        ctx.getState().setNereids(true);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        // TODO: begin txn form multi insert sql
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, 
dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans);
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc 
dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", 
dataDesc.toSql());
+        }
+        // build source columns, add select and insert node.
+        List<String> sinkCols = new ArrayList<>();
+        // contains the map of olap column to FileFieldNames, 
ColumnMappingList, ColumnsFromPath.
+        Map<String, NamedExpression> originSinkToSourceMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, 
bulkStorageDesc);
+
+        buildSinkColumns(olapTable, dataDesc, tvfProperties, sinkCols, 
originSinkToSourceMap);
+        // mapping sink colum to mapping expression
+        if (!dataDesc.getColumnMappingList().isEmpty()) {
+            for (Expression mappingExpr : dataDesc.getColumnMappingList()) {
+                if (!(mappingExpr instanceof EqualTo)) {
+                    continue;
+                }
+                EqualTo equalTo = (EqualTo) mappingExpr;
+                if (!(equalTo.left() instanceof UnboundSlot)) {
+                    // if not column, skip
+                    continue;
+                }
+                String mappingColumn = equalTo.left().toSql();
+                if (originSinkToSourceMap.containsKey(mappingColumn)) {
+                    Expression mappingExprAlias = equalTo.right();
+                    originSinkToSourceMap.put(mappingColumn, new 
UnboundAlias(mappingExprAlias, mappingColumn));
+                }
+            }
+        }
+        // build source columns
+        List<NamedExpression> selectLists = new ArrayList<>();
+
+        if (sinkCols.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        } else {
+            for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+                sinkCols.add(columnFromPath);
+                // columnFromPath will be parsed by BE, put columns as 
placeholder.
+                originSinkToSourceMap.put(columnFromPath, new 
UnboundSlot(columnFromPath));
+            }
+            checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, 
originSinkToSourceMap);
+            selectLists.addAll(originSinkToSourceMap.values());
+        }
+        Set<Expression> conjuncts = buildTvfFilter(dataDesc, 
originSinkToSourceMap);
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) 
&& !conjuncts.isEmpty()) {
+            throw new VerifyException("Required property 'csv_schema' for csv 
file, "
+                    + "when use PRECEDING FILTER, WHERE, ORDER BY AND DELETE 
ON clause.");
+        }
+        LogicalPlan tvfLogicalPlan = new LogicalProject<>(selectLists,         
                   // add select
+                new LogicalFilter<>(conjuncts,                                 
                   // add filter
+                        new 
LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties))));            // add 
relation
+        boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
+                && sinkCols.size() < olapTable.getColumns().size();
+        return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, 
ImmutableList.of(),
+                dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
+    }
+
+    private static boolean isCsvType(Map<String, String> tvfProperties) {
+        return 
tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
+    }
+
+    private static void parseCsvSchemaToSelect(List<Column> olapColumns, 
Map<String, String> sourceProperties,
+                                               List<String> sinkCols,
+                                               Map<String, NamedExpression> 
originSinkToSourceMap)
+            throws AnalysisException {
+        List<Column> csvSchema = new ArrayList<>();
+        ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, 
sourceProperties);
+        int sinkColSize = Math.min(csvSchema.size(), olapColumns.size());
+        for (int i = 0; i < sinkColSize; i++) {
+            Column olapColumn = olapColumns.get(i);
+            Column csvColumn = csvSchema.get(i);
+            sinkCols.add(olapColumn.getName());
+            originSinkToSourceMap.put(olapColumn.getName(), new 
UnboundSlot(csvColumn.getName()));
+        }
+
+    }
+
+    /**
+     * fill all column that need to be loaded to sinkCols.
+     * fill the map with sink columns and generated source columns.
+     * sink columns use for 'INSERT INTO'
+     * generated source columns use for 'SELECT'
+     *
+     * @param olapTable             olapTable
+     * @param dataDesc              dataDesc
+     * @param tvfProperties         generated tvfProperties
+     * @param sinkCols              sinkCols for insert into table
+     * @param originSinkToSourceMap sink column map to source column
+     */
+    private static void buildSinkColumns(OlapTable olapTable,
+                                         BulkLoadDataDesc dataDesc,
+                                         Map<String, String> tvfProperties,
+                                         List<String> sinkCols,
+                                         Map<String, NamedExpression> 
originSinkToSourceMap) throws AnalysisException {
+        List<Column> fullSchema = olapTable.getFullSchema();
+        if (dataDesc.getFileFieldNames().size() > fullSchema.size()) {
+            throw new VerifyException("Source columns size should less than 
sink columns size");
+        }
+        checkDeleteOnConditions(dataDesc.getMergeType(), 
dataDesc.getDeleteCondition());
+        Map<String, String> sourceProperties = dataDesc.getProperties();
+        if (dataDesc.getFileFieldNames().isEmpty() && 
isCsvType(tvfProperties)) {
+            String csvSchemaStr = 
sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA);
+            if (csvSchemaStr != null) {
+                tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, 
csvSchemaStr);
+                parseCsvSchemaToSelect(fullSchema, sourceProperties, sinkCols, 
originSinkToSourceMap);
+            }
+            return;
+        }
+        for (int i = 0; i < dataDesc.getFileFieldNames().size(); i++) {
+            String targetColumn = fullSchema.get(i).getName();
+            String sourceColumn = dataDesc.getFileFieldNames().get(i);
+            if (targetColumn.equalsIgnoreCase(Column.VERSION_COL)) {
+                continue;
+            } else if (olapTable.hasDeleteSign()
+                    && targetColumn.equalsIgnoreCase(Column.DELETE_SIGN)
+                    && dataDesc.getDeleteCondition() != null) {
+                sinkCols.add(targetColumn);
+                WhenClause deleteWhen = new 
WhenClause(dataDesc.getDeleteCondition(), new TinyIntLiteral((byte) 1));
+                CaseWhen deleteCase = new 
CaseWhen(ImmutableList.of(deleteWhen), new TinyIntLiteral((byte) 0));
+                originSinkToSourceMap.put(targetColumn, new 
UnboundAlias(deleteCase, targetColumn));
+                continue;
+            }
+            sinkCols.add(targetColumn);
+            originSinkToSourceMap.put(targetColumn, new 
UnboundSlot(sourceColumn));

Review Comment:
   ```suggestion
               } else {
                   sinkCols.add(targetColumn);
                   originSinkToSourceMap.put(targetColumn, new 
UnboundSlot(sourceColumn));
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to