This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f46758814660f09f37e127f7b5b969ab34dc208b Author: hui lai <1353307...@qq.com> AuthorDate: Mon Aug 26 19:45:51 2024 +0800 [fix](cloud) should do before commit check in cloud mode (#39775) In cloud mode, there is no before commit check when commit transaction. For routine load, job may be paused during sub task execution. If these transactions are committed, the problem is: 1. The progress seen by the user may not be accurate. It will consume data repeatedly if they create a new job consumption using this progress. 2. The user's modification offset operation may be overwritten by the offset of the transaction callback. --- .../apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java | 8 ++++++++ .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 3 --- .../main/java/org/apache/doris/transaction/TransactionState.java | 4 ++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 000c05fde27..3940976f3d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -488,6 +488,14 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { .loadJobFinalOperationToPb(loadJobFinalOperation)); } else if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + TxnStateChangeCallback cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); + if (cb != null) { + // use a temporary transaction state to do before commit check, + // what actually works is the transactionId + TransactionState tmpTxnState = new TransactionState(); + tmpTxnState.setTransactionId(transactionId); + cb.beforeCommitted(tmpTxnState); + } builder.setCommitAttachment(TxnUtil .rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment)); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 5ca291aa8d7..de1bffe1d56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1084,9 +1084,6 @@ public abstract class RoutineLoadJob @Override public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException { long taskBeId = -1L; - if (Config.isCloudMode()) { - writeLock(); - } try { if (txnOperated) { // find task in job diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 394e7ca4bf7..06738315be2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -429,6 +429,10 @@ public class TransactionState implements Writable { return requestId; } + public void setTransactionId(long transactionId) { + this.transactionId = transactionId; + } + public long getTransactionId() { return transactionId; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org