This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d20365cdcf [fix](transaction) fix publish txn fake succ (#24273) d20365cdcf is described below commit d20365cdcf31786531dc87c059ffe280113778df Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Sep 14 21:04:59 2023 +0800 [fix](transaction) fix publish txn fake succ (#24273) --- be/src/agent/task_worker_pool.cpp | 24 ++-- be/src/common/status.h | 1 + be/src/olap/data_dir.cpp | 2 +- be/src/olap/task/engine_publish_version_task.cpp | 52 +++++---- be/src/olap/task/engine_publish_version_task.h | 11 +- .../java/org/apache/doris/master/MasterImpl.java | 6 + .../org/apache/doris/task/PublishVersionTask.java | 13 +++ .../doris/transaction/DatabaseTransactionMgr.java | 79 ++++++++----- .../doris/transaction/GlobalTransactionMgr.java | 6 +- .../doris/transaction/PublishVersionDaemon.java | 126 ++------------------- .../transaction/DatabaseTransactionMgrTest.java | 21 +++- .../transaction/GlobalTransactionMgrTest.java | 36 +++--- gensrc/thrift/MasterService.thrift | 1 + 13 files changed, 174 insertions(+), 204 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 637c92af5a..a62d603fe0 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1525,17 +1525,18 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { DorisMetrics::instance()->publish_task_request_total->increment(1); VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature; - std::vector<TTabletId> error_tablet_ids; - std::vector<TTabletId> succ_tablet_ids; + std::set<TTabletId> error_tablet_ids; + std::map<TTabletId, TVersion> succ_tablets; // partition_id, tablet_id, publish_version std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets; uint32_t retry_time = 0; Status status; bool is_task_timeout = false; while (retry_time < PUBLISH_VERSION_MAX_RETRY) { + succ_tablets.clear(); error_tablet_ids.clear(); EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, - &succ_tablet_ids, &discontinuous_version_tablets); + &succ_tablets, &discontinuous_version_tablets); status = StorageEngine::instance()->execute_task(&engine_task); if (status.ok()) { break; @@ -1584,25 +1585,22 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { .tag("transaction_id", publish_version_req.transaction_id) .tag("error_tablets_num", error_tablet_ids.size()) .error(status); - finish_task_request.__set_error_tablet_ids(error_tablet_ids); } else { if (!config::disable_auto_compaction && !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { - for (int i = 0; i < succ_tablet_ids.size(); i++) { + for (auto [tablet_id, _] : succ_tablets) { TabletSharedPtr tablet = - StorageEngine::instance()->tablet_manager()->get_tablet( - succ_tablet_ids[i]); + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); if (tablet != nullptr) { tablet->publised_count++; if (tablet->publised_count % 10 == 0) { StorageEngine::instance()->submit_compaction_task( tablet, CompactionType::CUMULATIVE_COMPACTION, true); - LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i] + LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id << ", publised:" << tablet->publised_count; } } else { - LOG(WARNING) - << "trigger compaction failed, tabletid:" << succ_tablet_ids[i]; + LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id; } } } @@ -1611,7 +1609,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { LOG_INFO("successfully publish version") .tag("signature", agent_task_req.signature) .tag("transaction_id", publish_version_req.transaction_id) - .tag("tablets_num", succ_tablet_ids.size()) + .tag("tablets_num", succ_tablets.size()) .tag("cost(s)", cost_second); } @@ -1620,7 +1618,9 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_report_version(_s_report_version); - finish_task_request.__set_error_tablet_ids(error_tablet_ids); + finish_task_request.__set_succ_tablets(succ_tablets); + finish_task_request.__set_error_tablet_ids( + std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end())); _finish_task(finish_task_request); _remove_task_info(agent_task_req.task_type, agent_task_req.signature); diff --git a/be/src/common/status.h b/be/src/common/status.h index e49bffa70f..14c81c6e2c 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -306,6 +306,7 @@ constexpr bool capture_stacktrace(int code) { && code != ErrorCode::INVERTED_INDEX_BUILD_WAITTING && code != ErrorCode::META_KEY_NOT_FOUND && code != ErrorCode::PUSH_VERSION_ALREADY_EXIST + && code != ErrorCode::VERSION_NOT_EXIST && code != ErrorCode::TABLE_ALREADY_DELETED_ERROR && code != ErrorCode::TRANSACTION_NOT_EXIST && code != ErrorCode::TRANSACTION_ALREADY_VISIBLE diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 104e549f6a..301c463cbb 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -490,7 +490,7 @@ Status DataDir::load() { PendingPublishInfoPB pending_publish_info_pb; bool parsed = pending_publish_info_pb.ParseFromString(info); if (!parsed) { - LOG(WARNING) << "parse pending publish info failed, tablt_id: " << tablet_id + LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id << " publish_version: " << publish_version; } StorageEngine::instance()->add_async_publish_task( diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 1353e66915..a56430fbb1 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -69,22 +69,17 @@ void TabletPublishStatistics::record_in_bvar() { } EnginePublishVersionTask::EnginePublishVersionTask( - const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids, - std::vector<TTabletId>* succ_tablet_ids, + const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids, + std::map<TTabletId, TVersion>* succ_tablets, std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets) : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), - _succ_tablet_ids(succ_tablet_ids), + _succ_tablets(succ_tablets), _discontinuous_version_tablets(discontinuous_version_tablets) {} void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { std::lock_guard<std::mutex> lck(_tablet_ids_mutex); - _error_tablet_ids->push_back(tablet_id); -} - -void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) { - std::lock_guard<std::mutex> lck(_tablet_ids_mutex); - _succ_tablet_ids->push_back(tablet_id); + _error_tablet_ids->insert(tablet_id); } Status EnginePublishVersionTask::finish() { @@ -126,7 +121,7 @@ Status EnginePublishVersionTask::finish() { // and receive fe's publish version task // this be must return as an error tablet if (rowset == nullptr) { - _error_tablet_ids->push_back(tablet_info.tablet_id); + add_error_tablet_id(tablet_info.tablet_id); res = Status::Error<PUSH_ROWSET_NOT_FOUND>( "could not find related rowset for tablet {}, txn id {}", tablet_info.tablet_id, transaction_id); @@ -135,7 +130,7 @@ Status EnginePublishVersionTask::finish() { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( tablet_info.tablet_id, tablet_info.tablet_uid); if (tablet == nullptr) { - _error_tablet_ids->push_back(tablet_info.tablet_id); + add_error_tablet_id(tablet_info.tablet_id); res = Status::Error<PUSH_TABLE_NOT_EXIST>( "can't get tablet when publish version. tablet_id={}", tablet_info.tablet_id); @@ -199,6 +194,7 @@ Status EnginePublishVersionTask::finish() { } token->wait(); + _succ_tablets->clear(); // check if the related tablet remained all have the version for (auto& par_ver_info : _publish_version_req.partition_version_infos) { int64_t partition_id = par_ver_info.partition_id; @@ -209,18 +205,36 @@ Status EnginePublishVersionTask::finish() { Version version(par_ver_info.version, par_ver_info.version); for (auto& tablet_info : partition_related_tablet_infos) { - // has to use strict mode to check if check all tablets - if (!_publish_version_req.strict_mode) { - break; - } TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id); + auto tablet_id = tablet_info.tablet_id; if (tablet == nullptr) { - add_error_tablet_id(tablet_info.tablet_id); + add_error_tablet_id(tablet_id); + _succ_tablets->erase(tablet_id); + LOG(WARNING) << "publish version failed on transaction, not found tablet. " + << "transaction_id=" << transaction_id << ", tablet_id=" << tablet_id + << ", version=" << par_ver_info.version; } else { // check if the version exist, if not exist, then set publish failed - if (!tablet->check_version_exist(version)) { - add_error_tablet_id(tablet_info.tablet_id); + if (_error_tablet_ids->find(tablet_id) == _error_tablet_ids->end()) { + if (tablet->check_version_exist(version)) { + // it's better to report the max continous succ version, + // but it maybe time cost now. + // current just report 0 + (*_succ_tablets)[tablet_id] = 0; + } else { + add_error_tablet_id(tablet_id); + if (res.ok()) { + res = Status::Error<VERSION_NOT_EXIST>( + "tablet {} not exists version {}", tablet_id, + par_ver_info.version); + } + LOG(WARNING) << "publish version failed on transaction, tablet version not " + "exists. " + << "transaction_id=" << transaction_id + << ", tablet_id=" << tablet_id + << ", version=" << par_ver_info.version; + } } } } @@ -280,9 +294,7 @@ void TabletPublishTxnTask::handle() { _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); return; } - _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id); int64_t cost_us = MonotonicMicros() - _stats.submit_time_us; - // print stats if publish cost > 500ms g_tablet_publish_latency << cost_us; _stats.record_in_bvar(); LOG(INFO) << "publish version successfully on tablet" diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 8acf8099ca..0a270c93d2 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -23,7 +23,9 @@ #include <atomic> #include <condition_variable> +#include <map> #include <mutex> +#include <set> #include <vector> #include "common/status.h" @@ -83,23 +85,22 @@ private: class EnginePublishVersionTask : public EngineTask { public: EnginePublishVersionTask( - const TPublishVersionRequest& publish_version_req, vector<TTabletId>* error_tablet_ids, - std::vector<TTabletId>* succ_tablet_ids, + const TPublishVersionRequest& publish_version_req, + std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets, std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets); ~EnginePublishVersionTask() {} virtual Status finish() override; void add_error_tablet_id(int64_t tablet_id); - void add_succ_tablet_id(int64_t tablet_id); int64_t finish_task(); private: const TPublishVersionRequest& _publish_version_req; std::mutex _tablet_ids_mutex; - vector<TTabletId>* _error_tablet_ids; - vector<TTabletId>* _succ_tablet_ids; + std::set<TTabletId>* _error_tablet_ids; + std::map<TTabletId, TVersion>* _succ_tablets; std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 4e031c0dac..23118647cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -67,6 +67,7 @@ import org.apache.thrift.TException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public class MasterImpl { @@ -465,6 +466,10 @@ public class MasterImpl { } private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) { + Map<Long, Long> succTablets = null; + if (request.isSetSuccTablets()) { + succTablets = request.getSuccTablets(); + } List<Long> errorTabletIds = null; if (request.isSetErrorTabletIds()) { errorTabletIds = request.getErrorTabletIds(); @@ -478,6 +483,7 @@ public class MasterImpl { } PublishVersionTask publishVersionTask = (PublishVersionTask) task; + publishVersionTask.setSuccTablets(succTablets); publishVersionTask.addErrorTablets(errorTabletIds); publishVersionTask.setFinished(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 1ce9d866b8..8461b1db4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class PublishVersionTask extends AgentTask { private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class); @@ -34,11 +35,15 @@ public class PublishVersionTask extends AgentTask { private List<TPartitionVersionInfo> partitionVersionInfos; private List<Long> errorTablets; + // tabletId => version, current version = 0 + private Map<Long, Long> succTablets; + public PublishVersionTask(long backendId, long transactionId, long dbId, List<TPartitionVersionInfo> partitionVersionInfos, long createTime) { super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime); this.transactionId = transactionId; this.partitionVersionInfos = partitionVersionInfos; + this.succTablets = null; this.errorTablets = new ArrayList<Long>(); this.isFinished = false; } @@ -57,6 +62,14 @@ public class PublishVersionTask extends AgentTask { return partitionVersionInfos; } + public Map<Long, Long> getSuccTablets() { + return succTablets; + } + + public void setSuccTablets(Map<Long, Long> succTablets) { + this.succTablets = succTablets; + } + public synchronized List<Long> getErrorTablets() { return errorTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 8397499ded..7974cb6a89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -54,6 +54,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.ClearTransactionTask; +import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TUniqueId; import com.google.common.annotations.VisibleForTesting; @@ -868,7 +869,7 @@ public class DatabaseTransactionMgr { } } - public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) throws UserException { + public void finishTransaction(long transactionId) throws UserException { TransactionState transactionState = null; readLock(); try { @@ -876,14 +877,10 @@ public class DatabaseTransactionMgr { } finally { readUnlock(); } + // add all commit errors and publish errors to a single set - if (errorReplicaIds == null) { - errorReplicaIds = Sets.newHashSet(); - } - Set<Long> originalErrorReplicas = transactionState.getErrorReplicas(); - if (originalErrorReplicas != null) { - errorReplicaIds.addAll(originalErrorReplicas); - } + Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); + Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks(); long now = System.currentTimeMillis(); long firstPublishOneSuccTime = transactionState.getFirstPublishOneSuccTime(); @@ -980,21 +977,10 @@ public class DatabaseTransactionMgr { tabletWriteFailedReplicas.clear(); tabletVersionFailedReplicas.clear(); for (Replica replica : tablet.getReplicas()) { - if (!errorReplicaIds.contains(replica.getId())) { - if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) { - tabletSuccReplicas.add(replica); - } else { - tabletVersionFailedReplicas.add(replica); - } - } else if (replica.getVersion() >= partitionCommitInfo.getVersion()) { - // the replica's version is larger than or equal to current transaction - // partition's version the replica is normal, then remove it from error replica ids - // TODO(cmy): actually I have no idea why we need this check - tabletSuccReplicas.add(replica); - errorReplicaIds.remove(replica.getId()); - } else { - tabletWriteFailedReplicas.add(replica); - } + checkReplicaContinuousVersionSucc(tablet.getId(), replica, + partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()), + errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); } int healthReplicaNum = tabletSuccReplicas.size(); @@ -1005,7 +991,7 @@ public class DatabaseTransactionMgr { LOG.info("publish version quorum succ for transaction {} on tablet {} with version" + " {}, and has failed replicas, quorum num {}. table {}, partition {}," + " tablet detail: {}", - transactionState, tablet, partitionCommitInfo.getVersion(), + transactionState, tablet.getId(), partitionCommitInfo.getVersion(), quorumReplicaNum, tableId, partitionId, writeDetail); } continue; @@ -1033,8 +1019,8 @@ public class DatabaseTransactionMgr { LOG.info("publish version timeout succ for transaction {} on tablet {} with version" + " {}, and has failed replicas, quorum num {}. table {}, partition {}," + " tablet detail: {}", - transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum, - tableId, partitionId, writeDetail); + transactionState, tablet.getId(), partitionCommitInfo.getVersion(), + quorumReplicaNum, tableId, partitionId, writeDetail); } else { publishResult = PublishResult.FAILED; String errMsg = String.format("publish on tablet %d failed." @@ -1046,8 +1032,8 @@ public class DatabaseTransactionMgr { LOG.info("publish version failed for transaction {} on tablet {} with version" + " {}, and has failed replicas, quorum num {}. table {}, partition {}," + " tablet detail: {}", - transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum, - tableId, partitionId, writeDetail); + transactionState, tablet.getId(), partitionCommitInfo.getVersion(), + quorumReplicaNum, tableId, partitionId, writeDetail); } } } @@ -1093,6 +1079,43 @@ public class DatabaseTransactionMgr { LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name()); } + private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, long version, + PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds, List<Replica> tabletSuccReplicas, + List<Replica> tabletWriteFailedReplicas, List<Replica> tabletVersionFailedReplicas) { + if (backendPublishTask == null || !backendPublishTask.isFinished()) { + errorReplicaIds.add(replica.getId()); + } else { + Map<Long, Long> backendSuccTablets = backendPublishTask.getSuccTablets(); + // new doris BE will report succ tablets + if (backendSuccTablets != null) { + if (backendSuccTablets.containsKey(tabletId)) { + errorReplicaIds.remove(replica.getId()); + } else { + errorReplicaIds.add(replica.getId()); + } + } else { + // for compatibility, old doris BE report only error tablets + List<Long> backendErrorTablets = backendPublishTask.getErrorTablets(); + if (backendErrorTablets != null && backendErrorTablets.contains(tabletId)) { + errorReplicaIds.add(replica.getId()); + } + } + } + + if (!errorReplicaIds.contains(replica.getId())) { + if (replica.checkVersionCatchUp(version - 1, true)) { + tabletSuccReplicas.add(replica); + } else { + tabletVersionFailedReplicas.add(replica); + } + } else if (replica.getVersion() >= version) { + tabletSuccReplicas.add(replica); + errorReplicaIds.remove(replica.getId()); + } else { + tabletWriteFailedReplicas.add(replica); + } + } + protected void unprotectedPreCommitTransaction2PC(TransactionState transactionState, Set<Long> errorReplicaIds, Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, Database db) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 1ad8d2deb8..22a019c4c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -402,13 +402,13 @@ public class GlobalTransactionMgr implements Writable { /** * if the table is deleted between commit and publish version, then should ignore the partition * + * @param dbId * @param transactionId - * @param errorReplicaIds * @return */ - public void finishTransaction(long dbId, long transactionId, Set<Long> errorReplicaIds) throws UserException { + public void finishTransaction(long dbId, long transactionId) throws UserException { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds); + dbTransactionMgr.finishTransaction(transactionId); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 4cb3ee5e9e..33ea8de07e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -17,18 +17,11 @@ package org.apache.doris.transaction; -import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; @@ -36,14 +29,12 @@ import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; public class PublishVersionDaemon extends MasterDaemon { @@ -63,15 +54,6 @@ public class PublishVersionDaemon extends MasterDaemon { } } - private boolean isAllBackendsOfUnfinishedTasksDead(List<PublishVersionTask> unfinishedTasks) { - for (PublishVersionTask unfinishedTask : unfinishedTasks) { - if (Env.getCurrentSystemInfo().checkBackendAlive(unfinishedTask.getBackendId())) { - return false; - } - } - return true; - } - private void publishVersion() { GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); List<TransactionState> readyTransactionStates = globalTransactionMgr.getReadyToPublishTransactions(); @@ -81,7 +63,8 @@ public class PublishVersionDaemon extends MasterDaemon { // ATTN, we publish transaction state to all backends including dead backend, if not publish to dead backend // then transaction manager will treat it as success - List<Long> allBackends = Env.getCurrentSystemInfo().getAllBackendIds(false); + SystemInfoService infoService = Env.getCurrentSystemInfo(); + List<Long> allBackends = infoService.getAllBackendIds(false); if (allBackends.isEmpty()) { LOG.warn("some transaction state need to publish, but no backend exists"); return; @@ -138,109 +121,17 @@ public class PublishVersionDaemon extends MasterDaemon { AgentTaskExecutor.submit(batchTask); } - TabletInvertedIndex tabletInvertedIndex = Env.getCurrentInvertedIndex(); // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - Map<Long, PublishVersionTask> transTasks = transactionState.getPublishVersionTasks(); - Set<Long> publishErrorReplicaIds = Sets.newHashSet(); - List<PublishVersionTask> unfinishedTasks = Lists.newArrayList(); - for (PublishVersionTask publishVersionTask : transTasks.values()) { - if (publishVersionTask.isFinished()) { - // sometimes backend finish publish version task, - // but it maybe failed to change transactionid to version for some tablets - // and it will upload the failed tabletinfo to fe and fe will deal with them - List<Long> errorTablets = publishVersionTask.getErrorTablets(); - if (errorTablets == null || errorTablets.isEmpty()) { - continue; - } else { - for (long tabletId : errorTablets) { - // tablet inverted index also contains rollingup index - // if tablet meta not contains the tablet, skip this tablet because this tablet is dropped - // from fe - if (tabletInvertedIndex.getTabletMeta(tabletId) == null) { - continue; - } - Replica replica = tabletInvertedIndex.getReplica( - tabletId, publishVersionTask.getBackendId()); - if (replica != null) { - publishErrorReplicaIds.add(replica.getId()); - } else { - LOG.info("could not find related replica with tabletid={}, backendid={}", - tabletId, publishVersionTask.getBackendId()); - } - } - } - } else { - unfinishedTasks.add(publishVersionTask); - } - } - - boolean shouldFinishTxn = false; - if (!unfinishedTasks.isEmpty()) { - shouldFinishTxn = isAllBackendsOfUnfinishedTasksDead(unfinishedTasks); - if (transactionState.isPublishTimeout() || shouldFinishTxn) { - // transaction's publish is timeout, but there still has unfinished tasks. - // we need to collect all error replicas, and try to finish this txn. - for (PublishVersionTask unfinishedTask : unfinishedTasks) { - // set all replicas in the backend to error state - List<TPartitionVersionInfo> versionInfos = unfinishedTask.getPartitionVersionInfos(); - Set<Long> errorPartitionIds = Sets.newHashSet(); - for (TPartitionVersionInfo versionInfo : versionInfos) { - errorPartitionIds.add(versionInfo.getPartitionId()); - } - if (errorPartitionIds.isEmpty()) { - continue; - } - - Database db = Env.getCurrentInternalCatalog() - .getDbNullable(transactionState.getDbId()); - if (db == null) { - LOG.warn("Database [{}] has been dropped.", transactionState.getDbId()); - continue; - } - - for (long tableId : transactionState.getTableIdList()) { - Table table = db.getTableNullable(tableId); - if (table == null || table.getType() != Table.TableType.OLAP) { - LOG.warn("Table [{}] in database [{}] has been dropped.", tableId, db.getFullName()); - continue; - } - OlapTable olapTable = (OlapTable) table; - olapTable.readLock(); - try { - for (Long errorPartitionId : errorPartitionIds) { - Partition partition = olapTable.getPartition(errorPartitionId); - if (partition != null) { - List<MaterializedIndex> materializedIndexList - = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); - for (MaterializedIndex materializedIndex : materializedIndexList) { - for (Tablet tablet : materializedIndex.getTablets()) { - Replica replica = tablet.getReplicaByBackendId( - unfinishedTask.getBackendId()); - if (replica != null) { - publishErrorReplicaIds.add(replica.getId()); - } - } - } - } - } - } finally { - olapTable.readUnlock(); - } - } - } - shouldFinishTxn = true; - } - } else { - // all publish tasks are finished, try to finish this txn. - shouldFinishTxn = true; - } + boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream() + .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout(); if (shouldFinishTxn) { try { // one transaction exception should not affect other transaction globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), publishErrorReplicaIds); + transactionState.getTransactionId()); } catch (Exception e) { LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); } @@ -248,8 +139,7 @@ public class PublishVersionDaemon extends MasterDaemon { // if finish transaction state failed, then update publish version time, should check // to finish after some interval transactionState.updateSendTaskTime(); - LOG.debug("publish version for transaction {} failed, has {} error replicas during publish", - transactionState, publishErrorReplicaIds.size()); + LOG.debug("publish version for transaction {} failed", transactionState); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 0e89e4c86e..b988650fa2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -29,6 +29,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.meta.MetaContext; +import org.apache.doris.task.PublishVersionTask; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -56,7 +57,17 @@ public class DatabaseTransactionMgrTest { private static Env slaveEnv; private static Map<String, Long> LabelToTxnId; - private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + + public static void setTransactionFinishPublish(TransactionState transactionState, List<Long> backendIds) { + for (long backendId : backendIds) { + PublishVersionTask task = new PublishVersionTask(backendId, transactionState.getTransactionId(), + transactionState.getDbId(), null, System.currentTimeMillis()); + task.setFinished(true); + transactionState.addPublishVersionTask(backendId, task); + } + } @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, @@ -100,7 +111,11 @@ public class DatabaseTransactionMgrTest { Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1, null); + TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1); + setTransactionFinishPublish(transactionState1, + Lists.newArrayList(CatalogTestUtil.testBackendId1, + CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); @@ -120,8 +135,6 @@ public class DatabaseTransactionMgrTest { labelToTxnId.put(CatalogTestUtil.testTxnLabel3, transactionId3); labelToTxnId.put(CatalogTestUtil.testTxnLabel4, transactionId4); - TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1); - FakeEnv.setEnv(slaveEnv); slaveTransMgr.replayUpsertTransactionState(transactionState1); return labelToTxnId; diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 953ea3d879..a819c4f030 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -55,7 +55,6 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import mockit.Injectable; import mockit.Mocked; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -66,10 +65,8 @@ import org.junit.Test; import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; - public class GlobalTransactionMgrTest { private static FakeEditLog fakeEditLog; @@ -467,9 +464,12 @@ public class GlobalTransactionMgrTest { TransactionState transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); slaveTransMgr.replayUpsertTransactionState(transactionState); - Set<Long> errorReplicaIds = Sets.newHashSet(); - errorReplicaIds.add(CatalogTestUtil.testReplicaId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, errorReplicaIds); + DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, + Lists.newArrayList(CatalogTestUtil.testBackendId1, + CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); + transactionState.getPublishVersionTasks() + .get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); // check replica version @@ -524,9 +524,13 @@ public class GlobalTransactionMgrTest { // master finish the transaction failed FakeEnv.setEnv(masterEnv); - Set<Long> errorReplicaIds = Sets.newHashSet(); - errorReplicaIds.add(CatalogTestUtil.testReplicaId2); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, errorReplicaIds); + DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, + Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2)); + + // backend2 publish failed + transactionState.getPublishVersionTasks() + .get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1); Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2); @@ -540,8 +544,12 @@ public class GlobalTransactionMgrTest { Assert.assertEquals(-1, replica2.getLastFailedVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica3.getLastFailedVersion()); - errorReplicaIds = Sets.newHashSet(); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, errorReplicaIds); + // backend2 publish success + Map<Long, Long> backend2SuccTablets = Maps.newHashMap(); + backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L); + transactionState.getPublishVersionTasks() + .get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica2.getVersion()); @@ -603,8 +611,10 @@ public class GlobalTransactionMgrTest { Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv)); // master finish the transaction2 - errorReplicaIds = Sets.newHashSet(); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2, errorReplicaIds); + DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, + Lists.newArrayList(CatalogTestUtil.testBackendId1, + CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica2.getVersion()); diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 3eda812e1b..dedc454d33 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -65,6 +65,7 @@ struct TFinishTaskRequest { 14: optional list<Types.TTabletId> downloaded_tablet_ids 15: optional i64 copy_size 16: optional i64 copy_time_ms + 17: optional map<Types.TTabletId, Types.TVersion> succ_tablets } struct TTablet { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org