This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 689391cb810 [fix](group commit) support full prepare of group commit 
in nereids (#46533) (#46742)
689391cb810 is described below

commit 689391cb810e5f1b1c059d9b74db2ac607288a6c
Author: meiyi <me...@selectdb.com>
AuthorDate: Mon Jan 13 21:20:53 2025 +0800

    [fix](group commit) support full prepare of group commit in nereids 
(#46533) (#46742)
    
    pick https://github.com/apache/doris/pull/46533
---
 .../apache/doris/analysis/NativeInsertStmt.java    |   7 +-
 .../doris/cloud/catalog/CloudEnvFactory.java       |  13 --
 .../cloud/planner/CloudGroupCommitPlanner.java     |  40 ----
 .../trees/plans/commands/ExecuteCommand.java       |  48 +++--
 .../commands/insert/InsertIntoTableCommand.java    |  32 ++++
 .../insert/OlapGroupCommitInsertExecutor.java      |  42 ++++-
 .../apache/doris/planner/GroupCommitPlanner.java   | 207 +++++++++++++++++----
 .../apache/doris/qe/PreparedStatementContext.java  |   2 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   4 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  10 +-
 .../insert_group_commit_with_exception.groovy      |   1 -
 .../insert_group_commit_with_prepare_stmt.groovy   |   4 +-
 .../transaction/txn_insert_inject_case.groovy      |   2 +-
 13 files changed, 284 insertions(+), 128 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 2b0dd56b0df..c42a8a5a275 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -154,7 +154,6 @@ public class NativeInsertStmt extends InsertStmt {
 
     // Used for group commit insert
     private boolean isGroupCommit = false;
-    private int baseSchemaVersion = -1;
     private TUniqueId loadId = null;
     private long tableId = -1;
     public boolean isGroupCommitStreamLoadSql = false;
@@ -1305,7 +1304,8 @@ public class NativeInsertStmt extends InsertStmt {
         OlapTable olapTable = (OlapTable) getTargetTable();
         olapTable.readLock();
         try {
-            if (groupCommitPlanner != null && olapTable.getBaseSchemaVersion() 
== baseSchemaVersion) {
+            if (groupCommitPlanner != null
+                    && olapTable.getBaseSchemaVersion() == 
groupCommitPlanner.baseSchemaVersion) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("reuse group commit plan, table={}", olapTable);
                 }
@@ -1323,7 +1323,6 @@ public class NativeInsertStmt extends InsertStmt {
                     targetColumnNames, queryId, 
ConnectContext.get().getSessionVariable().getGroupCommit());
             // save plan message to be reused for prepare stmt
             loadId = queryId;
-            baseSchemaVersion = olapTable.getBaseSchemaVersion();
             return groupCommitPlanner;
         } finally {
             olapTable.readUnlock();
@@ -1335,7 +1334,7 @@ public class NativeInsertStmt extends InsertStmt {
     }
 
     public int getBaseSchemaVersion() {
-        return baseSchemaVersion;
+        return groupCommitPlanner.baseSchemaVersion;
     }
 
     public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 32992307a8b..764ea49f329 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -22,11 +22,9 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.CloudTabletStatMgr;
-import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
@@ -37,12 +35,10 @@ import org.apache.doris.cloud.load.CleanCopyJobScheduler;
 import org.apache.doris.cloud.load.CloudBrokerLoadJob;
 import org.apache.doris.cloud.load.CloudLoadManager;
 import org.apache.doris.cloud.load.CloudRoutineLoadManager;
-import org.apache.doris.cloud.planner.CloudGroupCommitPlanner;
 import org.apache.doris.cloud.qe.CloudCoordinator;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.datasource.InternalCatalog;
@@ -51,7 +47,6 @@ import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
-import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
@@ -62,8 +57,6 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.GlobalTransactionMgrIface;
 
-import org.apache.thrift.TException;
-
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
@@ -168,12 +161,6 @@ public class CloudEnvFactory extends EnvFactory {
                                 enableProfile);
     }
 
-    @Override
-    public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable 
table, List<String> targetColumnNames,
-            TUniqueId queryId, String groupCommit) throws UserException, 
TException {
-        return  new CloudGroupCommitPlanner(db, table, targetColumnNames, 
queryId, groupCommit);
-    }
-
     @Override
     public RoutineLoadManager createRoutineLoadManager() {
         return new CloudRoutineLoadManager();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
deleted file mode 100644
index 0388ca5c5d6..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cloud.planner;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.GroupCommitPlanner;
-import org.apache.doris.thrift.TUniqueId;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
-import java.util.List;
-
-public class CloudGroupCommitPlanner extends GroupCommitPlanner {
-    private static final Logger LOG = 
LogManager.getLogger(CloudGroupCommitPlanner.class);
-
-    public CloudGroupCommitPlanner(Database db, OlapTable table, List<String> 
targetColumnNames, TUniqueId queryId,
-            String groupCommit)
-            throws UserException, TException {
-        super(db, table, targetColumnNames, queryId, groupCommit);
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
index 47ba6ed3f4c..c4031c0f9e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
@@ -24,7 +24,9 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.PointQueryExecutor;
 import org.apache.doris.qe.PreparedStatementContext;
@@ -72,27 +74,35 @@ public class ExecuteCommand extends Command {
         executor.setParsedStmt(planAdapter);
         // If it's not a short circuit query or schema version is 
different(indicates schema changed) or
         // has nondeterministic functions in statement, then need to do 
reanalyze and plan
-        boolean isShortCircuit = 
executor.getContext().getStatementContext().isShortCircuitQuery();
-        boolean hasShortCircuitContext = 
preparedStmtCtx.shortCircuitQueryContext.isPresent();
-        boolean schemaVersionMismatch = hasShortCircuitContext
-                    && 
preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion()
-                    != 
preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion;
-        boolean needAnalyze = !isShortCircuit || schemaVersionMismatch || 
!hasShortCircuitContext
-                        || 
executor.getContext().getStatementContext().hasNondeterministic();
-        if (needAnalyze) {
-            // execute real statement
-            preparedStmtCtx.shortCircuitQueryContext = Optional.empty();
-            statementContext.setShortCircuitQueryContext(null);
-            executor.execute();
-            if 
(executor.getContext().getStatementContext().isShortCircuitQuery()) {
-                // cache short-circuit plan
-                preparedStmtCtx.shortCircuitQueryContext = Optional.of(
-                        new ShortCircuitQueryContext(executor.planner(), 
(Queriable) executor.getParsedStmt()));
-                
statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get());
-            }
+        if (executor.getContext().getStatementContext().isShortCircuitQuery()
+                && preparedStmtCtx.shortCircuitQueryContext.isPresent()
+                && 
preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion()
+                == 
preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion && 
!executor.getContext()
+                .getStatementContext().hasNondeterministic()) {
+            PointQueryExecutor.directExecuteShortCircuitQuery(executor, 
preparedStmtCtx, statementContext);
             return;
         }
-        PointQueryExecutor.directExecuteShortCircuitQuery(executor, 
preparedStmtCtx, statementContext);
+        if (ctx.getSessionVariable().enableGroupCommitFullPrepare) {
+            if (preparedStmtCtx.groupCommitPlanner.isPresent()) {
+                OlapGroupCommitInsertExecutor.fastAnalyzeGroupCommit(ctx, 
prepareCommand);
+            } else {
+                OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, 
prepareCommand);
+            }
+            if (ctx.isGroupCommit()) {
+                GroupCommitPlanner.executeGroupCommitInsert(ctx, 
preparedStmtCtx, statementContext);
+                return;
+            }
+        }
+        // execute real statement
+        preparedStmtCtx.shortCircuitQueryContext = Optional.empty();
+        statementContext.setShortCircuitQueryContext(null);
+        executor.execute();
+        if (executor.getContext().getStatementContext().isShortCircuitQuery()) 
{
+            // cache short-circuit plan
+            preparedStmtCtx.shortCircuitQueryContext = Optional.of(
+                    new ShortCircuitQueryContext(executor.planner(), 
(Queriable) executor.getParsedStmt()));
+            
statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get());
+        }
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 7b1121e5d14..3a0e7d7c7f3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
@@ -60,6 +61,7 @@ import org.apache.doris.qe.StmtExecutor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -312,6 +314,36 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         return !(logicalQuery instanceof UnboundTableSink);
     }
 
+    /**
+     * get the target table of the insert command
+     */
+    public TableIf getTable(ConnectContext ctx) throws Exception {
+        TableIf targetTableIf = 
InsertUtils.getTargetTable(originalLogicalQuery, ctx);
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkTblPriv(ConnectContext.get(), 
targetTableIf.getDatabase().getCatalog().getName(),
+                        targetTableIf.getDatabase().getFullName(), 
targetTableIf.getName(),
+                        PrivPredicate.LOAD)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"LOAD",
+                    ConnectContext.get().getQualifiedUser(), 
ConnectContext.get().getRemoteIP(),
+                    targetTableIf.getDatabase().getFullName() + "." + 
targetTableIf.getName());
+        }
+        return targetTableIf;
+    }
+
+    /**
+     * get the target columns of the insert command
+     */
+    public List<String> getTargetColumns() {
+        if (originalLogicalQuery instanceof UnboundTableSink) {
+            UnboundLogicalSink<? extends Plan> unboundTableSink
+                    = (UnboundTableSink<? extends Plan>) originalLogicalQuery;
+            return CollectionUtils.isEmpty(unboundTableSink.getColNames()) ? 
null : unboundTableSink.getColNames();
+        } else {
+            throw new AnalysisException(
+                    "the root of plan should be [UnboundTableSink], but it is 
" + originalLogicalQuery.getType());
+        }
+    }
+
     @Override
     public Plan getExplainPlan(ConnectContext ctx) {
         Plan plan = InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index ed13d5dcb23..0387f308b15 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
@@ -66,15 +67,40 @@ public class OlapGroupCommitInsertExecutor extends 
OlapInsertExecutor {
 
     /**
      * check if the sql can run in group commit mode
-     * @param logicalPlan plan of sql
      */
-    public static void analyzeGroupCommit(LogicalPlan logicalPlan) {
-        ConnectContext ctx = ConnectContext.get();
-        if (ctx.getSessionVariable().isEnableInsertGroupCommit() && 
logicalPlan instanceof InsertIntoTableCommand) {
-            LogicalPlan logicalQuery = ((InsertIntoTableCommand) 
logicalPlan).getLogicalQuery();
-            TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, 
ctx);
-            OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, 
targetTableIf, logicalQuery,
-                    Optional.empty());
+    public static void fastAnalyzeGroupCommit(ConnectContext ctx, LogicalPlan 
logicalPlan) {
+        try {
+            if (ctx.getSessionVariable().isEnableInsertGroupCommit() && 
!ctx.isTxnModel() && !ctx.getSessionVariable()
+                    .isEnableUniqueKeyPartialUpdate()) {
+                ctx.setGroupCommit(true);
+            }
+        } catch (Throwable e) {
+            LOG.warn("analyze group commit failed", e);
+        }
+    }
+
+    /**
+     * check if the sql can run in group commit mode
+     */
+    public static void analyzeGroupCommit(ConnectContext ctx, LogicalPlan 
logicalPlan) {
+        try {
+            if (ctx.isGroupCommit()) {
+                return;
+            }
+            if (!ctx.getSessionVariable().isEnableInsertGroupCommit()) {
+                return;
+            }
+            if (logicalPlan instanceof PrepareCommand) {
+                logicalPlan = ((PrepareCommand) logicalPlan).getLogicalPlan();
+            }
+            if (logicalPlan instanceof InsertIntoTableCommand) {
+                LogicalPlan logicalQuery = ((InsertIntoTableCommand) 
logicalPlan).getLogicalQuery();
+                TableIf targetTableIf = 
InsertUtils.getTargetTable(logicalQuery, ctx);
+                OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, 
targetTableIf, logicalQuery,
+                        Optional.empty());
+            }
+        } catch (Throwable e) {
+            LOG.warn("analyze group commit failed", e);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 234f8e99a88..82f7864fd3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -24,16 +24,27 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FormatOptions;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.PreparedStatementContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -47,11 +58,15 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TPipelineFragmentParamsList;
 import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.ProtocolStringList;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -61,6 +76,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -70,12 +86,14 @@ import java.util.stream.Collectors;
 public class GroupCommitPlanner {
     private static final Logger LOG = 
LogManager.getLogger(GroupCommitPlanner.class);
     public static final String SCHEMA_CHANGE = " is blocked on schema change";
+    private static final int MAX_RETRY = 3;
 
-    protected Database db;
-    protected OlapTable table;
-    protected TUniqueId loadId;
-    protected Backend backend;
-    private TPipelineFragmentParamsList paramsList;
+    private Database db;
+    private OlapTable table;
+    public int baseSchemaVersion;
+    private int targetColumnSize;
+    private TUniqueId loadId;
+    private long backendId;
     private ByteString execPlanFragmentParamsBytes;
 
     public GroupCommitPlanner(Database db, OlapTable table, List<String> 
targetColumnNames, TUniqueId queryId,
@@ -83,6 +101,7 @@ public class GroupCommitPlanner {
             throws UserException, TException {
         this.db = db;
         this.table = table;
+        this.baseSchemaVersion = table.getBaseSchemaVersion();
         if 
(Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
             String msg = "insert table " + this.table.getId() + SCHEMA_CHANGE;
             LOG.info(msg);
@@ -123,16 +142,16 @@ public class GroupCommitPlanner {
         Preconditions.checkState(scanRangeParams.size() == 1);
         loadId = queryId;
         // see BackendServiceProxy#execPlanFragmentsAsync
-        paramsList = new TPipelineFragmentParamsList();
+        TPipelineFragmentParamsList paramsList = new 
TPipelineFragmentParamsList();
         paramsList.addToParamsList(tRequest);
         execPlanFragmentParamsBytes = ByteString.copyFrom(new 
TSerializer().serialize(paramsList));
     }
 
     public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext 
ctx,
             List<InternalService.PDataRow> rows)
-            throws DdlException, RpcException, ExecutionException, 
InterruptedException {
-        selectBackends(ctx);
-
+            throws DdlException, RpcException, ExecutionException, 
InterruptedException, LoadException {
+        Backend backend = 
Env.getCurrentEnv().getGroupCommitManager().selectBackendForGroupCommit(table.getId(),
 ctx);
+        backendId = backend.getId();
         PGroupCommitInsertRequest request = 
PGroupCommitInsertRequest.newBuilder()
                 
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
                         .setRequest(execPlanFragmentParamsBytes)
@@ -145,21 +164,8 @@ public class GroupCommitPlanner {
         return future.get();
     }
 
-    protected void selectBackends(ConnectContext ctx) throws DdlException {
-        try {
-            backend = Env.getCurrentEnv().getGroupCommitManager()
-                    .selectBackendForGroupCommit(this.table.getId(), ctx);
-        } catch (LoadException e) {
-            throw new DdlException("No suitable backend");
-        }
-    }
-
-    public Backend getBackend() {
-        return backend;
-    }
-
-    public TPipelineFragmentParamsList getParamsList() {
-        return paramsList;
+    public long getBackendId() {
+        return backendId;
     }
 
     public List<InternalService.PDataRow> getRows(NativeInsertStmt stmt) 
throws UserException {
@@ -167,12 +173,7 @@ public class GroupCommitPlanner {
         SelectStmt selectStmt = (SelectStmt) (stmt.getQueryStmt());
         if (selectStmt.getValueList() != null) {
             for (List<Expr> row : selectStmt.getValueList().getRows()) {
-                InternalService.PDataRow data = 
StmtExecutor.getRowStringValue(row, FormatOptions.getDefault());
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("add row: [{}]", 
data.getColList().stream().map(c -> c.getValue())
-                            .collect(Collectors.joining(",")));
-                }
-                rows.add(data);
+                rows.add(getOneRow(row));
             }
         } else {
             List<Expr> exprList = new ArrayList<>();
@@ -183,13 +184,147 @@ public class GroupCommitPlanner {
                     exprList.add(resultExpr);
                 }
             }
-            InternalService.PDataRow data = 
StmtExecutor.getRowStringValue(exprList, FormatOptions.getDefault());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("add row: [{}]", data.getColList().stream().map(c -> 
c.getValue())
-                        .collect(Collectors.joining(",")));
-            }
-            rows.add(data);
+            rows.add(getOneRow(exprList));
         }
         return rows;
     }
+
+    private static InternalService.PDataRow getOneRow(List<Expr> row) throws 
UserException {
+        InternalService.PDataRow data = StmtExecutor.getRowStringValue(row, 
FormatOptions.getDefault());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("add row: [{}]", data.getColList().stream().map(c -> 
c.getValue())
+                    .collect(Collectors.joining(",")));
+        }
+        return data;
+    }
+
+    private static List<InternalService.PDataRow> getRows(int 
targetColumnSize, List<Expr> rows) throws UserException {
+        List<InternalService.PDataRow> data = new ArrayList<>();
+        for (int i = 0; i < rows.size(); i += targetColumnSize) {
+            List<Expr> row = rows.subList(i, Math.min(i + targetColumnSize, 
rows.size()));
+            data.add(getOneRow(row));
+        }
+        return data;
+    }
+
+    // prepare command
+    public static void executeGroupCommitInsert(ConnectContext ctx, 
PreparedStatementContext preparedStmtCtx,
+            StatementContext statementContext) throws Exception {
+        PrepareCommand prepareCommand = preparedStmtCtx.command;
+        InsertIntoTableCommand command = (InsertIntoTableCommand) 
(prepareCommand.getLogicalPlan());
+        OlapTable table = (OlapTable) command.getTable(ctx);
+        for (int retry = 0; retry < MAX_RETRY; retry++) {
+            if 
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+                String msg = "insert table " + table.getId() + SCHEMA_CHANGE;
+                LOG.info(msg);
+                throw new DdlException(msg);
+            }
+            boolean reuse = false;
+            GroupCommitPlanner groupCommitPlanner;
+            if (preparedStmtCtx.groupCommitPlanner.isPresent()
+                    && table.getBaseSchemaVersion() == 
preparedStmtCtx.groupCommitPlanner.get().baseSchemaVersion) {
+                groupCommitPlanner = preparedStmtCtx.groupCommitPlanner.get();
+                reuse = true;
+            } else {
+                // call nereids planner to check to sql
+                command.initPlan(ctx, new StmtExecutor(new ConnectContext(), 
""), false);
+                List<String> targetColumnNames = command.getTargetColumns();
+                groupCommitPlanner = EnvFactory.getInstance()
+                        .createGroupCommitPlanner((Database) 
table.getDatabase(), table,
+                                targetColumnNames, ctx.queryId(),
+                                
ConnectContext.get().getSessionVariable().getGroupCommit());
+                // TODO use planner column size
+                groupCommitPlanner.targetColumnSize = targetColumnNames == 
null ? table.getBaseSchema().size() :
+                        targetColumnNames.size();
+                preparedStmtCtx.groupCommitPlanner = 
Optional.of(groupCommitPlanner);
+            }
+            if (statementContext.getIdToPlaceholderRealExpr().size() % 
groupCommitPlanner.targetColumnSize != 0) {
+                throw new DdlException("Column size: " + 
statementContext.getIdToPlaceholderRealExpr().size()
+                        + " does not match with target column size: " + 
groupCommitPlanner.targetColumnSize);
+            }
+            List<Expr> valueExprs = 
statementContext.getIdToPlaceholderRealExpr().values().stream()
+                    .map(v -> ((Literal) 
v).toLegacyLiteral()).collect(Collectors.toList());
+            List<InternalService.PDataRow> rows = 
getRows(groupCommitPlanner.targetColumnSize, valueExprs);
+            PGroupCommitInsertResponse response = 
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
+            Pair<Boolean, Boolean> needRetryAndReplan = 
groupCommitPlanner.handleResponse(ctx, retry + 1 < MAX_RETRY,
+                    reuse, response);
+            if (needRetryAndReplan.first) {
+                if (needRetryAndReplan.second) {
+                    preparedStmtCtx.groupCommitPlanner = Optional.empty();
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    // return <need_retry, need_replan>
+    private Pair<Boolean, Boolean> handleResponse(ConnectContext ctx, boolean 
canRetry, boolean reuse,
+            PGroupCommitInsertResponse response) throws DdlException {
+        TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+        ProtocolStringList errorMsgsList = 
response.getStatus().getErrorMsgsList();
+        if (canRetry && code != TStatusCode.OK && !errorMsgsList.isEmpty()) {
+            if (errorMsgsList.get(0).contains("schema version not match")) {
+                LOG.info("group commit insert failed. query: {}, db: {}, 
table: {}, schema version: {}, "
+                                + "backend: {}, status: {}", 
DebugUtil.printId(ctx.queryId()), db.getId(),
+                        table.getId(), baseSchemaVersion, backendId, 
response.getStatus());
+                return Pair.of(true, true);
+            } else if (errorMsgsList.get(0).contains("can not get a block 
queue")) {
+                return Pair.of(true, false);
+            }
+        }
+        if (code != TStatusCode.OK) {
+            handleInsertFailed(ctx, response);
+        } else {
+            setReturnInfo(ctx, reuse, response);
+        }
+        return Pair.of(false, false);
+    }
+
+    private void handleInsertFailed(ConnectContext ctx, 
PGroupCommitInsertResponse response) throws DdlException {
+        String errMsg = "group commit insert failed. db: " + db.getId() + ", 
table: " + table.getId()
+                + ", query: " + DebugUtil.printId(ctx.queryId()) + ", backend: 
" + backendId
+                + ", status: " + response.getStatus();
+        if (response.hasErrorUrl()) {
+            errMsg += ", error url: " + response.getErrorUrl();
+        }
+        ErrorReport.reportDdlException(errMsg.replaceAll("%", "%%"), 
ErrorCode.ERR_FAILED_WHEN_INSERT);
+    }
+
+    private void setReturnInfo(ConnectContext ctx, boolean reuse, 
PGroupCommitInsertResponse response) {
+        String labelName = response.getLabel();
+        TransactionStatus txnStatus = TransactionStatus.PREPARE;
+        long txnId = response.getTxnId();
+        long loadedRows = response.getLoadedRows();
+        long filteredRows = (int) response.getFilteredRows();
+        String errorUrl = response.getErrorUrl();
+        // the same as {@OlapInsertExecutor#setReturnInfo}
+        // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
+        // {'label':'my_label1', 'status':'visible', 'txnId':'123' 
'err':'error messages'}
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(labelName).append("', 
'status':'").append(txnStatus.name());
+        sb.append("', 'txnId':'").append(txnId).append("'");
+        if (table.getType() == TableType.MATERIALIZED_VIEW) {
+            sb.append("', 'rows':'").append(loadedRows).append("'");
+        }
+        /*if (!Strings.isNullOrEmpty(errMsg)) {
+            sb.append(", 'err':'").append(errMsg).append("'");
+        }*/
+        if (!Strings.isNullOrEmpty(errorUrl)) {
+            sb.append(", 'err_url':'").append(errorUrl).append("'");
+        }
+        sb.append(", 
'query_id':'").append(DebugUtil.printId(ctx.queryId())).append("'");
+        if (reuse) {
+            sb.append(", 
'reuse_group_commit_plan':'").append(true).append("'");
+        }
+        sb.append("}");
+
+        ctx.getState().setOk(loadedRows, (int) filteredRows, sb.toString());
+        // set insert result in connection context,
+        // so that user can use `show insert result` to get info of the last 
insert operation.
+        ctx.setOrUpdateInsertResult(txnId, labelName, db.getFullName(), 
table.getName(),
+                txnStatus, loadedRows, (int) filteredRows);
+        // update it, so that user can get loaded rows in fe.audit.log
+        ctx.updateReturnRows((int) loadedRows);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
index 8decad79917..0174befee5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
+import org.apache.doris.planner.GroupCommitPlanner;
 
 import java.util.Optional;
 
@@ -28,6 +29,7 @@ public class PreparedStatementContext {
     StatementContext statementContext;
     public String stmtString;
     public Optional<ShortCircuitQueryContext> shortCircuitQueryContext = 
Optional.empty();
+    public Optional<GroupCommitPlanner> groupCommitPlanner = Optional.empty();
 
     // Timestamp in millisecond last command starts at
     protected volatile long startTime;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 00566f27a10..fc8b098ef8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -143,6 +143,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT = 
"enable_server_side_prepared_statement";
     public static final String MAX_PREPARED_STMT_COUNT = 
"max_prepared_stmt_count";
+    public static final String ENABLE_GROUP_COMMIT_FULL_PREPARE = 
"enable_group_commit_full_prepare";
     public static final String PREFER_JOIN_METHOD = "prefer_join_method";
 
     public static final String ENABLE_FOLD_CONSTANT_BY_BE = 
"enable_fold_constant_by_be";
@@ -1612,6 +1613,9 @@ public class SessionVariable implements Serializable, 
Writable {
                 "服务端prepared statement最大个数", "the maximum prepared statements 
server holds."})
     public int maxPreparedStmtCount = 100000;
 
+    @VariableMgr.VarAttr(name = ENABLE_GROUP_COMMIT_FULL_PREPARE)
+    public boolean enableGroupCommitFullPrepare = true;
+
     // Default value is false, which means the group by and having clause
     // should first use column name not alias. According to mysql.
     @VariableMgr.VarAttr(name = GROUP_BY_AND_HAVING_USE_ALIAS_FIRST, varType = 
VariableAnnotation.DEPRECATED)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 41d5ba98e6b..798bdb5d0c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -724,7 +724,7 @@ public class StmtExecutor {
         }
         if (logicalPlan instanceof Command) {
             if (logicalPlan instanceof Forward) {
-                OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan);
+                OlapGroupCommitInsertExecutor.analyzeGroupCommit(context, 
logicalPlan);
                 redirectStatus = ((Forward) logicalPlan).toRedirectStatus();
                 if (isForwardToMaster()) {
                     // before forward to master, we also need to set 
profileType in this node
@@ -2384,7 +2384,7 @@ public class StmtExecutor {
                     LOG.info("group commit insert failed. stmt: {}, query_id: 
{}, db_id: {}, table_id: {}"
                                     + ", schema version: {}, backend_id: {}, 
status: {}, retry: {}",
                             insertStmt.getOrigStmt().originStmt, 
DebugUtil.printId(context.queryId()), dbId, tableId,
-                            nativeInsertStmt.getBaseSchemaVersion(), 
groupCommitPlanner.getBackend().getId(),
+                            nativeInsertStmt.getBaseSchemaVersion(), 
groupCommitPlanner.getBackendId(),
                             response.getStatus(), i);
                     if (i < maxRetry) {
                         List<TableIf> tables = 
Lists.newArrayList(insertStmt.getTargetTable());
@@ -2401,15 +2401,15 @@ public class StmtExecutor {
                     } else {
                         errMsg = "group commit insert failed. db_id: " + dbId 
+ ", table_id: " + tableId
                                 + ", query_id: " + 
DebugUtil.printId(context.queryId()) + ", backend_id: "
-                                + groupCommitPlanner.getBackend().getId() + ", 
status: " + response.getStatus();
+                                + groupCommitPlanner.getBackendId() + ", 
status: " + response.getStatus();
                         if (response.hasErrorUrl()) {
                             errMsg += ", error url: " + response.getErrorUrl();
                         }
                     }
                 } else if (code != TStatusCode.OK) {
                     errMsg = "group commit insert failed. db_id: " + dbId + ", 
table_id: " + tableId + ", query_id: "
-                            + DebugUtil.printId(context.queryId()) + ", 
backend_id: " + groupCommitPlanner.getBackend()
-                            .getId() + ", status: " + response.getStatus();
+                            + DebugUtil.printId(context.queryId()) + ", 
backend_id: "
+                            + groupCommitPlanner.getBackendId() + ", status: " 
+ response.getStatus();
                     if (response.hasErrorUrl()) {
                         errMsg += ", error url: " + response.getErrorUrl();
                     }
diff --git 
a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
index f14b28a7509..9d5abdca1de 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
@@ -117,7 +117,6 @@ suite("insert_group_commit_with_exception") {
             Statement statement = connection.createStatement();
             statement.execute("use ${db}");
             statement.execute("set group_commit = sync_mode");
-            statement.execute("set enable_server_side_prepared_statement = 
true")
             // without column
             try (PreparedStatement ps = connection.prepareStatement("insert 
into ${table} values(?, ?, ?, ?)")) {
                 ps.setObject(1, 8);
diff --git 
a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
index e93e157aa5d..44e8d173ca7 100644
--- 
a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
+++ 
b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
@@ -89,7 +89,9 @@ suite("insert_group_commit_with_prepare_stmt") {
             assertTrue(serverInfo.contains("'status':'PREPARE'"))
             assertTrue(serverInfo.contains("'label':'group_commit_"))
             // TODO: currently if enable_server_side_prepared_statement = 
true, will not reuse plan
-            // assertEquals(reuse_plan, 
serverInfo.contains("reuse_group_commit_plan"))
+            if (reuse_plan) {
+                assertEquals(reuse_plan, 
serverInfo.contains("reuse_group_commit_plan"))
+            }
         } else {
             // for batch insert
             ConnectionImpl connection = (ConnectionImpl) stmt.getConnection()
diff --git 
a/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy
index 347f99c2004..65556a91301 100644
--- a/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy
+++ b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy
@@ -134,7 +134,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
         }
 
         def result = sql "SELECT COUNT(*) FROM ${table}_0"
-        rowCount = result[0][0]
+        def rowCount = result[0][0]
         assertEquals(0, rowCount)
         // sleep(10000)
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to