This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c3fed216dc [fix](transaction) Fix publish txn wait too long when not meet quorum (#26659)
5c3fed216dc is described below

commit 5c3fed216dcf137d83279f68ba7bccf65e83464d
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Fri Nov 10 14:55:26 2023 +0800

    [fix](transaction) Fix publish txn wait too long when not meet quorum (#26659)
---
 .../doris/transaction/DatabaseTransactionMgr.java  | 16 ++------
 .../doris/transaction/PublishVersionDaemon.java    |  4 +-
 .../apache/doris/transaction/TransactionState.java | 36 ++++++++---------
 .../data/load/insert/test_publish_one_succ.out     | 10 +++++
 .../load/insert/test_publish_one_succ.groovy       | 45 ++++++++++++++++++++++
 5 files changed, 79 insertions(+), 32 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9b123030465..42cac0c9690 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -294,7 +294,7 @@ public class DatabaseTransactionMgr {
         info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
         info.add(TimeUtils.longToTimeString(txnState.getPreCommitTime()));
         info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
-        info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime()));
+        
info.add(TimeUtils.longToTimeString(txnState.getLastPublishVersionTime()));
         info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
         info.add(txnState.getReason());
         info.add(String.valueOf(txnState.getErrorReplicas().size()));
@@ -944,10 +944,10 @@ public class DatabaseTransactionMgr {
         Map<Long, PublishVersionTask> publishTasks = 
transactionState.getPublishVersionTasks();
 
         long now = System.currentTimeMillis();
-        long firstPublishOneSuccTime = 
transactionState.getFirstPublishOneSuccTime();
+        long firstPublishVersionTime = 
transactionState.getFirstPublishVersionTime();
         boolean allowPublishOneSucc = false;
-        if (Config.publish_wait_time_second > 0 && firstPublishOneSuccTime > 0
-                && now >= firstPublishOneSuccTime + 
Config.publish_wait_time_second * 1000L) {
+        if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+                && now >= firstPublishVersionTime + 
Config.publish_wait_time_second * 1000L) {
             allowPublishOneSucc = true;
         }
 
@@ -970,7 +970,6 @@ public class DatabaseTransactionMgr {
         tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
         PublishResult publishResult = PublishResult.QUORUM_SUCC;
         try {
-            boolean allTabletsLeastOneSucc = true;
             Iterator<TableCommitInfo> tableCommitInfoIterator
                     = 
transactionState.getIdToTableCommitInfos().values().iterator();
             while (tableCommitInfoIterator.hasNext()) {
@@ -1058,10 +1057,6 @@ public class DatabaseTransactionMgr {
                                 continue;
                             }
 
-                            if (healthReplicaNum == 0) {
-                                allTabletsLeastOneSucc = false;
-                            }
-
                             String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
                                     tabletVersionFailedReplicas);
                             if (allowPublishOneSucc && healthReplicaNum > 0) {
@@ -1100,9 +1095,6 @@ public class DatabaseTransactionMgr {
                     }
                 }
             }
-            if (allTabletsLeastOneSucc && firstPublishOneSuccTime <= 0) {
-                transactionState.setFirstPublishOneSuccTime(now);
-            }
             if (publishResult == PublishResult.FAILED) {
                 return;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 922e8645d9f..06fa71303c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -121,7 +121,7 @@ public class PublishVersionDaemon extends MasterDaemon {
                 batchTask.addTask(task);
                 transactionState.addPublishVersionTask(backendId, task);
             }
-            transactionState.setHasSendTask(true);
+            transactionState.setSendedTask();
             LOG.info("send publish tasks for transaction: {}, db: {}", 
transactionState.getTransactionId(),
                     transactionState.getDbId());
         }
@@ -174,7 +174,7 @@ public class PublishVersionDaemon extends MasterDaemon {
                     AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
                 }
                 if (MetricRepo.isInit) {
-                    long publishTime = 
transactionState.getPublishVersionTime() - transactionState.getCommitTime();
+                    long publishTime = 
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
                     MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 2b01a612534..17e9f53d609 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -220,13 +220,14 @@ public class TransactionState implements Writable {
     // this state need not be serialized
     private Map<Long, PublishVersionTask> publishVersionTasks;
     private boolean hasSendTask;
-    private long publishVersionTime = -1;
     private TransactionStatus preStatus = null;
 
     // When publish txn, if every tablet has at least 1 replica published 
succ, but not quorum replicas succ,
-    // and time since firstPublishOneSuccTime has exceeds 
Config.publish_wait_time_second,
+    // and time since firstPublishVersionTime has exceeds 
Config.publish_wait_time_second,
     // then this transaction will become visible.
-    private long firstPublishOneSuccTime = -1;
+    private long firstPublishVersionTime = -1;
+
+    private long lastPublishVersionTime = -1;
 
     @SerializedName(value = "callbackId")
     private long callbackId = -1;
@@ -339,17 +340,24 @@ public class TransactionState implements Writable {
         this.publishVersionTasks.put(backendId, task);
     }
 
-    public void setHasSendTask(boolean hasSendTask) {
-        this.hasSendTask = hasSendTask;
-        this.publishVersionTime = System.currentTimeMillis();
+    public void setSendedTask() {
+        this.hasSendTask = true;
+        updateSendTaskTime();
     }
 
     public void updateSendTaskTime() {
-        this.publishVersionTime = System.currentTimeMillis();
+        this.lastPublishVersionTime = System.currentTimeMillis();
+        if (this.firstPublishVersionTime <= 0) {
+            this.firstPublishVersionTime = lastPublishVersionTime;
+        }
+    }
+
+    public long getFirstPublishVersionTime() {
+        return firstPublishVersionTime;
     }
 
-    public long getPublishVersionTime() {
-        return this.publishVersionTime;
+    public long getLastPublishVersionTime() {
+        return this.lastPublishVersionTime;
     }
 
     public boolean hasSendTask() {
@@ -420,14 +428,6 @@ public class TransactionState implements Writable {
         return errorLogUrl;
     }
 
-    public long getFirstPublishOneSuccTime() {
-        return firstPublishOneSuccTime;
-    }
-
-    public void setFirstPublishOneSuccTime(long firstPublishOneSuccTime) {
-        this.firstPublishOneSuccTime = firstPublishOneSuccTime;
-    }
-
     public void setTransactionStatus(TransactionStatus transactionStatus) {
         // status changed
         this.preStatus = this.transactionStatus;
@@ -646,7 +646,7 @@ public class TransactionState implements Writable {
         if (prolongPublishTimeout) {
             timeoutMillis *= 2;
         }
-        return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
+        return System.currentTimeMillis() - lastPublishVersionTime > 
timeoutMillis;
     }
 
     public void prolongPublishTimeout() {
diff --git a/regression-test/data/load/insert/test_publish_one_succ.out b/regression-test/data/load/insert/test_publish_one_succ.out
new file mode 100644
index 00000000000..c82f0c7f1da
--- /dev/null
+++ b/regression-test/data/load/insert/test_publish_one_succ.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_1 --
+
+-- !select_2 --
+1      10
+2      20
+3      30
+4      40
+5      50
+
diff --git a/regression-test/suites/load/insert/test_publish_one_succ.groovy b/regression-test/suites/load/insert/test_publish_one_succ.groovy
new file mode 100644
index 00000000000..4e331b5c8d2
--- /dev/null
+++ b/regression-test/suites/load/insert/test_publish_one_succ.groovy
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_publish_one_succ') {
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    docker(options) {
+        cluster.injectDebugPoints(NodeType.FE, 
['PublishVersionDaemon.stop_publish':null])
+
+        sql 'SET GLOBAL insert_visible_timeout_ms = 1000'
+        sql "ADMIN SET FRONTEND CONFIG ('publish_wait_time_second' = 
'1000000')"
+        sql 'CREATE TABLE tbl (k1 int, k2 int)'
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO tbl VALUES (${i}, ${10 * i})"
+        }
+
+        cluster.stopBackends(2, 3)
+        cluster.checkBeIsAlive(2, false)
+        cluster.checkBeIsAlive(3, false)
+        cluster.clearFrontendDebugPoints()
+
+        sleep(1000)
+        order_qt_select_1 'SELECT * FROM tbl'
+        sql "ADMIN SET FRONTEND CONFIG ('publish_wait_time_second' = '2')"
+        sleep(2000)
+        order_qt_select_2 'SELECT * FROM tbl'
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to