liaoxin01 commented on code in PR #41267: URL: https://github.com/apache/doris/pull/41267#discussion_r1774747100
########## fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java: ########## @@ -1096,17 +1113,24 @@ public void abortTransaction(Long dbId, Long transactionId, String reason, Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); } throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); - } + } finally { + TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); + TxnStateChangeCallback cb = null; + if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); + } else { + cb = callbackFactory.getCallback(txnState.getCallbackId()); Review Comment: We should set `txnOperated` to false when the abort fails. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java: ########## @@ -512,22 +512,34 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio } final CommitTxnRequest commitTxnRequest = builder.build(); + boolean txnOperated = false; + TransactionState txnState = null; + TxnStateChangeCallback cb = null; + CommitTxnResponse commitTxnResponse = null; try { - commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); - } catch (UserException e) { - // For routine load, it is necessary to release the write lock when commit transaction fails, - // otherwise it will cause the lock added in beforeCommitted to not be released. 
+ commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList, txnState, commitTxnResponse); + txnOperated = true; + } finally { if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; - Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); + cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); + } else { + cb = callbackFactory.getCallback(txnState.getCallbackId()); } - throw e; + + if (cb != null) { + LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", + txnState.getTransactionId(), txnState.getCallbackId(), txnState); + cb.afterCommitted(txnState, txnOperated); + cb.afterVisible(txnState, txnOperated); + } + afterCommitTxnResp(commitTxnResponse); Review Comment: `afterCommitTxnResp` shouldn't be placed in finally. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java: ########## @@ -1096,17 +1113,24 @@ public void abortTransaction(Long dbId, Long transactionId, String reason, Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); } throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); - } + } finally { + TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); + TxnStateChangeCallback cb = null; + if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); + } else { + cb = callbackFactory.getCallback(txnState.getCallbackId()); + } - TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId()); Review Comment: We should keep this when it isn't a routine load task. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java: ########## @@ -990,9 +994,22 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, } final CommitTxnRequest commitTxnRequest = builder.build(); - commitTxn(commitTxnRequest, transactionId, false, db.getId(), - subTransactionStates.stream().map(SubTransactionState::getTable) - .collect(Collectors.toList())); + TransactionState txnState = null; + CommitTxnResponse commitTxnResponse = null; + try { + commitTxn(commitTxnRequest, transactionId, false, db.getId(), + subTransactionStates.stream().map(SubTransactionState::getTable) + .collect(Collectors.toList()), txnState, commitTxnResponse); + } finally { + TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); + if (cb != null) { + LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", + txnState.getTransactionId(), txnState.getCallbackId(), txnState); + cb.afterCommitted(txnState, true); Review Comment: Why is `txnOperated` always true here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org