This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8d693cdcb90 [fix](group commit) fix group commit use prepared statement and connect to observer (#46206) (#46344) 8d693cdcb90 is described below commit 8d693cdcb90c83bb3ac7a8db07439bb4de03c39c Author: meiyi <me...@selectdb.com> AuthorDate: Fri Jan 3 15:48:47 2025 +0800 [fix](group commit) fix group commit use prepared statement and connect to observer (#46206) (#46344) pick https://github.com/apache/doris/pull/46206 --- .../commands/insert/InsertIntoTableCommand.java | 10 ++++++++++ .../insert/OlapGroupCommitInsertExecutor.java | 21 +++++++++++++++++++-- .../main/java/org/apache/doris/qe/StmtExecutor.java | 2 ++ .../org/apache/doris/regression/suite/Suite.groovy | 6 +++++- .../insert_group_commit_with_exception.groovy | 4 ++-- .../insert_group_commit_with_prepare_stmt.groovy | 2 +- 6 files changed, 39 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 17c84580d15..023b205ac53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StmtType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; @@ -331,6 +332,15 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, return StmtType.INSERT; } + @Override + public RedirectStatus toRedirectStatus() { + if (ConnectContext.get().isGroupCommit()) { + return RedirectStatus.NO_FORWARD; + } else { + return RedirectStatus.FORWARD_WITH_SYNC; + } + } + private static class BuildInsertExecutorResult { private final NereidsPlanner planner; private final AbstractInsertExecutor executor; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 239328ce93d..ed13d5dcb23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.planner.GroupCommitPlanner; @@ -63,6 +64,20 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor { super(ctx, table, labelName, planner, insertCtx, emptyInsert); } + /** + * check if the sql can run in group commit mode + * @param logicalPlan plan of sql + */ + public static void analyzeGroupCommit(LogicalPlan logicalPlan) { + ConnectContext ctx = ConnectContext.get(); + if (ctx.getSessionVariable().isEnableInsertGroupCommit() && logicalPlan instanceof InsertIntoTableCommand) { + LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); + TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); + OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery, + Optional.empty()); + } + } + protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, LogicalPlan logicalQuery, Optional<InsertCommandContext> insertCtx) { // The flag is set to false before execute sql, if it is true, this is a http stream @@ -91,8 +106,10 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor { conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command")); conditions.add(Pair.of( - () -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion, - () -> "not one row relation or union, class: " + tableSink.child().getClass().getName())); + () -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion + || tableSink.child() instanceof LogicalInlineTable, + () -> "not one row relation or union or inline table, class: " + tableSink.child().getClass() + .getName())); ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean())); if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) { for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 7ae5994e431..41d5ba98e6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -160,6 +160,7 @@ import org.apache.doris.nereids.trees.plans.commands.UpdateCommand; import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor; import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; @@ -723,6 +724,7 @@ public class StmtExecutor { } if (logicalPlan instanceof Command) { if (logicalPlan instanceof Forward) { + OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan); redirectStatus = ((Forward) logicalPlan).toRedirectStatus(); if (isForwardToMaster()) { // before forward to master, we also need to set profileType in this node diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index a7b8421635e..333720bc5b9 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1361,8 +1361,12 @@ class Suite implements GroovyInterceptable { } String getServerPrepareJdbcUrl(String jdbcUrl, String database) { + return getServerPrepareJdbcUrl(jdbcUrl, database, true) + } + + String getServerPrepareJdbcUrl(String jdbcUrl, String database, boolean useMasterIp) { String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = getMasterIp() + def sql_ip = useMasterIp ? getMasterIp() : urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) def sql_port if (urlWithoutSchema.indexOf("/") >= 0) { // e.g: jdbc:mysql://locahost:8080/?a=b diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index 054add11d9f..f14b28a7509 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -111,12 +111,12 @@ suite("insert_group_commit_with_exception") { // prepare insert def db = context.config.defaultDb + "_insert_p0" - String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db) + String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db, false) try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) { Statement statement = connection.createStatement(); statement.execute("use ${db}"); - statement.execute("set group_commit = eventual_consistency;"); + statement.execute("set group_commit = sync_mode"); statement.execute("set enable_server_side_prepared_statement = true") // without column try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) { diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy index 7f2919f8118..e93e157aa5d 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy @@ -127,7 +127,7 @@ suite("insert_group_commit_with_prepare_stmt") { return serverStatementIds } - def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb) + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb, false) logger.info("url: " + url) def result1 = connect(user, password, url + "&sessionVariables=group_commit=async_mode") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org