This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f409a45d0ba [fix](mtmv)resolve the issue of table version updates in 
concurrent situations (#32487)
f409a45d0ba is described below

commit f409a45d0baead1ae7c8195e2b5c60013a151315
Author: zhangdong <493738...@qq.com>
AuthorDate: Thu Mar 21 14:53:10 2024 +0800

    [fix](mtmv)resolve the issue of table version updates in concurrent 
situations (#32487)
    
    Move the logic for version+1 from `unprotectedCommitTransaction` to
    `FinishTransaction`, because the write lock for the table is obtained in
    `FinishTransaction`.
---
 .../doris/transaction/DatabaseTransactionMgr.java   | 21 +++++++++++++++++----
 .../apache/doris/transaction/TableCommitInfo.java   |  4 +---
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index af94917f97d..0ae471bffa8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1040,6 +1040,7 @@ public class DatabaseTransactionMgr {
                 transactionState.setFinishTime(System.currentTimeMillis());
                 transactionState.clearErrorMsg();
                 
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
+                setTableVersion(transactionState, db);
                 unprotectUpsertTransactionState(transactionState, false);
                 txnOperated = true;
                 // TODO(cmy): We found a very strange problem. When 
delete-related transactions are processed here,
@@ -1070,6 +1071,20 @@ public class DatabaseTransactionMgr {
                 transactionState, transactionState.getPublishCount(), 
publishResult.name());
     }
 
+    private void setTableVersion(TransactionState transactionState, Database 
db) {
+        Map<Long, TableCommitInfo> idToTableCommitInfos = 
transactionState.getIdToTableCommitInfos();
+        for (Entry<Long, TableCommitInfo> entry : 
idToTableCommitInfos.entrySet()) {
+            OlapTable table = (OlapTable) db.getTableNullable(entry.getKey());
+            if (table == null) {
+                LOG.warn("table {} does not exist when setTableVersion. 
transaction: {}, db: {}",
+                        entry.getKey(), transactionState.getTransactionId(), 
db.getId());
+                continue;
+            }
+            entry.getValue().setVersion(table.getNextVersion());
+            entry.getValue().setVersionTime(System.currentTimeMillis());
+        }
+    }
+
     private boolean finishCheckPartitionVersion(TransactionState 
transactionState, Database db,
             List<Pair<OlapTable, Partition>> relatedTblPartitions) {
         Iterator<TableCommitInfo> tableCommitInfoIterator
@@ -1323,8 +1338,7 @@ public class DatabaseTransactionMgr {
         transactionState.setErrorReplicas(errorReplicaIds);
         for (long tableId : tableToPartition.keySet()) {
             OlapTable table = (OlapTable) db.getTableNullable(tableId);
-            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId, 
table.getNextVersion(),
-                    System.currentTimeMillis());
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
             PartitionInfo tblPartitionInfo = table.getPartitionInfo();
             for (long partitionId : tableToPartition.get(tableId)) {
                 String partitionRange = "";
@@ -1364,8 +1378,7 @@ public class DatabaseTransactionMgr {
         transactionState.setErrorReplicas(errorReplicaIds);
         for (long tableId : tableToPartition.keySet()) {
             OlapTable table = (OlapTable) db.getTableNullable(tableId);
-            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId, 
table.getNextVersion(),
-                    System.currentTimeMillis());
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
             PartitionInfo tblPartitionInfo = table.getPartitionInfo();
             for (long partitionId : tableToPartition.get(tableId)) {
                 Partition partition = table.getPartition(partitionId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
index b8d968f3ca0..dbe8bdd68d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
@@ -46,11 +46,9 @@ public class TableCommitInfo implements Writable {
 
     }
 
-    public TableCommitInfo(long tableId, long version, long visibleTime) {
+    public TableCommitInfo(long tableId) {
         this.tableId = tableId;
         idToPartitionCommitInfo = Maps.newHashMap();
-        this.version = version;
-        this.versionTime = visibleTime;
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to