This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9a2a98701c6 [improvement](log) add txn log (#28875) 9a2a98701c6 is described below commit 9a2a98701c6b16847963094fc8b241d5679ce996 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Sun Jan 14 10:27:35 2024 +0800 [improvement](log) add txn log (#28875) --- be/src/olap/task/engine_publish_version_task.cpp | 11 ++++-- be/src/olap/txn_manager.cpp | 41 ++++++++++++++++----- .../doris/transaction/DatabaseTransactionMgr.java | 42 +++++++++++++++++----- 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 78f5cb9c328..ce5ba534fd1 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -94,13 +94,16 @@ Status EnginePublishVersionTask::execute() { VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id; DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("EnginePublishVersionTask.finish.random random failed"); + LOG_WARNING("EnginePublishVersionTask.finish.random random failed") + .tag("txn_id", transaction_id); return Status::InternalError("debug engine publish version task random failed"); } }); DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", { if (auto wait = dp->param<int>("duration", 0); wait > 0) { - LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait); + LOG_WARNING("EnginePublishVersionTask.finish.wait wait") + .tag("txn_id", transaction_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -204,7 +207,9 @@ Status EnginePublishVersionTask::execute() { partition_id, tablet_info.tablet_id, version.first); } res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>( - "check_version_exist failed"); + "version not continuous for mow, 
tablet_id={}, " + "tablet_max_version={}, txn_version={}", + tablet_info.tablet_id, max_version, version.first); int64_t missed_version = max_version + 1; int64_t missed_txn_id = StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 161c39fdd20..7a82ef4c6ae 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -142,13 +142,18 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.prepare_txn.random_failed random failed"); + LOG_WARNING("TxnManager.prepare_txn.random_failed random failed") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug prepare txn random failed"); } }); DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { if (auto wait = dp->param<int>("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait); + LOG_WARNING("TxnManager.prepare_txn.wait") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -313,13 +318,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.commit_txn.random_failed"); + LOG_WARNING("TxnManager.commit_txn.random_failed") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug commit txn random failed"); } }); DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { if (auto wait = dp->param<int>("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait); + LOG_WARNING("TxnManager.commit_txn.wait") + .tag("txn_id", transaction_id) + 
.tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -388,7 +398,10 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, rowset_ptr->rowset_meta()->get_rowset_pb(), false); DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { if (auto wait = dp->param<int>("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait); + LOG_WARNING("TxnManager.RowsetMetaManager.save_wait") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -466,13 +479,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, } DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta"); + LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug publish txn before save rs meta random failed"); } }); DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { if (auto wait = dp->param<int>("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait); + LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -486,13 +504,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta"); + 
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug publish txn after save rs meta random failed"); } }); DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { if (auto wait = dp->param<int>("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait); + LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 9b611425cc9..57088cbd7a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -589,9 +589,11 @@ public class DatabaseTransactionMgr { tabletVersionFailedReplicas); String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s" - + " < load required replica num %s. table %s, partition %s, this tablet detail: %s", + + " < load required replica num %s. 
table %s, partition: [ id=%s, commit version %s" + + ", visible version %s ], this tablet detail: %s", transactionId, tablet.getId(), successReplicaNum, loadRequiredReplicaNum, tableId, - partition.getId(), writeDetail); + partition.getId(), partition.getCommittedVersion(), partition.getVisibleVersion(), + writeDetail); LOG.info(errMsg); throw new TabletQuorumFailedException(transactionId, errMsg); @@ -746,7 +748,7 @@ public class DatabaseTransactionMgr { } // update nextVersion because of the failure of persistent transaction resulting in error version - updateCatalogAfterCommitted(transactionState, db); + updateCatalogAfterCommitted(transactionState, db, false); LOG.info("transaction:[{}] successfully committed", transactionState); } @@ -1128,9 +1130,9 @@ public class DatabaseTransactionMgr { tabletWriteFailedReplicas, tabletVersionFailedReplicas); logs.add(String.format("publish version quorum succ for transaction %s on tablet %s" + " with version %s, and has failed replicas, load require replica num %s. " - + "table %s, partition %s, tablet detail: %s", - transactionState, tablet.getId(), newVersion, - loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + + "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s", + transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, + partitionId, partition.getCommittedVersion(), writeDetail)); } continue; } @@ -1178,7 +1180,8 @@ public class DatabaseTransactionMgr { if (needLog) { transactionState.setLastPublishLogTime(now); for (String log : logs) { - LOG.info("{}. publish times {}", log, transactionState.getPublishCount()); + LOG.info("{}. 
publish times {}, whole txn publish result {}", + log, transactionState.getPublishCount(), publishResult.name()); } } @@ -1793,8 +1796,10 @@ public class DatabaseTransactionMgr { } } - private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) { + private void updateCatalogAfterCommitted(TransactionState transactionState, Database db, boolean isReplay) { Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); + List<Replica> tabletSuccReplicas = Lists.newArrayList(); + List<Replica> tabletFailedReplicas = Lists.newArrayList(); for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { long tableId = tableCommitInfo.getTableId(); OlapTable table = (OlapTable) db.getTableNullable(tableId); @@ -1817,13 +1822,32 @@ public class DatabaseTransactionMgr { for (MaterializedIndex index : allIndices) { List<Tablet> tablets = index.getTablets(); for (Tablet tablet : tablets) { + tabletFailedReplicas.clear(); + tabletSuccReplicas.clear(); for (Replica replica : tablet.getReplicas()) { if (errorReplicaIds.contains(replica.getId())) { // TODO(cmy): do we need to update last failed version here? // because in updateCatalogAfterVisible, it will be updated again. 
replica.updateLastFailedVersion(partitionCommitInfo.getVersion()); + tabletFailedReplicas.add(replica); + } else { + tabletSuccReplicas.add(replica); } } + if (!isReplay && !tabletFailedReplicas.isEmpty()) { + LOG.info("some replicas load data failed for committed txn {} on version {}, table {}, " + + "partition {}, tablet {}, {} replicas load data succ: {}, {} replicas load " + + "data fail: {}", + transactionState.getTransactionId(), partitionCommitInfo.getVersion(), + tableId, partitionId, tablet.getId(), tabletSuccReplicas.size(), + Joiner.on(", ").join(tabletSuccReplicas.stream() + .map(replica -> replica.toStringSimple(true)) + .collect(Collectors.toList())), + tabletFailedReplicas.size(), + Joiner.on(", ").join(tabletFailedReplicas.stream() + .map(replica -> replica.toStringSimple(true)) + .collect(Collectors.toList()))); + } } } partition.setNextVersion(partition.getNextVersion() + 1); @@ -2036,7 +2060,7 @@ public class DatabaseTransactionMgr { // set transaction status will call txn state change listener transactionState.replaySetTransactionStatus(); if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { - updateCatalogAfterCommitted(transactionState, db); + updateCatalogAfterCommitted(transactionState, db, true); } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { updateCatalogAfterVisible(transactionState, db); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org