This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d6757e03de4 [fix](group commit) Group commit http stream should not begin txn (#35494) (#35672) d6757e03de4 is described below commit d6757e03de4605254175b0b8e2fa2b6646f152cb Author: meiyi <myime...@gmail.com> AuthorDate: Thu May 30 20:57:59 2024 +0800 [fix](group commit) Group commit http stream should not begin txn (#35494) (#35672) ## Proposed changes Pick https://github.com/apache/doris/pull/35494 --- .../plans/commands/insert/OlapInsertExecutor.java | 24 +++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 3973fae4d4c..579e04b8e08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -86,6 +86,10 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { @Override public void beginTransaction() { + if (isGroupCommitHttpStream()) { + LOG.info("skip begin transaction for group commit http stream"); + return; + } try { if (ctx.isTxnModel()) { TransactionEntry txnEntry = ctx.getTxnEntry(); @@ -155,13 +159,15 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); } - TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); - if (state == null) { - throw new AnalysisException("txn does not exist: " + txnId); - } - state.addTableIndexes((OlapTable) table); - if (physicalOlapTableSink.isPartialUpdate()) { - state.setSchemaForPartialUpdate((OlapTable) table); + if (!isGroupCommitHttpStream()) { + TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); + if (state == null) { + throw new AnalysisException("txn does not exist: " + txnId); + } + state.addTableIndexes((OlapTable) table); + if (physicalOlapTableSink.isPartialUpdate()) { + state.setSchemaForPartialUpdate((OlapTable) table); + } } } @@ -279,4 +285,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { // update it, so that user can get loaded rows in fe.audit.log ctx.updateReturnRows((int) loadedRows); } + + private boolean isGroupCommitHttpStream() { + return ConnectContext.get() != null && ConnectContext.get().isGroupCommitStreamLoadSql(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org