This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f46758814660f09f37e127f7b5b969ab34dc208b
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Aug 26 19:45:51 2024 +0800

    [fix](cloud) should do before commit check in cloud mode (#39775)
    
    In cloud mode, there is no before-commit check when committing a
    transaction. For routine load, a job may be paused while its sub tasks
    are still executing. If the transactions of those tasks are committed
    anyway, the problems are:
    1. The progress seen by the user may not be accurate. Data will be
    consumed repeatedly if they create a new job that starts consuming from
    this progress.
    2. An offset modification made by the user may be overwritten by the
    offset from the transaction callback.
---
 .../apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java | 8 ++++++++
 .../java/org/apache/doris/load/routineload/RoutineLoadJob.java    | 3 ---
 .../main/java/org/apache/doris/transaction/TransactionState.java  | 4 ++++
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 000c05fde27..3940976f3d9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -488,6 +488,14 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                         .loadJobFinalOperationToPb(loadJobFinalOperation));
             } else if (txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
+                TxnStateChangeCallback cb = 
callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
+                if (cb != null) {
+                    // use a temporary transaction state to do before commit 
check,
+                    // what actually works is the transactionId
+                    TransactionState tmpTxnState = new TransactionState();
+                    tmpTxnState.setTransactionId(transactionId);
+                    cb.beforeCommitted(tmpTxnState);
+                }
                 builder.setCommitAttachment(TxnUtil
                         
.rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment));
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5ca291aa8d7..de1bffe1d56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1084,9 +1084,6 @@ public abstract class RoutineLoadJob
     @Override
     public void afterCommitted(TransactionState txnState, boolean txnOperated) 
throws UserException {
         long taskBeId = -1L;
-        if (Config.isCloudMode()) {
-            writeLock();
-        }
         try {
             if (txnOperated) {
                 // find task in job
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 394e7ca4bf7..06738315be2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -429,6 +429,10 @@ public class TransactionState implements Writable {
         return requestId;
     }
 
+    public void setTransactionId(long transactionId) {
+        this.transactionId = transactionId;
+    }
+
     public long getTransactionId() {
         return transactionId;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to