This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 689391cb810 [fix](group commit) support full prepare of group commit in nereids (#46533) (#46742) 689391cb810 is described below commit 689391cb810e5f1b1c059d9b74db2ac607288a6c Author: meiyi <me...@selectdb.com> AuthorDate: Mon Jan 13 21:20:53 2025 +0800 [fix](group commit) support full prepare of group commit in nereids (#46533) (#46742) pick https://github.com/apache/doris/pull/46533 --- .../apache/doris/analysis/NativeInsertStmt.java | 7 +- .../doris/cloud/catalog/CloudEnvFactory.java | 13 -- .../cloud/planner/CloudGroupCommitPlanner.java | 40 ---- .../trees/plans/commands/ExecuteCommand.java | 48 +++-- .../commands/insert/InsertIntoTableCommand.java | 32 ++++ .../insert/OlapGroupCommitInsertExecutor.java | 42 ++++- .../apache/doris/planner/GroupCommitPlanner.java | 207 +++++++++++++++++---- .../apache/doris/qe/PreparedStatementContext.java | 2 + .../java/org/apache/doris/qe/SessionVariable.java | 4 + .../java/org/apache/doris/qe/StmtExecutor.java | 10 +- .../insert_group_commit_with_exception.groovy | 1 - .../insert_group_commit_with_prepare_stmt.groovy | 4 +- .../transaction/txn_insert_inject_case.groovy | 2 +- 13 files changed, 284 insertions(+), 128 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 2b0dd56b0df..c42a8a5a275 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -154,7 +154,6 @@ public class NativeInsertStmt extends InsertStmt { // Used for group commit insert private boolean isGroupCommit = false; - private int baseSchemaVersion = -1; private TUniqueId loadId = null; private long tableId = -1; public boolean isGroupCommitStreamLoadSql = false; @@ -1305,7 +1304,8 @@ public class NativeInsertStmt extends InsertStmt { OlapTable 
olapTable = (OlapTable) getTargetTable(); olapTable.readLock(); try { - if (groupCommitPlanner != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) { + if (groupCommitPlanner != null + && olapTable.getBaseSchemaVersion() == groupCommitPlanner.baseSchemaVersion) { if (LOG.isDebugEnabled()) { LOG.debug("reuse group commit plan, table={}", olapTable); } @@ -1323,7 +1323,6 @@ public class NativeInsertStmt extends InsertStmt { targetColumnNames, queryId, ConnectContext.get().getSessionVariable().getGroupCommit()); // save plan message to be reused for prepare stmt loadId = queryId; - baseSchemaVersion = olapTable.getBaseSchemaVersion(); return groupCommitPlanner; } finally { olapTable.readUnlock(); @@ -1335,7 +1334,7 @@ public class NativeInsertStmt extends InsertStmt { } public int getBaseSchemaVersion() { - return baseSchemaVersion; + return groupCommitPlanner.baseSchemaVersion; } public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java index 32992307a8b..764ea49f329 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java @@ -22,11 +22,9 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.CloudTabletStatMgr; -import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EnvFactory; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; @@ -37,12 +35,10 @@ import 
org.apache.doris.cloud.load.CleanCopyJobScheduler; import org.apache.doris.cloud.load.CloudBrokerLoadJob; import org.apache.doris.cloud.load.CloudLoadManager; import org.apache.doris.cloud.load.CloudRoutineLoadManager; -import org.apache.doris.cloud.planner.CloudGroupCommitPlanner; import org.apache.doris.cloud.qe.CloudCoordinator; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; @@ -51,7 +47,6 @@ import org.apache.doris.load.loadv2.LoadJobScheduler; import org.apache.doris.load.loadv2.LoadManager; import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.nereids.stats.StatsErrorEstimator; -import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; @@ -62,8 +57,6 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.GlobalTransactionMgrIface; -import org.apache.thrift.TException; - import java.lang.reflect.Type; import java.util.List; import java.util.Map; @@ -168,12 +161,6 @@ public class CloudEnvFactory extends EnvFactory { enableProfile); } - @Override - public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, - TUniqueId queryId, String groupCommit) throws UserException, TException { - return new CloudGroupCommitPlanner(db, table, targetColumnNames, queryId, groupCommit); - } - @Override public RoutineLoadManager createRoutineLoadManager() { return new CloudRoutineLoadManager(); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java deleted file mode 100644 index 0388ca5c5d6..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.cloud.planner; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.UserException; -import org.apache.doris.planner.GroupCommitPlanner; -import org.apache.doris.thrift.TUniqueId; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; - -import java.util.List; - -public class CloudGroupCommitPlanner extends GroupCommitPlanner { - private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class); - - public CloudGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId, - String groupCommit) - throws UserException, TException { - super(db, table, targetColumnNames, queryId, groupCommit); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java index 47ba6ed3f4c..c4031c0f9e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java @@ -24,7 +24,9 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.PointQueryExecutor; import org.apache.doris.qe.PreparedStatementContext; @@ -72,27 +74,35 @@ public class ExecuteCommand extends Command { executor.setParsedStmt(planAdapter); // If it's not a short circuit 
query or schema version is different(indicates schema changed) or // has nondeterministic functions in statement, then need to do reanalyze and plan - boolean isShortCircuit = executor.getContext().getStatementContext().isShortCircuitQuery(); - boolean hasShortCircuitContext = preparedStmtCtx.shortCircuitQueryContext.isPresent(); - boolean schemaVersionMismatch = hasShortCircuitContext - && preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion() - != preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion; - boolean needAnalyze = !isShortCircuit || schemaVersionMismatch || !hasShortCircuitContext - || executor.getContext().getStatementContext().hasNondeterministic(); - if (needAnalyze) { - // execute real statement - preparedStmtCtx.shortCircuitQueryContext = Optional.empty(); - statementContext.setShortCircuitQueryContext(null); - executor.execute(); - if (executor.getContext().getStatementContext().isShortCircuitQuery()) { - // cache short-circuit plan - preparedStmtCtx.shortCircuitQueryContext = Optional.of( - new ShortCircuitQueryContext(executor.planner(), (Queriable) executor.getParsedStmt())); - statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get()); - } + if (executor.getContext().getStatementContext().isShortCircuitQuery() + && preparedStmtCtx.shortCircuitQueryContext.isPresent() + && preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion() + == preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion && !executor.getContext() + .getStatementContext().hasNondeterministic()) { + PointQueryExecutor.directExecuteShortCircuitQuery(executor, preparedStmtCtx, statementContext); return; } - PointQueryExecutor.directExecuteShortCircuitQuery(executor, preparedStmtCtx, statementContext); + if (ctx.getSessionVariable().enableGroupCommitFullPrepare) { + if (preparedStmtCtx.groupCommitPlanner.isPresent()) { + OlapGroupCommitInsertExecutor.fastAnalyzeGroupCommit(ctx, prepareCommand); + } 
else { + OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, prepareCommand); + } + if (ctx.isGroupCommit()) { + GroupCommitPlanner.executeGroupCommitInsert(ctx, preparedStmtCtx, statementContext); + return; + } + } + // execute real statement + preparedStmtCtx.shortCircuitQueryContext = Optional.empty(); + statementContext.setShortCircuitQueryContext(null); + executor.execute(); + if (executor.getContext().getStatementContext().isShortCircuitQuery()) { + // cache short-circuit plan + preparedStmtCtx.shortCircuitQueryContext = Optional.of( + new ShortCircuitQueryContext(executor.planner(), (Queriable) executor.getParsedStmt())); + statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get()); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 7b1121e5d14..3a0e7d7c7f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; @@ -60,6 +61,7 @@ import org.apache.doris.qe.StmtExecutor; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import 
org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -312,6 +314,36 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, return !(logicalQuery instanceof UnboundTableSink); } + /** + * get the target table of the insert command + */ + public TableIf getTable(ConnectContext ctx) throws Exception { + TableIf targetTableIf = InsertUtils.getTargetTable(originalLogicalQuery, ctx); + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(), + targetTableIf.getDatabase().getFullName(), targetTableIf.getName(), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName()); + } + return targetTableIf; + } + + /** + * get the target columns of the insert command + */ + public List<String> getTargetColumns() { + if (originalLogicalQuery instanceof UnboundTableSink) { + UnboundLogicalSink<? extends Plan> unboundTableSink + = (UnboundTableSink<? extends Plan>) originalLogicalQuery; + return CollectionUtils.isEmpty(unboundTableSink.getColNames()) ? 
null : unboundTableSink.getColNames(); + } else { + throw new AnalysisException( + "the root of plan should be [UnboundTableSink], but it is " + originalLogicalQuery.getType()); + } + } + @Override public Plan getExplainPlan(ConnectContext ctx) { Plan plan = InsertUtils.getPlanForExplain(ctx, this.logicalQuery); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index ed13d5dcb23..0387f308b15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; +import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; @@ -66,15 +67,40 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor { /** * check if the sql can run in group commit mode - * @param logicalPlan plan of sql */ - public static void analyzeGroupCommit(LogicalPlan logicalPlan) { - ConnectContext ctx = ConnectContext.get(); - if (ctx.getSessionVariable().isEnableInsertGroupCommit() && logicalPlan instanceof InsertIntoTableCommand) { - LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); - TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); - OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, 
logicalQuery, - Optional.empty()); + public static void fastAnalyzeGroupCommit(ConnectContext ctx, LogicalPlan logicalPlan) { + try { + if (ctx.getSessionVariable().isEnableInsertGroupCommit() && !ctx.isTxnModel() && !ctx.getSessionVariable() + .isEnableUniqueKeyPartialUpdate()) { + ctx.setGroupCommit(true); + } + } catch (Throwable e) { + LOG.warn("analyze group commit failed", e); + } + } + + /** + * check if the sql can run in group commit mode + */ + public static void analyzeGroupCommit(ConnectContext ctx, LogicalPlan logicalPlan) { + try { + if (ctx.isGroupCommit()) { + return; + } + if (!ctx.getSessionVariable().isEnableInsertGroupCommit()) { + return; + } + if (logicalPlan instanceof PrepareCommand) { + logicalPlan = ((PrepareCommand) logicalPlan).getLogicalPlan(); + } + if (logicalPlan instanceof InsertIntoTableCommand) { + LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); + TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); + OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery, + Optional.empty()); + } + } catch (Throwable e) { + LOG.warn("analyze group commit failed", e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 234f8e99a88..82f7864fd3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -24,16 +24,27 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.EnvFactory; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import 
org.apache.doris.common.ErrorReport; import org.apache.doris.common.FormatOptions; import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.PreparedStatementContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -47,11 +58,15 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineFragmentParamsList; import org.apache.doris.thrift.TScanRangeParams; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.protobuf.ByteString; +import com.google.protobuf.ProtocolStringList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -61,6 +76,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -70,12 
+86,14 @@ import java.util.stream.Collectors; public class GroupCommitPlanner { private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class); public static final String SCHEMA_CHANGE = " is blocked on schema change"; + private static final int MAX_RETRY = 3; - protected Database db; - protected OlapTable table; - protected TUniqueId loadId; - protected Backend backend; - private TPipelineFragmentParamsList paramsList; + private Database db; + private OlapTable table; + public int baseSchemaVersion; + private int targetColumnSize; + private TUniqueId loadId; + private long backendId; private ByteString execPlanFragmentParamsBytes; public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId, @@ -83,6 +101,7 @@ public class GroupCommitPlanner { throws UserException, TException { this.db = db; this.table = table; + this.baseSchemaVersion = table.getBaseSchemaVersion(); if (Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) { String msg = "insert table " + this.table.getId() + SCHEMA_CHANGE; LOG.info(msg); @@ -123,16 +142,16 @@ public class GroupCommitPlanner { Preconditions.checkState(scanRangeParams.size() == 1); loadId = queryId; // see BackendServiceProxy#execPlanFragmentsAsync - paramsList = new TPipelineFragmentParamsList(); + TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList(); paramsList.addToParamsList(tRequest); execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList)); } public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, List<InternalService.PDataRow> rows) - throws DdlException, RpcException, ExecutionException, InterruptedException { - selectBackends(ctx); - + throws DdlException, RpcException, ExecutionException, InterruptedException, LoadException { + Backend backend = Env.getCurrentEnv().getGroupCommitManager().selectBackendForGroupCommit(table.getId(), ctx); + backendId = 
backend.getId(); PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() .setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder() .setRequest(execPlanFragmentParamsBytes) @@ -145,21 +164,8 @@ public class GroupCommitPlanner { return future.get(); } - protected void selectBackends(ConnectContext ctx) throws DdlException { - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(this.table.getId(), ctx); - } catch (LoadException e) { - throw new DdlException("No suitable backend"); - } - } - - public Backend getBackend() { - return backend; - } - - public TPipelineFragmentParamsList getParamsList() { - return paramsList; + public long getBackendId() { + return backendId; } public List<InternalService.PDataRow> getRows(NativeInsertStmt stmt) throws UserException { @@ -167,12 +173,7 @@ public class GroupCommitPlanner { SelectStmt selectStmt = (SelectStmt) (stmt.getQueryStmt()); if (selectStmt.getValueList() != null) { for (List<Expr> row : selectStmt.getValueList().getRows()) { - InternalService.PDataRow data = StmtExecutor.getRowStringValue(row, FormatOptions.getDefault()); - if (LOG.isDebugEnabled()) { - LOG.debug("add row: [{}]", data.getColList().stream().map(c -> c.getValue()) - .collect(Collectors.joining(","))); - } - rows.add(data); + rows.add(getOneRow(row)); } } else { List<Expr> exprList = new ArrayList<>(); @@ -183,13 +184,147 @@ public class GroupCommitPlanner { exprList.add(resultExpr); } } - InternalService.PDataRow data = StmtExecutor.getRowStringValue(exprList, FormatOptions.getDefault()); - if (LOG.isDebugEnabled()) { - LOG.debug("add row: [{}]", data.getColList().stream().map(c -> c.getValue()) - .collect(Collectors.joining(","))); - } - rows.add(data); + rows.add(getOneRow(exprList)); } return rows; } + + private static InternalService.PDataRow getOneRow(List<Expr> row) throws UserException { + InternalService.PDataRow data = StmtExecutor.getRowStringValue(row, 
FormatOptions.getDefault()); + if (LOG.isDebugEnabled()) { + LOG.debug("add row: [{}]", data.getColList().stream().map(c -> c.getValue()) + .collect(Collectors.joining(","))); + } + return data; + } + + private static List<InternalService.PDataRow> getRows(int targetColumnSize, List<Expr> rows) throws UserException { + List<InternalService.PDataRow> data = new ArrayList<>(); + for (int i = 0; i < rows.size(); i += targetColumnSize) { + List<Expr> row = rows.subList(i, Math.min(i + targetColumnSize, rows.size())); + data.add(getOneRow(row)); + } + return data; + } + + // prepare command + public static void executeGroupCommitInsert(ConnectContext ctx, PreparedStatementContext preparedStmtCtx, + StatementContext statementContext) throws Exception { + PrepareCommand prepareCommand = preparedStmtCtx.command; + InsertIntoTableCommand command = (InsertIntoTableCommand) (prepareCommand.getLogicalPlan()); + OlapTable table = (OlapTable) command.getTable(ctx); + for (int retry = 0; retry < MAX_RETRY; retry++) { + if (Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) { + String msg = "insert table " + table.getId() + SCHEMA_CHANGE; + LOG.info(msg); + throw new DdlException(msg); + } + boolean reuse = false; + GroupCommitPlanner groupCommitPlanner; + if (preparedStmtCtx.groupCommitPlanner.isPresent() + && table.getBaseSchemaVersion() == preparedStmtCtx.groupCommitPlanner.get().baseSchemaVersion) { + groupCommitPlanner = preparedStmtCtx.groupCommitPlanner.get(); + reuse = true; + } else { + // call nereids planner to check to sql + command.initPlan(ctx, new StmtExecutor(new ConnectContext(), ""), false); + List<String> targetColumnNames = command.getTargetColumns(); + groupCommitPlanner = EnvFactory.getInstance() + .createGroupCommitPlanner((Database) table.getDatabase(), table, + targetColumnNames, ctx.queryId(), + ConnectContext.get().getSessionVariable().getGroupCommit()); + // TODO use planner column size + groupCommitPlanner.targetColumnSize = 
targetColumnNames == null ? table.getBaseSchema().size() : + targetColumnNames.size(); + preparedStmtCtx.groupCommitPlanner = Optional.of(groupCommitPlanner); + } + if (statementContext.getIdToPlaceholderRealExpr().size() % groupCommitPlanner.targetColumnSize != 0) { + throw new DdlException("Column size: " + statementContext.getIdToPlaceholderRealExpr().size() + + " does not match with target column size: " + groupCommitPlanner.targetColumnSize); + } + List<Expr> valueExprs = statementContext.getIdToPlaceholderRealExpr().values().stream() + .map(v -> ((Literal) v).toLegacyLiteral()).collect(Collectors.toList()); + List<InternalService.PDataRow> rows = getRows(groupCommitPlanner.targetColumnSize, valueExprs); + PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); + Pair<Boolean, Boolean> needRetryAndReplan = groupCommitPlanner.handleResponse(ctx, retry + 1 < MAX_RETRY, + reuse, response); + if (needRetryAndReplan.first) { + if (needRetryAndReplan.second) { + preparedStmtCtx.groupCommitPlanner = Optional.empty(); + } + } else { + break; + } + } + } + + // return <need_retry, need_replan> + private Pair<Boolean, Boolean> handleResponse(ConnectContext ctx, boolean canRetry, boolean reuse, + PGroupCommitInsertResponse response) throws DdlException { + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList(); + if (canRetry && code != TStatusCode.OK && !errorMsgsList.isEmpty()) { + if (errorMsgsList.get(0).contains("schema version not match")) { + LOG.info("group commit insert failed. 
query: {}, db: {}, table: {}, schema version: {}, " + + "backend: {}, status: {}", DebugUtil.printId(ctx.queryId()), db.getId(), + table.getId(), baseSchemaVersion, backendId, response.getStatus()); + return Pair.of(true, true); + } else if (errorMsgsList.get(0).contains("can not get a block queue")) { + return Pair.of(true, false); + } + } + if (code != TStatusCode.OK) { + handleInsertFailed(ctx, response); + } else { + setReturnInfo(ctx, reuse, response); + } + return Pair.of(false, false); + } + + private void handleInsertFailed(ConnectContext ctx, PGroupCommitInsertResponse response) throws DdlException { + String errMsg = "group commit insert failed. db: " + db.getId() + ", table: " + table.getId() + + ", query: " + DebugUtil.printId(ctx.queryId()) + ", backend: " + backendId + + ", status: " + response.getStatus(); + if (response.hasErrorUrl()) { + errMsg += ", error url: " + response.getErrorUrl(); + } + ErrorReport.reportDdlException(errMsg.replaceAll("%", "%%"), ErrorCode.ERR_FAILED_WHEN_INSERT); + } + + private void setReturnInfo(ConnectContext ctx, boolean reuse, PGroupCommitInsertResponse response) { + String labelName = response.getLabel(); + TransactionStatus txnStatus = TransactionStatus.PREPARE; + long txnId = response.getTxnId(); + long loadedRows = response.getLoadedRows(); + long filteredRows = (int) response.getFilteredRows(); + String errorUrl = response.getErrorUrl(); + // the same as {@OlapInsertExecutor#setReturnInfo} + // {'label':'my_label1', 'status':'visible', 'txnId':'123'} + // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'} + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name()); + sb.append("', 'txnId':'").append(txnId).append("'"); + if (table.getType() == TableType.MATERIALIZED_VIEW) { + sb.append("', 'rows':'").append(loadedRows).append("'"); + } + /*if (!Strings.isNullOrEmpty(errMsg)) { + sb.append(", 
'err':'").append(errMsg).append("'"); + }*/ + if (!Strings.isNullOrEmpty(errorUrl)) { + sb.append(", 'err_url':'").append(errorUrl).append("'"); + } + sb.append(", 'query_id':'").append(DebugUtil.printId(ctx.queryId())).append("'"); + if (reuse) { + sb.append(", 'reuse_group_commit_plan':'").append(true).append("'"); + } + sb.append("}"); + + ctx.getState().setOk(loadedRows, (int) filteredRows, sb.toString()); + // set insert result in connection context, + // so that user can use `show insert result` to get info of the last insert operation. + ctx.setOrUpdateInsertResult(txnId, labelName, db.getFullName(), table.getName(), + txnStatus, loadedRows, (int) filteredRows); + // update it, so that user can get loaded rows in fe.audit.log + ctx.updateReturnRows((int) loadedRows); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java index 8decad79917..0174befee5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java @@ -19,6 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; +import org.apache.doris.planner.GroupCommitPlanner; import java.util.Optional; @@ -28,6 +29,7 @@ public class PreparedStatementContext { StatementContext statementContext; public String stmtString; public Optional<ShortCircuitQueryContext> shortCircuitQueryContext = Optional.empty(); + public Optional<GroupCommitPlanner> groupCommitPlanner = Optional.empty(); // Timestamp in millisecond last command starts at protected volatile long startTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 00566f27a10..fc8b098ef8a 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -143,6 +143,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT = "enable_server_side_prepared_statement"; public static final String MAX_PREPARED_STMT_COUNT = "max_prepared_stmt_count"; + public static final String ENABLE_GROUP_COMMIT_FULL_PREPARE = "enable_group_commit_full_prepare"; public static final String PREFER_JOIN_METHOD = "prefer_join_method"; public static final String ENABLE_FOLD_CONSTANT_BY_BE = "enable_fold_constant_by_be"; @@ -1612,6 +1613,9 @@ public class SessionVariable implements Serializable, Writable { "服务端prepared statement最大个数", "the maximum prepared statements server holds."}) public int maxPreparedStmtCount = 100000; + @VariableMgr.VarAttr(name = ENABLE_GROUP_COMMIT_FULL_PREPARE) + public boolean enableGroupCommitFullPrepare = true; + // Default value is false, which means the group by and having clause // should first use column name not alias. According to mysql. 
@VariableMgr.VarAttr(name = GROUP_BY_AND_HAVING_USE_ALIAS_FIRST, varType = VariableAnnotation.DEPRECATED) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 41d5ba98e6b..798bdb5d0c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -724,7 +724,7 @@ public class StmtExecutor { } if (logicalPlan instanceof Command) { if (logicalPlan instanceof Forward) { - OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan); + OlapGroupCommitInsertExecutor.analyzeGroupCommit(context, logicalPlan); redirectStatus = ((Forward) logicalPlan).toRedirectStatus(); if (isForwardToMaster()) { // before forward to master, we also need to set profileType in this node @@ -2384,7 +2384,7 @@ public class StmtExecutor { LOG.info("group commit insert failed. stmt: {}, query_id: {}, db_id: {}, table_id: {}" + ", schema version: {}, backend_id: {}, status: {}, retry: {}", insertStmt.getOrigStmt().originStmt, DebugUtil.printId(context.queryId()), dbId, tableId, - nativeInsertStmt.getBaseSchemaVersion(), groupCommitPlanner.getBackend().getId(), + nativeInsertStmt.getBaseSchemaVersion(), groupCommitPlanner.getBackendId(), response.getStatus(), i); if (i < maxRetry) { List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable()); @@ -2401,15 +2401,15 @@ public class StmtExecutor { } else { errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId + ", query_id: " + DebugUtil.printId(context.queryId()) + ", backend_id: " - + groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus(); + + groupCommitPlanner.getBackendId() + ", status: " + response.getStatus(); if (response.hasErrorUrl()) { errMsg += ", error url: " + response.getErrorUrl(); } } } else if (code != TStatusCode.OK) { errMsg = "group commit insert failed. 
db_id: " + dbId + ", table_id: " + tableId + ", query_id: " - + DebugUtil.printId(context.queryId()) + ", backend_id: " + groupCommitPlanner.getBackend() - .getId() + ", status: " + response.getStatus(); + + DebugUtil.printId(context.queryId()) + ", backend_id: " + + groupCommitPlanner.getBackendId() + ", status: " + response.getStatus(); if (response.hasErrorUrl()) { errMsg += ", error url: " + response.getErrorUrl(); } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index f14b28a7509..9d5abdca1de 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -117,7 +117,6 @@ suite("insert_group_commit_with_exception") { Statement statement = connection.createStatement(); statement.execute("use ${db}"); statement.execute("set group_commit = sync_mode"); - statement.execute("set enable_server_side_prepared_statement = true") // without column try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) { ps.setObject(1, 8); diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy index e93e157aa5d..44e8d173ca7 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy @@ -89,7 +89,9 @@ suite("insert_group_commit_with_prepare_stmt") { assertTrue(serverInfo.contains("'status':'PREPARE'")) assertTrue(serverInfo.contains("'label':'group_commit_")) // TODO: currently if enable_server_side_prepared_statement = true, will not reuse plan - // assertEquals(reuse_plan, serverInfo.contains("reuse_group_commit_plan")) + if (reuse_plan) { + assertEquals(reuse_plan, 
serverInfo.contains("reuse_group_commit_plan")) + } } else { // for batch insert ConnectionImpl connection = (ConnectionImpl) stmt.getConnection() diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy index 347f99c2004..65556a91301 100644 --- a/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy @@ -134,7 +134,7 @@ suite("txn_insert_inject_case", "nonConcurrent") { } def result = sql "SELECT COUNT(*) FROM ${table}_0" - rowCount = result[0][0] + def rowCount = result[0][0] assertEquals(0, rowCount) // sleep(10000) } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org