This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8d693cdcb90 [fix](group commit) fix group commit use prepared statement and connect to observer (#46206) (#46344)
8d693cdcb90 is described below

commit 8d693cdcb90c83bb3ac7a8db07439bb4de03c39c
Author: meiyi <me...@selectdb.com>
AuthorDate: Fri Jan 3 15:48:47 2025 +0800

    [fix](group commit) fix group commit use prepared statement and connect to observer (#46206) (#46344)
    
    pick https://github.com/apache/doris/pull/46206
---
 .../commands/insert/InsertIntoTableCommand.java     | 10 ++++++++++
 .../insert/OlapGroupCommitInsertExecutor.java       | 21 +++++++++++++++++++--
 .../main/java/org/apache/doris/qe/StmtExecutor.java |  2 ++
 .../org/apache/doris/regression/suite/Suite.groovy  |  6 +++++-
 .../insert_group_commit_with_exception.groovy       |  4 ++--
 .../insert_group_commit_with_prepare_stmt.groovy    |  2 +-
 6 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 17c84580d15..023b205ac53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -331,6 +332,15 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
         return StmtType.INSERT;
     }
 
+    @Override
+    public RedirectStatus toRedirectStatus() {
+        if (ConnectContext.get().isGroupCommit()) {
+            return RedirectStatus.NO_FORWARD;
+        } else {
+            return RedirectStatus.FORWARD_WITH_SYNC;
+        }
+    }
+
     private static class BuildInsertExecutorResult {
         private final NereidsPlanner planner;
         private final AbstractInsertExecutor executor;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 239328ce93d..ed13d5dcb23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
 import org.apache.doris.planner.GroupCommitPlanner;
@@ -63,6 +64,20 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
         super(ctx, table, labelName, planner, insertCtx, emptyInsert);
     }
 
+    /**
+     * check if the sql can run in group commit mode
+     * @param logicalPlan plan of sql
+     */
+    public static void analyzeGroupCommit(LogicalPlan logicalPlan) {
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx.getSessionVariable().isEnableInsertGroupCommit() && logicalPlan instanceof InsertIntoTableCommand) {
+            LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
+            TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
+            OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery,
+                    Optional.empty());
+        }
+    }
+
     protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, LogicalPlan logicalQuery,
             Optional<InsertCommandContext> insertCtx) {
         // The flag is set to false before execute sql, if it is true, this is a http stream
@@ -91,8 +106,10 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
             conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
                     && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
             conditions.add(Pair.of(
-                    () -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion,
-                    () -> "not one row relation or union, class: " + tableSink.child().getClass().getName()));
+                    () -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion
+                            || tableSink.child() instanceof LogicalInlineTable,
+                    () -> "not one row relation or union or inline table, class: " + tableSink.child().getClass()
+                            .getName()));
             ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean()));
             if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
                 for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7ae5994e431..41d5ba98e6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -160,6 +160,7 @@ import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
 import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
@@ -723,6 +724,7 @@ public class StmtExecutor {
         }
         if (logicalPlan instanceof Command) {
             if (logicalPlan instanceof Forward) {
+                OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan);
                 redirectStatus = ((Forward) logicalPlan).toRedirectStatus();
                 if (isForwardToMaster()) {
                     // before forward to master, we also need to set profileType in this node
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index a7b8421635e..333720bc5b9 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1361,8 +1361,12 @@ class Suite implements GroovyInterceptable {
     }
 
     String getServerPrepareJdbcUrl(String jdbcUrl, String database) {
+        return getServerPrepareJdbcUrl(jdbcUrl, database, true)
+    }
+
+    String getServerPrepareJdbcUrl(String jdbcUrl, String database, boolean useMasterIp) {
         String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
-        def sql_ip = getMasterIp()
+        def sql_ip = useMasterIp ? getMasterIp() : urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
         def sql_port
         if (urlWithoutSchema.indexOf("/") >= 0) {
             // e.g: jdbc:mysql://locahost:8080/?a=b
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
index 054add11d9f..f14b28a7509 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
@@ -111,12 +111,12 @@ suite("insert_group_commit_with_exception") {
 
         // prepare insert
         def db = context.config.defaultDb + "_insert_p0"
-        String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
+        String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db, false)
 
         try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) {
             Statement statement = connection.createStatement();
             statement.execute("use ${db}");
-            statement.execute("set group_commit = eventual_consistency;");
+            statement.execute("set group_commit = sync_mode");
             statement.execute("set enable_server_side_prepared_statement = true")
             // without column
             try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
index 7f2919f8118..e93e157aa5d 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
@@ -127,7 +127,7 @@ suite("insert_group_commit_with_prepare_stmt") {
         return serverStatementIds
     }
 
-    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb)
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb, false)
     logger.info("url: " + url)
 
     def result1 = connect(user, password, url + "&sessionVariables=group_commit=async_mode") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to