liaoxin01 commented on code in PR #41267:
URL: https://github.com/apache/doris/pull/41267#discussion_r1774747100


##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -1096,17 +1113,24 @@ public void abortTransaction(Long dbId, Long 
transactionId, String reason,
                 
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
             }
             throw new UserException("abortTxn failed, errMsg:" + 
e.getMessage());
-        }
+        } finally {
+            TransactionState txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
+            TxnStateChangeCallback cb = null;
+            if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
+                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
+                cb = 
callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
+            } else {
+                cb = callbackFactory.getCallback(txnState.getCallbackId());

Review Comment:
   We should set `txnOperated` to false when the abort fails.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -512,22 +512,34 @@ private void commitTransaction(long dbId, List<Table> 
tableList, long transactio
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
+        boolean txnOperated = false;
+        TransactionState txnState = null;
+        TxnStateChangeCallback cb = null;
+        CommitTxnResponse commitTxnResponse = null;
         try {
-            commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
-        } catch (UserException e) {
-            // For routine load, it is necessary to release the write lock 
when commit transaction fails,
-            // otherwise it will cause the lock added in beforeCommitted to 
not be released.
+            commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList, 
txnState, commitTxnResponse);
+            txnOperated = true;
+        } finally {
             if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
-                
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+                cb = 
callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
+            } else {
+                cb = callbackFactory.getCallback(txnState.getCallbackId());
             }
-            throw e;
+
+            if (cb != null) {
+                LOG.info("commitTxn, run txn callback, transactionId:{} 
callbackId:{}, txnState:{}",
+                        txnState.getTransactionId(), txnState.getCallbackId(), 
txnState);
+                cb.afterCommitted(txnState, txnOperated);
+                cb.afterVisible(txnState, txnOperated);
+            }
+            afterCommitTxnResp(commitTxnResponse);

Review Comment:
   `afterCommitTxnResp` shouldn't be placed in the `finally` block.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -1096,17 +1113,24 @@ public void abortTransaction(Long dbId, Long 
transactionId, String reason,
                 
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
             }
             throw new UserException("abortTxn failed, errMsg:" + 
e.getMessage());
-        }
+        } finally {
+            TransactionState txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
+            TxnStateChangeCallback cb = null;
+            if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
+                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
+                cb = 
callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
+            } else {
+                cb = callbackFactory.getCallback(txnState.getCallbackId());
+            }
 
-        TransactionState txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
-        TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());

Review Comment:
   We should keep this for the case where it isn't a routine load task.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -990,9 +994,22 @@ public boolean commitAndPublishTransaction(DatabaseIf db, 
long transactionId,
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        commitTxn(commitTxnRequest, transactionId, false, db.getId(),
-                
subTransactionStates.stream().map(SubTransactionState::getTable)
-                        .collect(Collectors.toList()));
+        TransactionState txnState = null;
+        CommitTxnResponse commitTxnResponse = null;
+        try {
+            commitTxn(commitTxnRequest, transactionId, false, db.getId(),
+                    
subTransactionStates.stream().map(SubTransactionState::getTable)
+                        .collect(Collectors.toList()), txnState, 
commitTxnResponse);
+        } finally {
+            TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
+            if (cb != null) {
+                LOG.info("commitTxn, run txn callback, transactionId:{} 
callbackId:{}, txnState:{}",
+                        txnState.getTransactionId(), txnState.getCallbackId(), 
txnState);
+                cb.afterCommitted(txnState, true);

Review Comment:
   Why is `txnOperated` always true here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to