This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cdba4936b48 [feature](nereids) Support group commit insert (#26075)
cdba4936b48 is described below

commit cdba4936b48b4951d670649a68fef363c9cfab25
Author: 赵硕 <1443539...@qq.com>
AuthorDate: Fri Nov 10 14:20:14 2023 +0800

    [feature](nereids) Support group commit insert (#26075)
---
 .../apache/doris/analysis/NativeInsertStmt.java    |  57 +--
 .../doris/nereids/jobs/executor/Rewriter.java      |   4 -
 .../org/apache/doris/nereids/rules/RuleType.java   |   2 -
 .../rules/analysis/RejectGroupCommitInsert.java    |  53 ---
 .../plans/commands/InsertIntoTableCommand.java     |  78 +++-
 .../apache/doris/planner/GroupCommitPlanner.java   | 196 ++++++++++
 .../java/org/apache/doris/qe/StmtExecutor.java     |  41 +--
 .../data/insert_p0/insert_group_commit_into.out    |  95 +++++
 .../insert_p0/insert_group_commit_into.groovy      | 348 +++++++++---------
 .../insert_group_commit_with_exception.groovy      | 402 +++++++++++----------
 .../insert_group_commit_with_large_data.groovy     |  89 ++---
 11 files changed, 836 insertions(+), 529 deletions(-)

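At a high level, this change removes the Nereids RejectGroupCommitInsert guard rule, extracts the group commit planning logic from NativeInsertStmt into a new reusable GroupCommitPlanner, and lets Nereids' InsertIntoTableCommand route eligible INSERT ... VALUES statements through group commit. A condensed sketch of the new Nereids-side flow (names taken from the diffs below; error handling, row serialization, and the result message are elided):

    // Sketch only: simplified from InsertIntoTableCommand in this commit.
    if (ctx.getSessionVariable().enableInsertGroupCommit
            && analyzeGroupCommit(sink, physicalOlapTableSink)) {
        // Constant rows from the VALUES clause become PDataRow messages
        // and are sent to a backend instead of opening a transaction.
        handleGroupCommit(ctx, sink, physicalOlapTableSink);
        return;
    }
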
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index d4802ea956c..27e4c6bf36c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -50,24 +50,14 @@ import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ExportSink;
 import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.planner.GroupCommitOlapTableSink;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.planner.external.jdbc.JdbcTableSink;
-import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
-import org.apache.doris.task.StreamLoadTask;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TExecPlanFragmentParamsList;
-import org.apache.doris.thrift.TFileCompressType;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TMergeType;
 import org.apache.doris.thrift.TQueryOptions;
-import org.apache.doris.thrift.TScanRangeParams;
-import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
@@ -84,10 +74,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -167,6 +155,7 @@ public class NativeInsertStmt extends InsertStmt {
     // true if be generates an insert from group commit tvf stmt and executes to load data
     public boolean isGroupCommitTvf = false;
     public boolean isGroupCommitStreamLoadSql = false;
+    private GroupCommitPlanner groupCommitPlanner;
 
     private boolean isFromDeleteOrUpdateStmt = false;
 
@@ -1134,10 +1123,10 @@ public class NativeInsertStmt extends InsertStmt {
         return isGroupCommit;
     }
 
-    public void planForGroupCommit(TUniqueId queryId) throws UserException, TException {
+    public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException {
         OlapTable olapTable = (OlapTable) getTargetTable();
         if (execPlanFragmentParamsBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
-            return;
+            return groupCommitPlanner;
         }
         if (!targetColumns.isEmpty()) {
             Analyzer analyzerTmp = analyzer;
@@ -1145,45 +1134,11 @@ public class NativeInsertStmt extends InsertStmt {
             this.analyzer = analyzerTmp;
         }
         analyzeSubquery(analyzer, true);
-        TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
-        if (targetColumnNames != null) {
-            streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
-            if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
-                streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
-            }
-        }
-        streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
-                .setTbl(getTbl())
-                .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
-                .setGroupCommit(true);
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
-        StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask);
-        // Will using load id as query id in fragment
-        TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-        for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
-            for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                        TFileFormatType.FORMAT_PROTO);
-                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                        TFileCompressType.PLAIN);
-            }
-        }
-        List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream()
-                .flatMap(Collection::stream).collect(Collectors.toList());
-        Preconditions.checkState(scanRangeParams.size() == 1);
+        groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId);
         // save plan message to be reused for prepare stmt
         loadId = queryId;
         baseSchemaVersion = olapTable.getBaseSchemaVersion();
-        // see BackendServiceProxy#execPlanFragmentsAsync
-        TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
-        paramsList.addToParamsList(tRequest);
-        execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
-    }
-
-    public InternalService.PExecPlanFragmentRequest getExecPlanFragmentRequest() {
-        return InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes)
-                .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build();
+        return groupCommitPlanner;
     }
 
     public TUniqueId getLoadId() {
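The refactored planForGroupCommit now returns a GroupCommitPlanner and caches it: when the serialized plan bytes already exist and the table's base schema version is unchanged, the cached planner is reused, which is what lets prepared statements skip re-planning. A minimal sketch of the caller's view, assuming a NativeInsertStmt that has already passed isGroupCommit():

    // Sketch only: the planner is rebuilt only when the base schema version changes.
    GroupCommitPlanner planner = nativeInsertStmt.planForGroupCommit(context.queryId);
    Future<PGroupCommitInsertResponse> future = planner.executeGroupCommitInsert(context, rows);
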
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index fe6bc13b6c4..802ddaa6c0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.rules.analysis.CheckAfterRewrite;
 import org.apache.doris.nereids.rules.analysis.EliminateGroupByConstant;
 import org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject;
 import org.apache.doris.nereids.rules.analysis.NormalizeAggregate;
-import org.apache.doris.nereids.rules.analysis.RejectGroupCommitInsert;
 import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
 import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
 import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
@@ -269,9 +268,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
                     topDown(new BuildAggForUnion())
             ),
 
-            // TODO remove it after Nereids support group commit insert
-            topDown(new RejectGroupCommitInsert()),
-
             // topic("Distinct",
             //         costBased(custom(RuleType.PUSH_DOWN_DISTINCT_THROUGH_JOIN, PushdownDistinctThroughJoin::new))
             // ),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 505c7db91a7..94ccef528fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -77,8 +77,6 @@ public enum RuleType {
     ANALYZE_CTE(RuleTypeClass.REWRITE),
     RELATION_AUTHENTICATION(RuleTypeClass.VALIDATION),
 
-    REJECT_GROUP_COMMIT_INSERT(RuleTypeClass.REWRITE),
-
     ADJUST_NULLABLE_FOR_PROJECT_SLOT(RuleTypeClass.REWRITE),
     ADJUST_NULLABLE_FOR_AGGREGATE_SLOT(RuleTypeClass.REWRITE),
     ADJUST_NULLABLE_FOR_HAVING_SLOT(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java
deleted file mode 100644
index 19c9706e03d..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.analysis;
-
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-/**
- * reject group commit insert added by PR <a href="https://github.com/apache/doris/pull/22829/files">#22829</a>
- */
-public class RejectGroupCommitInsert implements RewriteRuleFactory {
-
-    @Override
-    public List<Rule> buildRules() {
-        return ImmutableList.of(
-                logicalOlapTableSink(logicalOneRowRelation())
-                        .thenApply(ctx -> {
-                            if (ctx.connectContext.getSessionVariable().enableInsertGroupCommit) {
-                                throw new AnalysisException("Nereids do not support group commit now.");
-                            }
-                            return null;
-                        }).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT),
-                logicalOlapTableSink(logicalUnion().when(u -> u.arity() == 0))
-                        .thenApply(ctx -> {
-                            if (ctx.connectContext.getSessionVariable().enableInsertGroupCommit) {
-                                throw new AnalysisException("Nereids do not support group commit now.");
-                            }
-                            return null;
-                        }).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT)
-        );
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index f7c30f3820b..79dc9485fc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -22,9 +22,11 @@ import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.ReplacePartitionClause;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.DdlException;
@@ -39,6 +41,7 @@ import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -46,12 +49,19 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.txn.Transaction;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -59,6 +69,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -68,6 +79,8 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * insert into select command implementation
@@ -152,7 +165,13 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
         }
 
         OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink());
-
+        if (ctx.getSessionVariable().enableInsertGroupCommit) {
+            // group commit
+            if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
+                handleGroupCommit(ctx, sink, physicalOlapTableSink);
+                return;
+            }
+        }
         Preconditions.checkArgument(!isTxnBegin, "an insert command cannot create more than one txn");
         Transaction txn = new Transaction(ctx,
                 physicalOlapTableSink.getDatabase(),
@@ -352,6 +371,63 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
         }
     }
 
+    private void handleGroupCommit(ConnectContext ctx, OlapTableSink sink,
+                PhysicalOlapTableSink<?> physicalOlapTableSink)
+                throws UserException, RpcException, TException, ExecutionException, InterruptedException {
+
+        List<InternalService.PDataRow> rows = new ArrayList<>();
+        List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment().getPlanRoot())
+                .getMaterializedConstExprLists();
+
+        int filterSize = 0;
+        for (Slot slot : physicalOlapTableSink.getOutput()) {
+            if (slot.getName().contains(Column.DELETE_SIGN)
+                    || slot.getName().contains(Column.VERSION_COL)) {
+                filterSize += 1;
+            }
+        }
+        for (List<Expr> list : materializedConstExprLists) {
+            rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
+        }
+        GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
+                physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
+        Future<PGroupCommitInsertResponse> future = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
+        PGroupCommitInsertResponse response = future.get();
+        TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
+        if (code == TStatusCode.DATA_QUALITY_ERROR) {
+            LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, "
+                    + "schema version: {}", ctx.queryId(),
+                    groupCommitPlanner.getBackend(), response.getStatus(),
+                    physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
+        } else if (code != TStatusCode.OK) {
+            String errMsg = "group commit insert failed. backend id: "
+                    + groupCommitPlanner.getBackend().getId() + ", status: "
+                    + response.getStatus();
+            ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
+        }
+        TransactionStatus txnStatus = TransactionStatus.PREPARE;
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(response.getLabel()).append("', 
'status':'").append(txnStatus.name());
+        sb.append("', 'txnId':'").append(response.getTxnId()).append("'");
+        sb.append("', 'optimizer':'").append("nereids").append("'");
+        sb.append("}");
+
+        ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb.toString());
+        ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
+                physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(),
+                txnStatus, response.getLoadedRows(), (int) response.getFilteredRows());
+        // update it, so that user can get loaded rows in fe.audit.log
+        ctx.updateReturnRows((int) response.getLoadedRows());
+    }
+
+    private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink<?> physicalOlapTableSink) {
+        return ConnectContext.get().getSessionVariable().enableInsertGroupCommit
+            && physicalOlapTableSink.getTargetTable() instanceof OlapTable
+            && !ConnectContext.get().isTxnModel()
+            && sink.getFragment().getPlanRoot() instanceof UnionNode
+            && physicalOlapTableSink.getPartitionIds().isEmpty();
+    }
+
     @Override
     public Plan getExplainPlan(ConnectContext ctx) {
         return this.logicalQuery;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
new file mode 100644
index 00000000000..15aa639abee
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
+import org.apache.doris.proto.Types;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMergeType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+// Used to generate a plan fragment for a group commit insert;
+// only OlapTable is supported now.
+public class GroupCommitPlanner {
+    private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class);
+
+    private Database db;
+    private OlapTable table;
+    private TUniqueId loadId;
+    private Backend backend;
+    private TExecPlanFragmentParamsList paramsList;
+    private ByteString execPlanFragmentParamsBytes;
+
+    public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId)
+            throws UserException, TException {
+        this.db = db;
+        this.table = table;
+        TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
+        if (targetColumnNames != null) {
+            streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
+            if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
+                streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
+            }
+        }
+        streamLoadPutRequest
+                .setDb(db.getFullName())
+                .setMaxFilterRatio(1)
+                .setTbl(table.getName())
+                .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
+                .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
+                .setGroupCommit(true);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
+        StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
+        // Will use the load id as the query id in the fragment
+        TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+        for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
+            for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                        TFileFormatType.FORMAT_PROTO);
+                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                        TFileCompressType.PLAIN);
+            }
+        }
+        List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream()
+                .flatMap(Collection::stream).collect(Collectors.toList());
+        Preconditions.checkState(scanRangeParams.size() == 1);
+        loadId = queryId;
+        // see BackendServiceProxy#execPlanFragmentsAsync
+        paramsList = new TExecPlanFragmentParamsList();
+        paramsList.addToParamsList(tRequest);
+        execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
+    }
+
+    public Future<PGroupCommitInsertResponse> executeGroupCommitInsert(ConnectContext ctx,
+            List<InternalService.PDataRow> rows) throws TException, DdlException, RpcException {
+        backend = ctx.getInsertGroupCommit(this.table.getId());
+        if (backend == null || !backend.isAlive()) {
+            List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (allBackendIds.isEmpty()) {
+                throw new DdlException("No alive backend");
+            }
+            Collections.shuffle(allBackendIds);
+            backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
+            ctx.setInsertGroupCommit(this.table.getId(), backend);
+        }
+        PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
+                .setDbId(db.getId())
+                .setTableId(table.getId())
+                .setBaseSchemaVersion(table.getBaseSchemaVersion())
+                .setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
+                        .setRequest(execPlanFragmentParamsBytes)
+                        .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
+                .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
+                .build()).addAllData(rows)
+                .build();
+        Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
+                .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+        return future;
+    }
+
+    // only for nereids use
+    public static InternalService.PDataRow getRowStringValue(List<Expr> cols, int filterSize) throws UserException {
+        if (cols.isEmpty()) {
+            return null;
+        }
+        InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
+        try {
+            List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
+            for (Expr expr : exprs) {
+                if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) {
+                    if (expr.getChildren().get(0) instanceof NullLiteral) {
+                        row.addColBuilder().setValue("\\N");
+                        continue;
+                    }
+                    throw new UserException(
+                        "do not support non-literal expr in transactional insert operation: " + expr.toSql());
+                }
+                if (expr instanceof NullLiteral) {
+                    row.addColBuilder().setValue("\\N");
+                } else if (expr.getType() instanceof ArrayType) {
+                    row.addColBuilder().setValue(expr.getStringValueForArray());
+                } else if (!expr.getChildren().isEmpty()) {
+                    expr.getChildren().forEach(child -> processExprVal(child, row));
+                } else {
+                    row.addColBuilder().setValue(expr.getStringValue());
+                }
+            }
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
+        return row.build();
+    }
+
+    private static void processExprVal(Expr expr, InternalService.PDataRow.Builder row) {
+        if (expr.getChildren().isEmpty()) {
+            row.addColBuilder().setValue(expr.getStringValue());
+            return;
+        }
+        for (Expr child : expr.getChildren()) {
+            processExprVal(child, row);
+        }
+    }
+
+    public Backend getBackend() {
+        return backend;
+    }
+
+    public TExecPlanFragmentParamsList getParamsList() {
+        return paramsList;
+    }
+
+}
+
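GroupCommitPlanner bundles what previously lived inside NativeInsertStmt: it builds a stream-load style plan fragment once, serializes it, and on execution pins a backend per table in the ConnectContext, re-picking a random alive backend if the cached one has died. A hedged usage sketch; the db, olapTable, targetColumnNames, queryId, ctx, and exprList values are assumed to come from the surrounding statement:

    // Sketch only: end-to-end use of the new class, checked exceptions omitted.
    GroupCommitPlanner planner = new GroupCommitPlanner(db, olapTable, targetColumnNames, queryId);
    List<InternalService.PDataRow> rows = new ArrayList<>();
    // getRowStringValue renders literal exprs to strings; NULL becomes "\\N".
    rows.add(GroupCommitPlanner.getRowStringValue(exprList, /* filterSize */ 0));
    Future<PGroupCommitInsertResponse> future = planner.executeGroupCommitInsert(ctx, rows);
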
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5656ce9d4ff..5fa8a8205cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -126,13 +126,13 @@ import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.Forward;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
-import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
@@ -145,19 +145,16 @@ import org.apache.doris.resource.workloadgroup.QueryQueue;
 import org.apache.doris.resource.workloadgroup.QueueOfferToken;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
-import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.util.InternalQueryBuffer;
-import org.apache.doris.system.Backend;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
 import org.apache.doris.thrift.TLoadTxnBeginResult;
 import org.apache.doris.thrift.TMergeType;
-import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TResultBatch;
@@ -1828,19 +1825,9 @@ public class StmtExecutor {
             txnId = context.getTxnEntry().getTxnConf().getTxnId();
         } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
             NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
-            Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
-            if (backend == null || !backend.isAlive()) {
-                List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-                if (allBackendIds.isEmpty()) {
-                    throw new DdlException("No alive backend");
-                }
-                Collections.shuffle(allBackendIds);
-                backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
-                context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
-            }
             int maxRetry = 3;
             for (int i = 0; i < maxRetry; i++) {
-                nativeInsertStmt.planForGroupCommit(context.queryId);
+                GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
                 // handle rows
                 List<InternalService.PDataRow> rows = new ArrayList<>();
                 SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt();
@@ -1861,23 +1848,15 @@ public class StmtExecutor {
                     InternalService.PDataRow data = getRowStringValue(exprList);
                     rows.add(data);
                 }
-                TUniqueId loadId = nativeInsertStmt.getLoadId();
-                PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
-                        .setDbId(insertStmt.getTargetTable().getDatabase().getId())
-                        .setTableId(insertStmt.getTargetTable().getId())
-                        .setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion())
-                        .setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest())
-                        .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
-                                .build()).addAllData(rows)
-                        .build();
-                Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
-                        .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                Future<PGroupCommitInsertResponse> future = groupCommitPlanner
+                        .executeGroupCommitInsert(context, rows);
                 PGroupCommitInsertResponse response = future.get();
                 TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
                 if (code == TStatusCode.DATA_QUALITY_ERROR) {
                     LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
                                     + "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
-                            backend.getId(), response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i);
+                            groupCommitPlanner.getBackend().getId(),
+                            response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i);
                     if (i < maxRetry) {
                         List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable());
                         MetaLockUtils.readLockTables(tables);
@@ -1890,12 +1869,12 @@ public class StmtExecutor {
                         }
                         continue;
                     } else {
-                        errMsg = "group commit insert failed. backend id: " + 
backend.getId() + ", status: "
-                                + response.getStatus();
+                        errMsg = "group commit insert failed. backend id: "
+                                + groupCommitPlanner.getBackend().getId() + ", 
status: " + response.getStatus();
                     }
                 } else if (code != TStatusCode.OK) {
-                    errMsg = "group commit insert failed. backend id: " + 
backend.getId() + ", status: "
-                            + response.getStatus();
+                    errMsg = "group commit insert failed. backend id: " + 
groupCommitPlanner.getBackend().getId()
+                            + ", status: " + response.getStatus();
                     ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
                 }
                 label = response.getLabel();
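The legacy path keeps its retry behavior: on DATA_QUALITY_ERROR it re-plans and retries up to maxRetry (3) times, so planForGroupCommit's schema-version check can pick up a concurrent schema change. Roughly, condensed from the hunks above:

    // Sketch only: retry skeleton of the legacy group commit path.
    for (int i = 0; i < maxRetry; i++) {
        GroupCommitPlanner planner = nativeInsertStmt.planForGroupCommit(context.queryId);
        PGroupCommitInsertResponse response = planner.executeGroupCommitInsert(context, rows).get();
        TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
        if (code == TStatusCode.DATA_QUALITY_ERROR && i < maxRetry) {
            continue; // re-plan against the possibly-changed schema
        }
        break; // OK, or a non-retryable error reported via ErrorReport
    }
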
diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out b/regression-test/data/insert_p0/insert_group_commit_into.out
index 97fc1897552..dfb3a3b41c0 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into.out
@@ -94,3 +94,98 @@ q    50
 -- !sql --
 0      service_46da0dab-e27d-4820-aea2-9bfc15741615    1697032066304   0       3229b7cd-f3a2-4359-aa24-946388c9cc54    0       CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm [...]
 
+-- !sql --
+1      a       10
+2      b       -1
+3      c       -1
+4      \N      -1
+5      q       50
+6      \N      -1
+
+-- !sql --
+1      a       10
+1      a       10
+2      b       -1
+2      b       -1
+3      c       -1
+3      c       -1
+4      e1      -1
+5      q       50
+5      q       50
+6      \N      -1
+6      \N      -1
+
+-- !sql --
+1      a       \N      10
+1      a       \N      10
+1      a       \N      10
+2      b       \N      -1
+2      b       \N      -1
+2      b       \N      -1
+3      c       \N      -1
+3      c       \N      -1
+3      c       \N      -1
+4      \N      \N      -1
+4      e1      \N      -1
+5      q       \N      50
+5      q       \N      50
+5      q       \N      50
+6      \N      \N      -1
+6      \N      \N      -1
+6      \N      \N      -1
+
+-- !sql --
+2      b       \N      -1
+6      \N      \N      -1
+
+-- !sql --
+1      a       10      5
+2      b       -1      \N
+2      b       -1      \N
+3      c       -1      \N
+4      \N      -1      \N
+5      q       50      6
+6      \N      -1      \N
+6      \N      -1      \N
+
+-- !sql --
+1      a       10
+1      a       10
+2      b       -1
+2      b       -1
+2      b       -1
+3      c       -1
+3      c       -1
+4      \N      -1
+4      \N      -1
+5      q       50
+5      q       50
+6      \N      -1
+6      \N      -1
+6      \N      -1
+
+-- !sql --
+\N     -1
+\N     -1
+\N     -1
+\N     -1
+\N     -1
+\N     -1
+\N     -1
+a      10
+a      10
+a      10
+b      -1
+b      -1
+b      -1
+b      -1
+c      -1
+c      -1
+c      -1
+q      50
+q      50
+q      50
+
+-- !sql --
+0      service_46da0dab-e27d-4820-aea2-9bfc15741615    1697032066304   0       3229b7cd-f3a2-4359-aa24-946388c9cc54    0       CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm [...]
+
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 26cc9ca9776..bea130d1067 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -77,173 +77,193 @@ suite("insert_group_commit_into") {
         assertTrue(serverInfo.contains("'status':'VISIBLE'"))
         assertTrue(!serverInfo.contains("'label':'group_commit_"))
     }
+    for (item in ["legacy", "nereids"]) {
+        try {
+            // create table
+            sql """ drop table if exists ${table}; """
 
-    try {
-        // create table
-        sql """ drop table if exists ${table}; """
-
-        sql """
-        CREATE TABLE ${table} (
-            `id` int(11) NOT NULL,
-            `name` varchar(50) NULL,
-            `score` int(11) NULL default "-1"
-        ) ENGINE=OLAP
-        DUPLICATE KEY(`id`, `name`)
-        PARTITION BY RANGE(id)
-        (
-            FROM (1) TO (100) INTERVAL 10
-        )
-        DISTRIBUTED BY HASH(`id`) BUCKETS 1
-        PROPERTIES (
-            "replication_num" = "1"
-        );
-        """
-
-        connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
-            sql """ set enable_insert_group_commit = true; """
-            // TODO
-            sql """ set enable_nereids_dml = false; """
-
-            // 1. insert into
-            group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
-            group_commit_insert """ insert into ${table}(id) values(4);  """, 1
-            group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            getRowCount(6)
-            qt_sql """ select * from ${table} order by id, name, score asc; """
-
-            // 2. insert into and delete
-            sql """ delete from ${table} where id = 4; """
-            group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
-            /*sql """ insert into ${table}(id, name) values(4, 'd1');  """
-            sql """ insert into ${table}(id, name) values(4, 'd1');  """
-            sql """ delete from ${table} where id = 4; """*/
-            group_commit_insert """ insert into ${table}(id, name) values(4, 'e1'); """, 1
-            group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            getRowCount(11)
-            qt_sql """ select * from ${table} order by id, name, score asc; """
-
-            // 3. insert into and light schema change: add column
-            group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
-            group_commit_insert """ insert into ${table}(id) values(4);  """, 1
-            group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50);  """, 2
-            sql """ alter table ${table} ADD column age int after name; """
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            assertTrue(getAlterTableState(), "add column should success")
-            getRowCount(17)
-            qt_sql """ select * from ${table} order by id, name,score asc; """
-
-            // 4. insert into and truncate table
-            /*sql """ insert into ${table}(name, id) values('c', 3);  """
-            sql """ insert into ${table}(id) values(4);  """
-            sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50);  """*/
-            sql """ truncate table ${table}; """
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            getRowCount(2)
-            qt_sql """ select * from ${table} order by id, name, score asc; """
-
-            // 5. insert into and schema change: modify column order
-            group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
-            group_commit_insert """ insert into ${table}(id) values(4);  """, 1
-            group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50);  """, 2
-            // sql """ alter table ${table} order by (id, name, score, age); """
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            // assertTrue(getAlterTableState(), "modify column order should success")
-            getRowCount(8)
-            qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """
-
-            // 6. insert into and light schema change: drop column
-            group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
-            group_commit_insert """ insert into ${table}(id) values(4);  """, 1
-            group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50);  """, 2
-            sql """ alter table ${table} DROP column age; """
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            assertTrue(getAlterTableState(), "drop column should success")
-            getRowCount(14)
-            qt_sql """ select * from ${table} order by id, name, score asc; """
-
-            // 7. insert into and add rollup
-            group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
-            group_commit_insert """ insert into ${table}(id) values(4);  """, 1
-            group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100);  """, 2
-            // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
-            group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
-            group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
-            getRowCount(20)
-            qt_sql """ select name, score from ${table} order by name asc; """
-
-            none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100);  """, 1
-            none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100;  """, 1
-            none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100);  """, 1
-            def rowCount = sql "select count(*) from ${table}"
-            logger.info("row count: " + rowCount)
-            assertEquals(rowCount[0][0], 23)
+            sql """
+            CREATE TABLE ${table} (
+                `id` int(11) NOT NULL,
+                `name` varchar(50) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`, `name`)
+            PARTITION BY RANGE(id)
+            (
+                FROM (1) TO (100) INTERVAL 10
+            )
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+            """
+
+            connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+                sql """ set enable_insert_group_commit = true; """
+                if (item == "nereids") {
+                    sql """ set enable_nereids_dml = true; """
+                    sql """ set enable_nereids_planner=true; """
+                    sql """ set enable_fallback_to_original_planner=false; """
+                } else {
+                    sql """ set enable_nereids_dml = false; """
+                }
+
+                // 1. insert into
+                group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
+                group_commit_insert """ insert into ${table}(id) values(4);  """, 1
+                group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                getRowCount(6)
+                qt_sql """ select * from ${table} order by id, name, score asc; """
+
+                // 2. insert into and delete
+                sql """ delete from ${table} where id = 4; """
+                group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
+                /*sql """ insert into ${table}(id, name) values(4, 'd1');  """
+                sql """ insert into ${table}(id, name) values(4, 'd1');  """
+                sql """ delete from ${table} where id = 4; """*/
+                group_commit_insert """ insert into ${table}(id, name) values(4, 'e1'); """, 1
+                group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                getRowCount(11)
+                qt_sql """ select * from ${table} order by id, name, score asc; """
+
+                // 3. insert into and light schema change: add column
+                group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
+                group_commit_insert """ insert into ${table}(id) values(4);  """, 1
+                group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50);  """, 2
+                sql """ alter table ${table} ADD column age int after name; """
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                assertTrue(getAlterTableState(), "add column should success")
+                getRowCount(17)
+                qt_sql """ select * from ${table} order by id, name,score asc; """
+
+                // 4. insert into and truncate table
+                /*sql """ insert into ${table}(name, id) values('c', 3);  """
+                sql """ insert into ${table}(id) values(4);  """
+                sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50);  """*/
+                sql """ truncate table ${table}; """
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                getRowCount(2)
+                qt_sql """ select * from ${table} order by id, name, score asc; """
+
+                // 5. insert into and schema change: modify column order
+                group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
+                group_commit_insert """ insert into ${table}(id) values(4);  """, 1
+                group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50);  """, 2
+                // sql """ alter table ${table} order by (id, name, score, age); """
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                // assertTrue(getAlterTableState(), "modify column order should success")
+                getRowCount(8)
+                qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """
+
+                // 6. insert into and light schema change: drop column
+                group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
+                group_commit_insert """ insert into ${table}(id) values(4);  """, 1
+                group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50);  """, 2
+                sql """ alter table ${table} DROP column age; """
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                assertTrue(getAlterTableState(), "drop column should success")
+                getRowCount(14)
+                qt_sql """ select * from ${table} order by id, name, score asc; """
+
+                // 7. insert into and add rollup
+                group_commit_insert """ insert into ${table}(name, id) values('c', 3);  """, 1
+                group_commit_insert """ insert into ${table}(id) values(4);  """, 1
+                group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100);  """, 2
+                // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
+                group_commit_insert """ insert into ${table}(id, name) values(2, 'b');  """, 1
+                group_commit_insert """ insert into ${table}(id) select 6; """, 1
+
+                getRowCount(20)
+                qt_sql """ select name, score from ${table} order by name asc; """
+
+
+                if (item == "nereids") {
+                    group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100);  """, 1
+                    group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100;  """, 1
+                    group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100);  """, 1
+                    getRowCount(23)
+                } else {
+                    none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100);  """, 1
+                    none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100;  """, 1
+                    none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100);  """, 1
+                }
+
+                def rowCount = sql "select count(*) from ${table}"
+                logger.info("row count: " + rowCount)
+                assertEquals(rowCount[0][0], 23)
+            }
+        } finally {
+            // try_sql("DROP TABLE ${table}")
         }
-    } finally {
-        // try_sql("DROP TABLE ${table}")
-    }
 
-    // table with array type
-    tableName = "insert_group_commit_into_duplicate_array"
-    table = dbName + "." + tableName
-    try {
-        // create table
-        sql """ drop table if exists ${table}; """
-
-        sql """
-        CREATE table ${table} (
-            teamID varchar(255),
-            service_id varchar(255),
-            start_time BigInt,
-            time_bucket BigInt ,
-            segment_id String ,
-            trace_id String ,
-            data_binary String ,
-            end_time BigInt ,
-            endpoint_id String ,
-            endpoint_name String ,
-            is_error Boolean ,
-            latency Int ,
-            service_instance_id String ,
-            statement String ,
-            tags Array<String>
-        ) UNIQUE key (`teamID`,`service_id`, `start_time`)
-        DISTRIBUTED BY hash(`start_time`)
-        BUCKETS 1
-        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
-        """
-
-        connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
-            sql """ set enable_insert_group_commit = true; """
-            // TODO
-            sql """ set enable_nereids_dml = false; """
-
-            // 1. insert into
-            group_commit_insert """ 
-            INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, `time_bucket`, `trace_id`) 
-            VALUES 
-            ('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2 [...]
-            1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', '355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, '3229b7cd-f3a2-4359-aa24-946388c9cc54', 'service_46da0dab-e27d-4820-aea2-9bfc15741615', 'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=t [...]
-            """, 1
-
-            getRowCount(1)
-            qt_sql """ select * from ${table}; """
+        // table with array type
+        tableName = "insert_group_commit_into_duplicate_array"
+        table = dbName + "." + tableName
+        try {
+            // create table
+            sql """ drop table if exists ${table}; """
+
+            sql """
+            CREATE table ${table} (
+                teamID varchar(255),
+                service_id varchar(255),
+                start_time BigInt,
+                time_bucket BigInt ,
+                segment_id String ,
+                trace_id String ,
+                data_binary String ,
+                end_time BigInt ,
+                endpoint_id String ,
+                endpoint_name String ,
+                is_error Boolean ,
+                latency Int ,
+                service_instance_id String ,
+                statement String ,
+                tags Array<String>
+            ) UNIQUE key (`teamID`,`service_id`, `start_time`)
+            DISTRIBUTED BY hash(`start_time`)
+            BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+            """
+
+            connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+                sql """ set enable_insert_group_commit = true; """
+                if (item == "nereids") {
+                    sql """ set enable_nereids_dml = true; """
+                    sql """ set enable_nereids_planner=true; """
+                    sql """ set enable_fallback_to_original_planner=false; """
+                } else {
+                    sql """ set enable_nereids_dml = false; """
+                }
+
+                // 1. insert into
+                group_commit_insert """ 
+                INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, `time_bucket`, `trace_id`) 
+                VALUES 
+                ('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdG [...]
+                1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', '355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, '3229b7cd-f3a2-4359-aa24-946388c9cc54', 'service_46da0dab-e27d-4820-aea2-9bfc15741615', 'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_ [...]
+                """, 1
+
+                getRowCount(1)
+                qt_sql """ select * from ${table}; """
+            }
+        } finally {
+            // try_sql("DROP TABLE ${table}")
         }
-    } finally {
-        // try_sql("DROP TABLE ${table}")
     }
 }
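
Note on the pattern above: the suite now runs every case twice, toggling the
planner via session variables before each group-commit insert. A minimal
standalone sketch of that toggle (Groovy; the JDBC URL, credentials, and table
t(id, name, score) are illustrative placeholders, not from this commit):

    import java.sql.DriverManager

    def url = "jdbc:mysql://127.0.0.1:9030/demo_db"   // placeholder FE address
    ["legacy", "nereids"].each { planner ->
        def conn = DriverManager.getConnection(url, "root", "")
        try {
            def stmt = conn.createStatement()
            // group commit is enabled the same way under both planners
            stmt.execute("set enable_insert_group_commit = true")
            if (planner == "nereids") {
                stmt.execute("set enable_nereids_dml = true")
                stmt.execute("set enable_nereids_planner = true")
                stmt.execute("set enable_fallback_to_original_planner = false")
            } else {
                stmt.execute("set enable_nereids_dml = false")
            }
            stmt.execute("insert into t(id, name, score) values(1, 'a', 10)")
            stmt.close()
        } finally {
            conn.close()
        }
    }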
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
index 3eeaeb797f2..b2c2edb204d 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
@@ -52,231 +52,269 @@ suite("insert_group_commit_with_exception") {
         }
         return false
     }
-
-    try {
-        // create table
-        sql """ drop table if exists ${table}; """
-
-        sql """
-        CREATE TABLE `${table}` (
-            `id` int(11) NOT NULL,
-            `name` varchar(1100) NULL,
-            `score` int(11) NULL default "-1"
-        ) ENGINE=OLAP
-        DUPLICATE KEY(`id`, `name`)
-        DISTRIBUTED BY HASH(`id`) BUCKETS 1
-        PROPERTIES (
-            "replication_num" = "1"
-        );
-        """
-
-        sql """ set enable_insert_group_commit = true; """
-        // TODO
-        sql """ set enable_nereids_dml = false; """
-
-        // insert into without column
-        try {
-            def result = sql """ insert into ${table} values(1, 'a', 10, 100)  """
-            assertTrue(false)
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Column count doesn't match value count"))
-        }
-
-        try {
-            def result = sql """ insert into ${table} values(2, 'b')  """
-            assertTrue(false)
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Column count doesn't match value count"))
-        }
-
-        result = sql """ insert into ${table} values(3, 'c', 30)  """
-        logger.info("insert result: " + result)
-
-        // insert into with column
-        result = sql """ insert into ${table}(id, name) values(4, 'd')  """
-        logger.info("insert result: " + result)
-
-        getRowCount(2)
-
-        try {
-            result = sql """ insert into ${table}(id, name) values(5, 'd', 50)  """
-            assertTrue(false)
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Column count doesn't match value count"))
-        }
-
-        try {
-            result = sql """ insert into ${table}(id, name) values(6)  """
-            assertTrue(false)
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Column count doesn't match value count"))
-        }
-
+    for (item in ["legacy", "nereids"]) {
         try {
-            result = sql """ insert into ${table}(id, names) values(7, 'd')  """
-            assertTrue(false)
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Unknown column 'names'"))
-        }
-
+            // create table
+            sql """ drop table if exists ${table}; """
+
+            sql """
+            CREATE TABLE `${table}` (
+                `id` int(11) NOT NULL,
+                `name` varchar(1100) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+            """
+
+            sql """ set enable_insert_group_commit = true; """
+            if (item == "nereids") {
+                sql """ set enable_nereids_dml = true; """
+                sql """ set enable_nereids_planner=true; """
+                sql """ set enable_fallback_to_original_planner=false; """
+            } else {
+                sql """ set enable_nereids_dml = false; """
+            }
 
-        // prepare insert
-        def db = context.config.defaultDb + "_insert_p0"
-        String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
-
-        try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) {
-            Statement statement = connection.createStatement();
-            statement.execute("use ${db}");
-            statement.execute("set enable_insert_group_commit = true;");
-            // without column
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) {
-                ps.setObject(1, 8);
-                ps.setObject(2, "f");
-                ps.setObject(3, 70);
-                ps.setObject(4, "a");
-                ps.addBatch();
-                int[] result = ps.executeBatch();
+            // insert into without column
+            try {
+                def result = sql """ insert into ${table} values(1, 'a', 10, 100)  """
                 assertTrue(false)
             } catch (Exception e) {
-                assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                if (item == "nereids") {
+                    assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output"))
+                } else {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
             }
 
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?)")) {
-                ps.setObject(1, 9);
-                ps.setObject(2, "f");
-                ps.addBatch();
-                int[] result = ps.executeBatch();
+            try {
+                def result = sql """ insert into ${table} values(2, 'b')  """
                 assertTrue(false)
             } catch (Exception e) {
-                assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                if (item == "nereids") {
+                    assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output"))
+                } else {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
             }
 
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
-                ps.setObject(1, 10);
-                ps.setObject(2, "f");
-                ps.setObject(3, 90);
-                ps.addBatch();
-                int[] result = ps.executeBatch();
-                logger.info("prepare insert result: " + result)
-            }
+            result = sql """ insert into ${table} values(3, 'c', 30)  """
+            logger.info("insert result: " + result)
 
-            // with columns
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?)")) {
-                ps.setObject(1, 11);
-                ps.setObject(2, "f");
-                ps.addBatch();
-                int[] result = ps.executeBatch();
-                logger.info("prepare insert result: " + result)
-            }
+            // insert into with column
+            result = sql """ insert into ${table}(id, name) values(4, 'd')  """
+            logger.info("insert result: " + result)
+
+            getRowCount(2)
 
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?, ?)")) {
-                ps.setObject(1, 12);
-                ps.setObject(2, "f");
-                ps.setObject(3, "f");
-                ps.addBatch();
-                int[] result = ps.executeBatch();
+            try {
+                result = sql """ insert into ${table}(id, name) values(5, 'd', 50)  """
                 assertTrue(false)
             } catch (Exception e) {
-                assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                if (item == "nereids") {
+                    assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output"))
+                } else {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
             }
 
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?)")) {
-                ps.setObject(1, 13);
-                ps.addBatch();
-                int[] result = ps.executeBatch();
+            try {
+                result = sql """ insert into ${table}(id, name) values(6)  """
                 assertTrue(false)
             } catch (Exception e) {
-                assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                if (item == "nereids") {
+                    assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output"))
+                } else {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
             }
 
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, names) values(?, ?)")) {
-                ps.setObject(1, 12);
-                ps.setObject(2, "f");
-                ps.addBatch();
-                int[] result = ps.executeBatch();
+            try {
+                result = sql """ insert into ${table}(id, names) values(7, 'd')  """
                 assertTrue(false)
             } catch (Exception e) {
-                assertTrue(e.getMessage().contains("Unknown column 'names'"))
+                if (item == "nereids") {
+                    assertTrue(e.getMessage().contains("column names is not found in table"))
+                } else {
+                    assertTrue(e.getMessage().contains("Unknown column 'names'"))
+                }
             }
 
-            getRowCount(4)
 
-            // prepare insert with multi rows
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
-                for (int i = 0; i < 5; i++) {
-                    ps.setObject(1, 13 + i);
+            // prepare insert
+            def db = context.config.defaultDb + "_insert_p0"
+            String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
+
+            if (item == "nereids") {
+                println("nereids does not support prepare insert");
+                continue;
+            };
+
+            try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) {
+                Statement statement = connection.createStatement();
+                statement.execute("use ${db}");
+                statement.execute("set enable_insert_group_commit = true;");
+                if (item == "nereids") {
+                    statement.execute("set enable_nereids_dml = true;");
+                    statement.execute("set enable_nereids_planner=true;");
+                    statement.execute("set enable_fallback_to_original_planner=false;");
+                } else {
+                    statement.execute("set enable_nereids_dml = false;");
+                }
+                // without column
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) {
+                    ps.setObject(1, 8);
+                    ps.setObject(2, "f");
+                    ps.setObject(3, 70);
+                    ps.setObject(4, "a");
+                    ps.addBatch();
+                    int[] result = ps.executeBatch();
+                    assertTrue(false)
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
+
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?)")) {
+                    ps.setObject(1, 9);
+                    ps.setObject(2, "f");
+                    ps.addBatch();
+                    int[] result = ps.executeBatch();
+                    assertTrue(false)
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
+
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
+                    ps.setObject(1, 10);
                     ps.setObject(2, "f");
                     ps.setObject(3, 90);
                     ps.addBatch();
                     int[] result = ps.executeBatch();
                     logger.info("prepare insert result: " + result)
                 }
-            }
-            getRowCount(9)
 
-            // prepare insert with multi rows
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?),(?, ?, ?)")) {
-                for (int i = 0; i < 2; i++) {
-                    ps.setObject(1, 18 + i);
+                // with columns
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?)")) {
+                    ps.setObject(1, 11);
                     ps.setObject(2, "f");
-                    ps.setObject(3, 90);
-                    ps.setObject(4, 18 + i + 1);
-                    ps.setObject(5, "f");
-                    ps.setObject(6, 90);
                     ps.addBatch();
                     int[] result = ps.executeBatch();
                     logger.info("prepare insert result: " + result)
                 }
-            }
-            getRowCount(13)
-
-            // prepare insert without column names, and do schema change
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
-                ps.setObject(1, 22)
-                ps.setObject(2, "f")
-                ps.setObject(3, 90)
-                ps.addBatch()
-                int[] result = ps.executeBatch()
-                logger.info("prepare insert result: " + result)
-
-                sql """ alter table ${table} ADD column age int after name; """
-                assertTrue(getAlterTableState(), "add column should success")
-
-                try {
-                    ps.setObject(1, 23)
+
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?, ?)")) {
+                    ps.setObject(1, 12);
+                    ps.setObject(2, "f");
+                    ps.setObject(3, "f");
+                    ps.addBatch();
+                    int[] result = ps.executeBatch();
+                    assertTrue(false)
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
+
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?)")) {
+                    ps.setObject(1, 13);
+                    ps.addBatch();
+                    int[] result = ps.executeBatch();
+                    assertTrue(false)
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                }
+
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, names) values(?, ?)")) {
+                    ps.setObject(1, 12);
+                    ps.setObject(2, "f");
+                    ps.addBatch();
+                    int[] result = ps.executeBatch();
+                    assertTrue(false)
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().contains("Unknown column 'names'"))
+                }
+
+                getRowCount(4)
+
+                // prepare insert with multi rows
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
+                    for (int i = 0; i < 5; i++) {
+                        ps.setObject(1, 13 + i);
+                        ps.setObject(2, "f");
+                        ps.setObject(3, 90);
+                        ps.addBatch();
+                        int[] result = ps.executeBatch();
+                        logger.info("prepare insert result: " + result)
+                    }
+                }
+                getRowCount(9)
+
+                // prepare insert with multi rows
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?),(?, ?, ?)")) {
+                    for (int i = 0; i < 2; i++) {
+                        ps.setObject(1, 18 + i);
+                        ps.setObject(2, "f");
+                        ps.setObject(3, 90);
+                        ps.setObject(4, 18 + i + 1);
+                        ps.setObject(5, "f");
+                        ps.setObject(6, 90);
+                        ps.addBatch();
+                        int[] result = ps.executeBatch();
+                        logger.info("prepare insert result: " + result)
+                    }
+                }
+                getRowCount(13)
+
+                // prepare insert without column names, and do schema change
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
+                    ps.setObject(1, 22)
+                    ps.setObject(2, "f")
+                    ps.setObject(3, 90)
+                    ps.addBatch()
+                    int[] result = ps.executeBatch()
+                    logger.info("prepare insert result: " + result)
+
+                    sql """ alter table ${table} ADD column age int after 
name; """
+                    assertTrue(getAlterTableState(), "add column should 
success")
+
+                    try {
+                        ps.setObject(1, 23)
+                        ps.setObject(2, "f")
+                        ps.setObject(3, 90)
+                        ps.addBatch()
+                        result = ps.executeBatch()
+                        assertTrue(false)
+                    } catch (Exception e) {
+                        assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                    }
+                }
+                getRowCount(14)
+
+                // prepare insert with column names, and do schema change
+                try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name, score) values(?, ?, ?)")) {
+                    ps.setObject(1, 24)
+                    ps.setObject(2, "f")
+                    ps.setObject(3, 90)
+                    ps.addBatch()
+                    int[] result = ps.executeBatch()
+                    logger.info("prepare insert result: " + result)
+
+                    sql """ alter table ${table} DROP column age; """
+                    assertTrue(getAlterTableState(), "drop column should success")
+
+                    ps.setObject(1, 25)
                     ps.setObject(2, "f")
                     ps.setObject(3, 90)
                     ps.addBatch()
                     result = ps.executeBatch()
-                    assertTrue(false)
-                } catch (Exception e) {
-                    assertTrue(e.getMessage().contains("Column count doesn't match value count"))
+                    logger.info("prepare insert result: " + result)
                 }
+                getRowCount(16)
             }
-            getRowCount(14)
-
-            // prepare insert with column names, and do schema change
-            try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name, score) values(?, ?, ?)")) {
-                ps.setObject(1, 24)
-                ps.setObject(2, "f")
-                ps.setObject(3, 90)
-                ps.addBatch()
-                int[] result = ps.executeBatch()
-                logger.info("prepare insert result: " + result)
-
-                sql """ alter table ${table} DROP column age; """
-                assertTrue(getAlterTableState(), "drop column should success")
-
-                ps.setObject(1, 25)
-                ps.setObject(2, "f")
-                ps.setObject(3, 90)
-                ps.addBatch()
-                result = ps.executeBatch()
-                logger.info("prepare insert result: " + result)
-            }
-            getRowCount(16)
+        } finally {
+            // try_sql("DROP TABLE ${table}")
         }
-    } finally {
-        // try_sql("DROP TABLE ${table}")
     }
 }
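
Note on the assertions above: the two planners word the same failure
differently. For a mismatched column list, Nereids raises "insert into cols
should be corresponding to the query output" where the legacy planner raises
"Column count doesn't match value count". A small helper sketch that captures
the branching (helper name is illustrative, not part of this commit):

    // Assert the planner-specific wording of a column-count mismatch.
    def assertColumnCountError(Exception e, String planner) {
        def expected = (planner == "nereids")
                ? "insert into cols should be corresponding to the query output"
                : "Column count doesn't match value count"
        assert e.getMessage().contains(expected)
    }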
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
index 1805905a2a1..c28b18d3797 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -48,52 +48,59 @@ suite("insert_group_commit_with_large_data") {
         assertTrue(serverInfo.contains("'label':'group_commit_"))
     }
 
-    try {
-        // create table
-        sql """ drop table if exists ${table}; """
+    for (item in ["legacy", "nereids"]) { 
+        try {
+            // create table
+            sql """ drop table if exists ${table}; """
 
-        sql """
-        CREATE TABLE `${table}` (
-            `id` int(11) NOT NULL,
-            `name` varchar(1100) NULL,
-            `score` int(11) NULL default "-1"
-        ) ENGINE=OLAP
-        DUPLICATE KEY(`id`, `name`)
-        DISTRIBUTED BY HASH(`id`) BUCKETS 1
-        PROPERTIES (
-            "replication_num" = "1"
-        );
-        """
+            sql """
+            CREATE TABLE `${table}` (
+                `id` int(11) NOT NULL,
+                `name` varchar(1100) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+            """
 
-        connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
-            sql """ set enable_insert_group_commit = true; """
-            // TODO
-            sql """ set enable_nereids_dml = false; """
-            sql """ use ${db}; """
+            connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+                sql """ set enable_insert_group_commit = true; """
+                if (item == "nereids") {
+                    sql """ set enable_nereids_dml = true; """
+                    sql """ set enable_nereids_planner=true; """
+                    sql """ set enable_fallback_to_original_planner=false; """
+                } else {
+                    sql """ set enable_nereids_dml = false; """
+                }
+                sql """ use ${db}; """
 
-            // insert into 5000 rows
-            def insert_sql = """ insert into ${table} values(1, 'a', 10)  """
-            for (def i in 2..5000) {
-                insert_sql += """, (${i}, 'a', 10) """
-            }
-            group_commit_insert insert_sql, 5000
-            getRowCount(5000)
+                // insert into 5000 rows
+                def insert_sql = """ insert into ${table} values(1, 'a', 10)  """
+                for (def i in 2..5000) {
+                    insert_sql += """, (${i}, 'a', 10) """
+                }
+                group_commit_insert insert_sql, 5000
+                getRowCount(5000)
 
-            // data size is large than 4MB, need " set global max_allowed_packet = 5508950 "
-            /*def name_value = ""
-            for (def i in 0..1024) {
-                name_value += 'a'
-            }
-            insert_sql = """ insert into ${table} values(1, '${name_value}', 
10)  """
-            for (def i in 2..5000) {
-                insert_sql += """, (${i}, '${name_value}', 10) """
+                // data size is larger than 4MB, need " set global max_allowed_packet = 5508950 "
+                /*def name_value = ""
+                for (def i in 0..1024) {
+                    name_value += 'a'
+                }
+                insert_sql = """ insert into ${table} values(1, 
'${name_value}', 10)  """
+                for (def i in 2..5000) {
+                    insert_sql += """, (${i}, '${name_value}', 10) """
+                }
+                result = sql """ ${insert_sql} """
+                group_commit_insert insert_sql, 5000
+                getRowCount(10000)
+                */
             }
-            result = sql """ ${insert_sql} """
-            group_commit_insert insert_sql, 5000
-            getRowCount(10000)
-            */
+        } finally {
+            // try_sql("DROP TABLE ${table}")
         }
-    } finally {
-        // try_sql("DROP TABLE ${table}")
     }
 }
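
Note on the large-data case above: the suite builds one 5000-row INSERT by
string concatenation, which stays under the default 4MB client packet; the
commented-out wide-row variant would first need "set global
max_allowed_packet = 5508950". A minimal sketch of the construction (table
name is a placeholder):

    // Build a single multi-row VALUES statement.
    def insertSql = new StringBuilder("insert into t values(1, 'a', 10)")
    for (int i = 2; i <= 5000; i++) {
        insertSql << ", (${i}, 'a', 10)"
    }
    // insertSql.toString() is then executed as one group-commit insert.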


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
