liaoxin01 commented on code in PR #41267: URL: https://github.com/apache/doris/pull/41267#discussion_r1776314891
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -512,22 +512,33 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
+        boolean txnOperated = false;
+        TransactionState txnState = null;
+        TxnStateChangeCallback cb = null;
+        CommitTxnResponse commitTxnResponse = null;
         try {
-            commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
-        } catch (UserException e) {
-            // For routine load, it is necessary to release the write lock when commit transaction fails,
-            // otherwise it will cause the lock added in beforeCommitted to not be released.
+            commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList, txnState, commitTxnResponse);
+            txnOperated = true;
+        } finally {
             if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
-                Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+                cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
+            } else {
+                cb = callbackFactory.getCallback(txnState.getCallbackId());
+            }
+
+            if (cb != null) {
+                LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
+                        txnState.getTransactionId(), txnState.getCallbackId(), txnState);
+                cb.afterCommitted(txnState, txnOperated);
+                cb.afterVisible(txnState, txnOperated);
             }
-            throw e;
         }
     }
     private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
-            List<Table> tableList) throws UserException {
-        CommitTxnResponse commitTxnResponse = null;
+            List<Table> tableList, TransactionState txnState,
+            CommitTxnResponse commitTxnResponse) throws UserException {

Review Comment:
   ```suggestion
       private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
               List<Table> tableList, TransactionState txnState) throws UserException
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org