This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d20365cdcf [fix](transaction) fix publish txn fake succ (#24273)
d20365cdcf is described below

commit d20365cdcf31786531dc87c059ffe280113778df
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Sep 14 21:04:59 2023 +0800

    [fix](transaction) fix publish txn fake succ (#24273)
---
 be/src/agent/task_worker_pool.cpp                  |  24 ++--
 be/src/common/status.h                             |   1 +
 be/src/olap/data_dir.cpp                           |   2 +-
 be/src/olap/task/engine_publish_version_task.cpp   |  52 +++++----
 be/src/olap/task/engine_publish_version_task.h     |  11 +-
 .../java/org/apache/doris/master/MasterImpl.java   |   6 +
 .../org/apache/doris/task/PublishVersionTask.java  |  13 +++
 .../doris/transaction/DatabaseTransactionMgr.java  |  79 ++++++++-----
 .../doris/transaction/GlobalTransactionMgr.java    |   6 +-
 .../doris/transaction/PublishVersionDaemon.java    | 126 ++-------------------
 .../transaction/DatabaseTransactionMgrTest.java    |  21 +++-
 .../transaction/GlobalTransactionMgrTest.java      |  36 +++---
 gensrc/thrift/MasterService.thrift                 |   1 +
 13 files changed, 174 insertions(+), 204 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 637c92af5a..a62d603fe0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1525,17 +1525,18 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
         DorisMetrics::instance()->publish_task_request_total->increment(1);
         VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature;
 
-        std::vector<TTabletId> error_tablet_ids;
-        std::vector<TTabletId> succ_tablet_ids;
+        std::set<TTabletId> error_tablet_ids;
+        std::map<TTabletId, TVersion> succ_tablets;
         // partition_id, tablet_id, publish_version
         std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
         uint32_t retry_time = 0;
         Status status;
         bool is_task_timeout = false;
         while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+            succ_tablets.clear();
             error_tablet_ids.clear();
             EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
-                                                 &succ_tablet_ids, &discontinuous_version_tablets);
+                                                 &succ_tablets, &discontinuous_version_tablets);
             status = StorageEngine::instance()->execute_task(&engine_task);
             if (status.ok()) {
                 break;
@@ -1584,25 +1585,22 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
                     .tag("transaction_id", publish_version_req.transaction_id)
                     .tag("error_tablets_num", error_tablet_ids.size())
                     .error(status);
-            finish_task_request.__set_error_tablet_ids(error_tablet_ids);
         } else {
             if (!config::disable_auto_compaction &&
                 !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
-                for (int i = 0; i < succ_tablet_ids.size(); i++) {
+                for (auto [tablet_id, _] : succ_tablets) {
                     TabletSharedPtr tablet =
-                            
StorageEngine::instance()->tablet_manager()->get_tablet(
-                                    succ_tablet_ids[i]);
+                            
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
                     if (tablet != nullptr) {
                         tablet->publised_count++;
                         if (tablet->publised_count % 10 == 0) {
                             StorageEngine::instance()->submit_compaction_task(
                                     tablet, 
CompactionType::CUMULATIVE_COMPACTION, true);
-                            LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i]
+                            LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
                                       << ", publised:" << tablet->publised_count;
                         }
                     } else {
-                        LOG(WARNING)
-                                << "trigger compaction failed, tabletid:" << succ_tablet_ids[i];
+                        LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
                     }
                 }
             }
@@ -1611,7 +1609,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
             LOG_INFO("successfully publish version")
                     .tag("signature", agent_task_req.signature)
                     .tag("transaction_id", publish_version_req.transaction_id)
-                    .tag("tablets_num", succ_tablet_ids.size())
+                    .tag("tablets_num", succ_tablets.size())
                     .tag("cost(s)", cost_second);
         }
 
@@ -1620,7 +1618,9 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
         finish_task_request.__set_task_type(agent_task_req.task_type);
         finish_task_request.__set_signature(agent_task_req.signature);
         finish_task_request.__set_report_version(_s_report_version);
-        finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+        finish_task_request.__set_succ_tablets(succ_tablets);
+        finish_task_request.__set_error_tablet_ids(
+                std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
 
         _finish_task(finish_task_request);
         _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
diff --git a/be/src/common/status.h b/be/src/common/status.h
index e49bffa70f..14c81c6e2c 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -306,6 +306,7 @@ constexpr bool capture_stacktrace(int code) {
         && code != ErrorCode::INVERTED_INDEX_BUILD_WAITTING
         && code != ErrorCode::META_KEY_NOT_FOUND
         && code != ErrorCode::PUSH_VERSION_ALREADY_EXIST
+        && code != ErrorCode::VERSION_NOT_EXIST
         && code != ErrorCode::TABLE_ALREADY_DELETED_ERROR
         && code != ErrorCode::TRANSACTION_NOT_EXIST
         && code != ErrorCode::TRANSACTION_ALREADY_VISIBLE
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 104e549f6a..301c463cbb 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -490,7 +490,7 @@ Status DataDir::load() {
         PendingPublishInfoPB pending_publish_info_pb;
         bool parsed = pending_publish_info_pb.ParseFromString(info);
         if (!parsed) {
-            LOG(WARNING) << "parse pending publish info failed, tablt_id: " << tablet_id
+            LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
                          << " publish_version: " << publish_version;
         }
         StorageEngine::instance()->add_async_publish_task(
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 1353e66915..a56430fbb1 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -69,22 +69,17 @@ void TabletPublishStatistics::record_in_bvar() {
 }
 
 EnginePublishVersionTask::EnginePublishVersionTask(
-        const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
-        std::vector<TTabletId>* succ_tablet_ids,
+        const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids,
+        std::map<TTabletId, TVersion>* succ_tablets,
         std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
         : _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
-          _succ_tablet_ids(succ_tablet_ids),
+          _succ_tablets(succ_tablets),
           _discontinuous_version_tablets(discontinuous_version_tablets) {}
 
 void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
     std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
-    _error_tablet_ids->push_back(tablet_id);
-}
-
-void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
-    std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
-    _succ_tablet_ids->push_back(tablet_id);
+    _error_tablet_ids->insert(tablet_id);
 }
 
 Status EnginePublishVersionTask::finish() {
@@ -126,7 +121,7 @@ Status EnginePublishVersionTask::finish() {
             // and receive fe's publish version task
             // this be must return as an error tablet
             if (rowset == nullptr) {
-                _error_tablet_ids->push_back(tablet_info.tablet_id);
+                add_error_tablet_id(tablet_info.tablet_id);
                 res = Status::Error<PUSH_ROWSET_NOT_FOUND>(
                         "could not find related rowset for tablet {}, txn id {}",
                         tablet_info.tablet_id, transaction_id);
@@ -135,7 +130,7 @@ Status EnginePublishVersionTask::finish() {
             TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                     tablet_info.tablet_id, tablet_info.tablet_uid);
             if (tablet == nullptr) {
-                _error_tablet_ids->push_back(tablet_info.tablet_id);
+                add_error_tablet_id(tablet_info.tablet_id);
                 res = Status::Error<PUSH_TABLE_NOT_EXIST>(
                         "can't get tablet when publish version. tablet_id={}",
                         tablet_info.tablet_id);
@@ -199,6 +194,7 @@ Status EnginePublishVersionTask::finish() {
     }
     token->wait();
 
+    _succ_tablets->clear();
     // check if the related tablet remained all have the version
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
         int64_t partition_id = par_ver_info.partition_id;
@@ -209,18 +205,36 @@ Status EnginePublishVersionTask::finish() {
 
         Version version(par_ver_info.version, par_ver_info.version);
         for (auto& tablet_info : partition_related_tablet_infos) {
-            // has to use strict mode to check if check all tablets
-            if (!_publish_version_req.strict_mode) {
-                break;
-            }
             TabletSharedPtr tablet =
                     
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id);
+            auto tablet_id = tablet_info.tablet_id;
             if (tablet == nullptr) {
-                add_error_tablet_id(tablet_info.tablet_id);
+                add_error_tablet_id(tablet_id);
+                _succ_tablets->erase(tablet_id);
+                LOG(WARNING) << "publish version failed on transaction, not found tablet. "
+                             << "transaction_id=" << transaction_id << ", tablet_id=" << tablet_id
+                             << ", version=" << par_ver_info.version;
             } else {
                 // check if the version exist, if not exist, then set publish 
failed
-                if (!tablet->check_version_exist(version)) {
-                    add_error_tablet_id(tablet_info.tablet_id);
+                if (_error_tablet_ids->find(tablet_id) == 
_error_tablet_ids->end()) {
+                    if (tablet->check_version_exist(version)) {
+                        // it's better to report the max continous succ version,
+                        // but it maybe time cost now.
+                        // current just report 0
+                        (*_succ_tablets)[tablet_id] = 0;
+                    } else {
+                        add_error_tablet_id(tablet_id);
+                        if (res.ok()) {
+                            res = Status::Error<VERSION_NOT_EXIST>(
+                                    "tablet {} not exists version {}", 
tablet_id,
+                                    par_ver_info.version);
+                        }
+                        LOG(WARNING) << "publish version failed on transaction, tablet version not "
+                                        "exists. "
+                                     << "transaction_id=" << transaction_id
+                                     << ", tablet_id=" << tablet_id
+                                     << ", version=" << par_ver_info.version;
+                    }
                 }
             }
         }
@@ -280,9 +294,7 @@ void TabletPublishTxnTask::handle() {
         
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
         return;
     }
-    _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
     int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
-    // print stats if publish cost > 500ms
     g_tablet_publish_latency << cost_us;
     _stats.record_in_bvar();
     LOG(INFO) << "publish version successfully on tablet"
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 8acf8099ca..0a270c93d2 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -23,7 +23,9 @@
 
 #include <atomic>
 #include <condition_variable>
+#include <map>
 #include <mutex>
+#include <set>
 #include <vector>
 
 #include "common/status.h"
@@ -83,23 +85,22 @@ private:
 class EnginePublishVersionTask : public EngineTask {
 public:
     EnginePublishVersionTask(
-            const TPublishVersionRequest& publish_version_req, 
vector<TTabletId>* error_tablet_ids,
-            std::vector<TTabletId>* succ_tablet_ids,
+            const TPublishVersionRequest& publish_version_req,
+            std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, 
TVersion>* succ_tablets,
             std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinous_version_tablets);
     ~EnginePublishVersionTask() {}
 
     virtual Status finish() override;
 
     void add_error_tablet_id(int64_t tablet_id);
-    void add_succ_tablet_id(int64_t tablet_id);
 
     int64_t finish_task();
 
 private:
     const TPublishVersionRequest& _publish_version_req;
     std::mutex _tablet_ids_mutex;
-    vector<TTabletId>* _error_tablet_ids;
-    vector<TTabletId>* _succ_tablet_ids;
+    std::set<TTabletId>* _error_tablet_ids;
+    std::map<TTabletId, TVersion>* _succ_tablets;
     std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
_discontinuous_version_tablets;
 };
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 4e031c0dac..23118647cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -67,6 +67,7 @@ import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class MasterImpl {
@@ -465,6 +466,10 @@ public class MasterImpl {
     }
 
     private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
+        Map<Long, Long> succTablets = null;
+        if (request.isSetSuccTablets()) {
+            succTablets = request.getSuccTablets();
+        }
         List<Long> errorTabletIds = null;
         if (request.isSetErrorTabletIds()) {
             errorTabletIds = request.getErrorTabletIds();
@@ -478,6 +483,7 @@ public class MasterImpl {
         }
 
         PublishVersionTask publishVersionTask = (PublishVersionTask) task;
+        publishVersionTask.setSuccTablets(succTablets);
         publishVersionTask.addErrorTablets(errorTabletIds);
         publishVersionTask.setFinished(true);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 1ce9d866b8..8461b1db4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class PublishVersionTask extends AgentTask {
     private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class);
@@ -34,11 +35,15 @@ public class PublishVersionTask extends AgentTask {
     private List<TPartitionVersionInfo> partitionVersionInfos;
     private List<Long> errorTablets;
 
+    // tabletId => version, current version = 0
+    private Map<Long, Long> succTablets;
+
     public PublishVersionTask(long backendId, long transactionId, long dbId,
             List<TPartitionVersionInfo> partitionVersionInfos, long 
createTime) {
         super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, 
-1L, transactionId, createTime);
         this.transactionId = transactionId;
         this.partitionVersionInfos = partitionVersionInfos;
+        this.succTablets = null;
         this.errorTablets = new ArrayList<Long>();
         this.isFinished = false;
     }
@@ -57,6 +62,14 @@ public class PublishVersionTask extends AgentTask {
         return partitionVersionInfos;
     }
 
+    public Map<Long, Long> getSuccTablets() {
+        return succTablets;
+    }
+
+    public void setSuccTablets(Map<Long, Long> succTablets) {
+        this.succTablets = succTablets;
+    }
+
     public synchronized List<Long> getErrorTablets() {
         return errorTablets;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 8397499ded..7974cb6a89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -54,6 +54,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -868,7 +869,7 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    public void finishTransaction(long transactionId, Set<Long> 
errorReplicaIds) throws UserException {
+    public void finishTransaction(long transactionId) throws UserException {
         TransactionState transactionState = null;
         readLock();
         try {
@@ -876,14 +877,10 @@ public class DatabaseTransactionMgr {
         } finally {
             readUnlock();
         }
+
         // add all commit errors and publish errors to a single set
-        if (errorReplicaIds == null) {
-            errorReplicaIds = Sets.newHashSet();
-        }
-        Set<Long> originalErrorReplicas = transactionState.getErrorReplicas();
-        if (originalErrorReplicas != null) {
-            errorReplicaIds.addAll(originalErrorReplicas);
-        }
+        Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
+        Map<Long, PublishVersionTask> publishTasks = 
transactionState.getPublishVersionTasks();
 
         long now = System.currentTimeMillis();
         long firstPublishOneSuccTime = 
transactionState.getFirstPublishOneSuccTime();
@@ -980,21 +977,10 @@ public class DatabaseTransactionMgr {
                             tabletWriteFailedReplicas.clear();
                             tabletVersionFailedReplicas.clear();
                             for (Replica replica : tablet.getReplicas()) {
-                                if 
(!errorReplicaIds.contains(replica.getId())) {
-                                    if 
(replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
-                                        tabletSuccReplicas.add(replica);
-                                    } else {
-                                        
tabletVersionFailedReplicas.add(replica);
-                                    }
-                                } else if (replica.getVersion() >= 
partitionCommitInfo.getVersion()) {
-                                    // the replica's version is larger than or 
equal to current transaction
-                                    // partition's version the replica is 
normal, then remove it from error replica ids
-                                    // TODO(cmy): actually I have no idea why 
we need this check
-                                    tabletSuccReplicas.add(replica);
-                                    errorReplicaIds.remove(replica.getId());
-                                } else {
-                                    tabletWriteFailedReplicas.add(replica);
-                                }
+                                
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
+                                        partitionCommitInfo.getVersion(), 
publishTasks.get(replica.getBackendId()),
+                                        errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
+                                        tabletVersionFailedReplicas);
                             }
 
                             int healthReplicaNum = tabletSuccReplicas.size();
@@ -1005,7 +991,7 @@ public class DatabaseTransactionMgr {
                                     LOG.info("publish version quorum succ for 
transaction {} on tablet {} with version"
                                             + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
                                             + " tablet detail: {}",
-                                            transactionState, tablet, 
partitionCommitInfo.getVersion(),
+                                            transactionState, tablet.getId(), 
partitionCommitInfo.getVersion(),
                                             quorumReplicaNum, tableId, 
partitionId, writeDetail);
                                 }
                                 continue;
@@ -1033,8 +1019,8 @@ public class DatabaseTransactionMgr {
                                 LOG.info("publish version timeout succ for 
transaction {} on tablet {} with version"
                                         + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
                                         + " tablet detail: {}",
-                                        transactionState, tablet, 
partitionCommitInfo.getVersion(), quorumReplicaNum,
-                                        tableId, partitionId, writeDetail);
+                                        transactionState, tablet.getId(), 
partitionCommitInfo.getVersion(),
+                                        quorumReplicaNum, tableId, 
partitionId, writeDetail);
                             } else {
                                 publishResult = PublishResult.FAILED;
                                 String errMsg = String.format("publish on 
tablet %d failed."
@@ -1046,8 +1032,8 @@ public class DatabaseTransactionMgr {
                                 LOG.info("publish version failed for 
transaction {} on tablet {} with version"
                                         + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
                                         + " tablet detail: {}",
-                                        transactionState, tablet, 
partitionCommitInfo.getVersion(), quorumReplicaNum,
-                                        tableId, partitionId, writeDetail);
+                                        transactionState, tablet.getId(), 
partitionCommitInfo.getVersion(),
+                                        quorumReplicaNum, tableId, 
partitionId, writeDetail);
                             }
                         }
                     }
@@ -1093,6 +1079,43 @@ public class DatabaseTransactionMgr {
         LOG.info("finish transaction {} successfully, publish result: {}", 
transactionState, publishResult.name());
     }
 
+    private void checkReplicaContinuousVersionSucc(long tabletId, Replica 
replica, long version,
+            PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds, 
List<Replica> tabletSuccReplicas,
+            List<Replica> tabletWriteFailedReplicas, List<Replica> 
tabletVersionFailedReplicas) {
+        if (backendPublishTask == null || !backendPublishTask.isFinished()) {
+            errorReplicaIds.add(replica.getId());
+        } else {
+            Map<Long, Long> backendSuccTablets = 
backendPublishTask.getSuccTablets();
+            // new doris BE will report succ tablets
+            if (backendSuccTablets != null) {
+                if (backendSuccTablets.containsKey(tabletId)) {
+                    errorReplicaIds.remove(replica.getId());
+                } else {
+                    errorReplicaIds.add(replica.getId());
+                }
+            } else {
+                // for compatibility, old doris BE report only error tablets
+                List<Long> backendErrorTablets = 
backendPublishTask.getErrorTablets();
+                if (backendErrorTablets != null && 
backendErrorTablets.contains(tabletId)) {
+                    errorReplicaIds.add(replica.getId());
+                }
+            }
+        }
+
+        if (!errorReplicaIds.contains(replica.getId())) {
+            if (replica.checkVersionCatchUp(version - 1, true)) {
+                tabletSuccReplicas.add(replica);
+            } else {
+                tabletVersionFailedReplicas.add(replica);
+            }
+        } else if (replica.getVersion() >= version) {
+            tabletSuccReplicas.add(replica);
+            errorReplicaIds.remove(replica.getId());
+        } else {
+            tabletWriteFailedReplicas.add(replica);
+        }
+    }
+
     protected void unprotectedPreCommitTransaction2PC(TransactionState 
transactionState, Set<Long> errorReplicaIds,
                                                 Map<Long, Set<Long>> 
tableToPartition, Set<Long> totalInvolvedBackends,
                                                 Database db) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 1ad8d2deb8..22a019c4c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -402,13 +402,13 @@ public class GlobalTransactionMgr implements Writable {
     /**
      * if the table is deleted between commit and publish version, then should 
ignore the partition
      *
+     * @param dbId
      * @param transactionId
-     * @param errorReplicaIds
      * @return
      */
-    public void finishTransaction(long dbId, long transactionId, Set<Long> 
errorReplicaIds) throws UserException {
+    public void finishTransaction(long dbId, long transactionId) throws 
UserException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
-        dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds);
+        dbTransactionMgr.finishTransaction(transactionId);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 4cb3ee5e9e..33ea8de07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -17,18 +17,11 @@
 
 package org.apache.doris.transaction;
 
-import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
@@ -36,14 +29,12 @@ import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TTaskType;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class PublishVersionDaemon extends MasterDaemon {
@@ -63,15 +54,6 @@ public class PublishVersionDaemon extends MasterDaemon {
         }
     }
 
-    private boolean 
isAllBackendsOfUnfinishedTasksDead(List<PublishVersionTask> unfinishedTasks) {
-        for (PublishVersionTask unfinishedTask : unfinishedTasks) {
-            if 
(Env.getCurrentSystemInfo().checkBackendAlive(unfinishedTask.getBackendId())) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     private void publishVersion() {
         GlobalTransactionMgr globalTransactionMgr = 
Env.getCurrentGlobalTransactionMgr();
         List<TransactionState> readyTransactionStates = 
globalTransactionMgr.getReadyToPublishTransactions();
@@ -81,7 +63,8 @@ public class PublishVersionDaemon extends MasterDaemon {
 
         // ATTN, we publish transaction state to all backends including dead 
backend, if not publish to dead backend
         // then transaction manager will treat it as success
-        List<Long> allBackends = 
Env.getCurrentSystemInfo().getAllBackendIds(false);
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        List<Long> allBackends = infoService.getAllBackendIds(false);
         if (allBackends.isEmpty()) {
             LOG.warn("some transaction state need to publish, but no backend 
exists");
             return;
@@ -138,109 +121,17 @@ public class PublishVersionDaemon extends MasterDaemon {
             AgentTaskExecutor.submit(batchTask);
         }
 
-        TabletInvertedIndex tabletInvertedIndex = 
Env.getCurrentInvertedIndex();
         // try to finish the transaction, if failed just retry in next loop
         for (TransactionState transactionState : readyTransactionStates) {
-            Map<Long, PublishVersionTask> transTasks = 
transactionState.getPublishVersionTasks();
-            Set<Long> publishErrorReplicaIds = Sets.newHashSet();
-            List<PublishVersionTask> unfinishedTasks = Lists.newArrayList();
-            for (PublishVersionTask publishVersionTask : transTasks.values()) {
-                if (publishVersionTask.isFinished()) {
-                    // sometimes backend finish publish version task,
-                    // but it maybe failed to change transactionid to version 
for some tablets
-                    // and it will upload the failed tabletinfo to fe and fe 
will deal with them
-                    List<Long> errorTablets = 
publishVersionTask.getErrorTablets();
-                    if (errorTablets == null || errorTablets.isEmpty()) {
-                        continue;
-                    } else {
-                        for (long tabletId : errorTablets) {
-                            // tablet inverted index also contains rollingup 
index
-                            // if tablet meta not contains the tablet, skip 
this tablet because this tablet is dropped
-                            // from fe
-                            if (tabletInvertedIndex.getTabletMeta(tabletId) == 
null) {
-                                continue;
-                            }
-                            Replica replica = tabletInvertedIndex.getReplica(
-                                    tabletId, 
publishVersionTask.getBackendId());
-                            if (replica != null) {
-                                publishErrorReplicaIds.add(replica.getId());
-                            } else {
-                                LOG.info("could not find related replica with 
tabletid={}, backendid={}",
-                                        tabletId, 
publishVersionTask.getBackendId());
-                            }
-                        }
-                    }
-                } else {
-                    unfinishedTasks.add(publishVersionTask);
-                }
-            }
-
-            boolean shouldFinishTxn = false;
-            if (!unfinishedTasks.isEmpty()) {
-                shouldFinishTxn = 
isAllBackendsOfUnfinishedTasksDead(unfinishedTasks);
-                if (transactionState.isPublishTimeout() || shouldFinishTxn) {
-                    // transaction's publish is timeout, but there still has 
unfinished tasks.
-                    // we need to collect all error replicas, and try to 
finish this txn.
-                    for (PublishVersionTask unfinishedTask : unfinishedTasks) {
-                        // set all replicas in the backend to error state
-                        List<TPartitionVersionInfo> versionInfos = 
unfinishedTask.getPartitionVersionInfos();
-                        Set<Long> errorPartitionIds = Sets.newHashSet();
-                        for (TPartitionVersionInfo versionInfo : versionInfos) 
{
-                            
errorPartitionIds.add(versionInfo.getPartitionId());
-                        }
-                        if (errorPartitionIds.isEmpty()) {
-                            continue;
-                        }
-
-                        Database db = Env.getCurrentInternalCatalog()
-                                .getDbNullable(transactionState.getDbId());
-                        if (db == null) {
-                            LOG.warn("Database [{}] has been dropped.", 
transactionState.getDbId());
-                            continue;
-                        }
-
-                        for (long tableId : transactionState.getTableIdList()) 
{
-                            Table table = db.getTableNullable(tableId);
-                            if (table == null || table.getType() != 
Table.TableType.OLAP) {
-                                LOG.warn("Table [{}] in database [{}] has been 
dropped.", tableId, db.getFullName());
-                                continue;
-                            }
-                            OlapTable olapTable = (OlapTable) table;
-                            olapTable.readLock();
-                            try {
-                                for (Long errorPartitionId : 
errorPartitionIds) {
-                                    Partition partition = 
olapTable.getPartition(errorPartitionId);
-                                    if (partition != null) {
-                                        List<MaterializedIndex> 
materializedIndexList
-                                                = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
-                                        for (MaterializedIndex 
materializedIndex : materializedIndexList) {
-                                            for (Tablet tablet : 
materializedIndex.getTablets()) {
-                                                Replica replica = 
tablet.getReplicaByBackendId(
-                                                        
unfinishedTask.getBackendId());
-                                                if (replica != null) {
-                                                    
publishErrorReplicaIds.add(replica.getId());
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            } finally {
-                                olapTable.readUnlock();
-                            }
-                        }
-                    }
-                    shouldFinishTxn = true;
-                }
-            } else {
-                // all publish tasks are finished, try to finish this txn.
-                shouldFinishTxn = true;
-            }
+            boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream()
+                    .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
 
+            boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout();
             if (shouldFinishTxn) {
                 try {
                     // one transaction exception should not affect other transaction
                     globalTransactionMgr.finishTransaction(transactionState.getDbId(),
-                            transactionState.getTransactionId(), publishErrorReplicaIds);
+                            transactionState.getTransactionId());
                 } catch (Exception e) {
                     LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
                 }
@@ -248,8 +139,7 @@ public class PublishVersionDaemon extends MasterDaemon {
                     // if finish transaction state failed, then update publish version time, should check
                     // to finish after some interval
                     transactionState.updateSendTaskTime();
-                    LOG.debug("publish version for transaction {} failed, has {} error replicas during publish",
-                            transactionState, publishErrorReplicaIds.size());
+                    LOG.debug("publish version for transaction {} failed", transactionState);
                 }
             }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 0e89e4c86e..b988650fa2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.meta.MetaContext;
+import org.apache.doris.task.PublishVersionTask;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -56,7 +57,17 @@ public class DatabaseTransactionMgrTest {
     private static Env slaveEnv;
     private static Map<String, Long> LabelToTxnId;
 
-    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+    private TransactionState.TxnCoordinator transactionSource =
+            new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+
+    public static void setTransactionFinishPublish(TransactionState 
transactionState, List<Long> backendIds) {
+        for (long backendId : backendIds) {
+            PublishVersionTask task = new PublishVersionTask(backendId, 
transactionState.getTransactionId(),
+                    transactionState.getDbId(), null, 
System.currentTimeMillis());
+            task.setFinished(true);
+            transactionState.addPublishVersionTask(backendId, task);
+        }
+    }
 
     @Before
     public void setUp() throws InstantiationException, IllegalAccessException, 
IllegalArgumentException,
@@ -100,7 +111,11 @@ public class DatabaseTransactionMgrTest {
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
         masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1, transTablets);
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId1, null);
+        TransactionState transactionState1 = 
fakeEditLog.getTransaction(transactionId1);
+        setTransactionFinishPublish(transactionState1,
+                Lists.newArrayList(CatalogTestUtil.testBackendId1,
+                        CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3));
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId1);
         labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
 
         TransactionState.TxnCoordinator beTransactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
@@ -120,8 +135,6 @@ public class DatabaseTransactionMgrTest {
         labelToTxnId.put(CatalogTestUtil.testTxnLabel3, transactionId3);
         labelToTxnId.put(CatalogTestUtil.testTxnLabel4, transactionId4);
 
-        TransactionState transactionState1 = 
fakeEditLog.getTransaction(transactionId1);
-
         FakeEnv.setEnv(slaveEnv);
         slaveTransMgr.replayUpsertTransactionState(transactionState1);
         return labelToTxnId;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 953ea3d879..a819c4f030 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -55,7 +55,6 @@ import 
org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import mockit.Injectable;
 import mockit.Mocked;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -66,10 +65,8 @@ import org.junit.Test;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
-
 public class GlobalTransactionMgrTest {
 
     private static FakeEditLog fakeEditLog;
@@ -467,9 +464,12 @@ public class GlobalTransactionMgrTest {
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
         slaveTransMgr.replayUpsertTransactionState(transactionState);
-        Set<Long> errorReplicaIds = Sets.newHashSet();
-        errorReplicaIds.add(CatalogTestUtil.testReplicaId1);
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId, errorReplicaIds);
+        
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
+                Lists.newArrayList(CatalogTestUtil.testBackendId1,
+                        CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3));
+        transactionState.getPublishVersionTasks()
+                
.get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1);
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId);
         transactionState = fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.VISIBLE, 
transactionState.getTransactionStatus());
         // check replica version
@@ -524,9 +524,13 @@ public class GlobalTransactionMgrTest {
 
         // master finish the transaction failed
         FakeEnv.setEnv(masterEnv);
-        Set<Long> errorReplicaIds = Sets.newHashSet();
-        errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId, errorReplicaIds);
+        
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
+                Lists.newArrayList(CatalogTestUtil.testBackendId1, 
CatalogTestUtil.testBackendId2));
+
+        // backend2 publish failed
+        transactionState.getPublishVersionTasks()
+                
.get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1);
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
         Replica replica1 = 
tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
         Replica replica2 = 
tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
@@ -540,8 +544,12 @@ public class GlobalTransactionMgrTest {
         Assert.assertEquals(-1, replica2.getLastFailedVersion());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, 
replica3.getLastFailedVersion());
 
-        errorReplicaIds = Sets.newHashSet();
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId, errorReplicaIds);
+        // backend2 publish success
+        Map<Long, Long> backend2SuccTablets = Maps.newHashMap();
+        backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L);
+        transactionState.getPublishVersionTasks()
+                
.get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets);
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId);
         Assert.assertEquals(TransactionStatus.VISIBLE, 
transactionState.getTransactionStatus());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, 
replica1.getVersion());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, 
replica2.getVersion());
@@ -603,8 +611,10 @@ public class GlobalTransactionMgrTest {
         Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv));
 
         // master finish the transaction2
-        errorReplicaIds = Sets.newHashSet();
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId2, errorReplicaIds);
+        
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
+                Lists.newArrayList(CatalogTestUtil.testBackendId1,
+                        CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3));
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId2);
         Assert.assertEquals(TransactionStatus.VISIBLE, 
transactionState.getTransactionStatus());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, 
replica1.getVersion());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, 
replica2.getVersion());
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index 3eda812e1b..dedc454d33 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -65,6 +65,7 @@ struct TFinishTaskRequest {
     14: optional list<Types.TTabletId> downloaded_tablet_ids
     15: optional i64 copy_size
     16: optional i64 copy_time_ms
+    17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
 }
 
 struct TTablet {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to