This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f4c4b27c3cb [fix](cloud) abortTransaction does not handle response code (#41275)
f4c4b27c3cb is described below

commit f4c4b27c3cb0f6f94fde74d8be15e7ef9df1dc09
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Sep 26 14:06:00 2024 +0800

    [fix](cloud) abortTransaction does not handle response code (#41275)
---
 .../transaction/CloudGlobalTransactionMgr.java | 42 ++++++++++++++--------
 1 file changed, 28 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 5508bca5773..10baf4a135c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1097,6 +1097,33 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             }
             throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
         }
+        afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), txnCommitAttachment);
+    }
+
+    private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOrLabel,
+            TxnCommitAttachment txnCommitAttachment) throws UserException {
+        if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) {
+            LOG.warn("abortTxn failed, transaction:{}, response:{}", txnIdOrLabel, abortTxnResponse);
+            // For routine load, it is necessary to release the write lock when abort transaction fails,
+            // otherwise it will cause the lock added in beforeAborted to not be released.
+            if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
+                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
+                Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+            }
+            switch (abortTxnResponse.getStatus().getCode()) {
+                case TXN_ID_NOT_FOUND:
+                case TXN_LABEL_NOT_FOUND:
+                case TXN_INVALID_STATUS:
+                    throw new TransactionNotFoundException("transaction [" + txnIdOrLabel + "] not found");
+                case TXN_ALREADY_ABORTED:
+                    throw new TransactionNotFoundException("transaction [" + txnIdOrLabel + "] is already aborted");
+                case TXN_ALREADY_VISIBLE:
+                    throw new UserException(
+                            "transaction [" + txnIdOrLabel + "] is already visible, " + ", could not abort");
+                default:
+                    throw new UserException(abortTxnResponse.getStatus().getMsg());
+            }
+        }
 
         TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
         TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
@@ -1149,20 +1176,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             LOG.warn("abortTxn failed, label:{}, exception:", label, e);
             throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
         }
-
-        TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
-        TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
-        if (cb == null) {
-            LOG.info("no callback to run for this txn, txnId:{} callbackId:{}", txnState.getTransactionId(),
-                    txnState.getCallbackId());
-            return;
-        }
-
-        LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), txnState.getCallbackId());
-        cb.afterAborted(txnState, true, txnState.getReason());
-        if (MetricRepo.isInit) {
-            MetricRepo.COUNTER_TXN_FAILED.increase(1L);
-        }
+        afterAbortTxnResp(abortTxnResponse, label, null);
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org