This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a2a98701c6 [improvement](log) add txn log (#28875)
9a2a98701c6 is described below

commit 9a2a98701c6b16847963094fc8b241d5679ce996
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sun Jan 14 10:27:35 2024 +0800

    [improvement](log) add txn log (#28875)
---
 be/src/olap/task/engine_publish_version_task.cpp   | 11 ++++--
 be/src/olap/txn_manager.cpp                        | 41 ++++++++++++++++-----
 .../doris/transaction/DatabaseTransactionMgr.java  | 42 +++++++++++++++++-----
 3 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 78f5cb9c328..ce5ba534fd1 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -94,13 +94,16 @@ Status EnginePublishVersionTask::execute() {
     VLOG_NOTICE << "begin to process publish version. transaction_id=" << 
transaction_id;
     DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
         if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-            LOG_WARNING("EnginePublishVersionTask.finish.random random 
failed");
+            LOG_WARNING("EnginePublishVersionTask.finish.random random failed")
+                    .tag("txn_id", transaction_id);
             return Status::InternalError("debug engine publish version task 
random failed");
         }
     });
     DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
         if (auto wait = dp->param<int>("duration", 0); wait > 0) {
-            LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait 
ms", wait);
+            LOG_WARNING("EnginePublishVersionTask.finish.wait wait")
+                    .tag("txn_id", transaction_id)
+                    .tag("wait ms", wait);
             std::this_thread::sleep_for(std::chrono::milliseconds(wait));
         }
     });
@@ -204,7 +207,9 @@ Status EnginePublishVersionTask::execute() {
                                     partition_id, tablet_info.tablet_id, 
version.first);
                         }
                         res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
-                                "check_version_exist failed");
+                                "version not continuous for mow, tablet_id={}, 
"
+                                "tablet_max_version={}, txn_version={}",
+                                tablet_info.tablet_id, max_version, 
version.first);
                         int64_t missed_version = max_version + 1;
                         int64_t missed_txn_id =
                                 
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 161c39fdd20..7a82ef4c6ae 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -142,13 +142,18 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, 
TTransactionId transac
 
     DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
         if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
+            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id);
             return Status::InternalError("debug prepare txn random failed");
         }
     });
     DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
         if (auto wait = dp->param<int>("duration", 0); wait > 0) {
-            LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
+            LOG_WARNING("TxnManager.prepare_txn.wait")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id)
+                    .tag("wait ms", wait);
             std::this_thread::sleep_for(std::chrono::milliseconds(wait));
         }
     });
@@ -313,13 +318,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, 
TPartitionId partition_id,
 
     DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
         if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-            LOG_WARNING("TxnManager.commit_txn.random_failed");
+            LOG_WARNING("TxnManager.commit_txn.random_failed")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id);
             return Status::InternalError("debug commit txn random failed");
         }
     });
     DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
         if (auto wait = dp->param<int>("duration", 0); wait > 0) {
-            LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
+            LOG_WARNING("TxnManager.commit_txn.wait")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id)
+                    .tag("wait ms", wait);
             std::this_thread::sleep_for(std::chrono::milliseconds(wait));
         }
     });
@@ -388,7 +398,10 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId 
partition_id,
                                         
rowset_ptr->rowset_meta()->get_rowset_pb(), false);
         DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
             if (auto wait = dp->param<int>("duration", 0); wait > 0) {
-                
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
+                LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
+                        .tag("txn_id", transaction_id)
+                        .tag("tablet_id", tablet_id)
+                        .tag("wait ms", wait);
                 std::this_thread::sleep_for(std::chrono::milliseconds(wait));
             }
         });
@@ -466,13 +479,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, 
TPartitionId partition_id,
     }
     
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
         if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-            
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
+            
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id);
             return Status::InternalError("debug publish txn before save rs 
meta random failed");
         }
     });
     DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
         if (auto wait = dp->param<int>("duration", 0); wait > 0) {
-            
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", 
wait);
+            LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id)
+                    .tag("wait ms", wait);
             std::this_thread::sleep_for(std::chrono::milliseconds(wait));
         }
     });
@@ -486,13 +504,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, 
TPartitionId partition_id,
 
     DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", 
{
         if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-            
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
+            
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id);
             return Status::InternalError("debug publish txn after save rs meta 
random failed");
         }
     });
     DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
         if (auto wait = dp->param<int>("duration", 0); wait > 0) {
-            
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", 
wait);
+            LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
+                    .tag("txn_id", transaction_id)
+                    .tag("tablet_id", tablet_id)
+                    .tag("wait ms", wait);
             std::this_thread::sleep_for(std::chrono::milliseconds(wait));
         }
     });
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9b611425cc9..57088cbd7a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -589,9 +589,11 @@ public class DatabaseTransactionMgr {
                                     tabletVersionFailedReplicas);
 
                             String errMsg = String.format("Failed to commit 
txn %s, cause tablet %s succ replica num %s"
-                                    + " < load required replica num %s. table 
%s, partition %s, this tablet detail: %s",
+                                    + " < load required replica num %s. table 
%s, partition: [ id=%s, commit version %s"
+                                    + ", visible version %s ], this tablet 
detail: %s",
                                     transactionId, tablet.getId(), 
successReplicaNum, loadRequiredReplicaNum, tableId,
-                                    partition.getId(), writeDetail);
+                                    partition.getId(), 
partition.getCommittedVersion(), partition.getVisibleVersion(),
+                                    writeDetail);
                             LOG.info(errMsg);
 
                             throw new 
TabletQuorumFailedException(transactionId, errMsg);
@@ -746,7 +748,7 @@ public class DatabaseTransactionMgr {
         }
 
         // update nextVersion because of the failure of persistent transaction 
resulting in error version
-        updateCatalogAfterCommitted(transactionState, db);
+        updateCatalogAfterCommitted(transactionState, db, false);
         LOG.info("transaction:[{}] successfully committed", transactionState);
     }
 
@@ -1128,9 +1130,9 @@ public class DatabaseTransactionMgr {
                                     tabletWriteFailedReplicas, 
tabletVersionFailedReplicas);
                             logs.add(String.format("publish version quorum 
succ for transaction %s on tablet %s"
                                     + " with version %s, and has failed 
replicas, load require replica num %s. "
-                                    + "table %s, partition %s, tablet detail: 
%s",
-                                    transactionState, tablet.getId(), 
newVersion,
-                                    loadRequiredReplicaNum, tableId, 
partitionId, writeDetail));
+                                    + "table %s, partition: [ id=%s, commit 
version=%s ], tablet detail: %s",
+                                    transactionState, tablet.getId(), 
newVersion, loadRequiredReplicaNum, tableId,
+                                    partitionId, 
partition.getCommittedVersion(), writeDetail));
                         }
                         continue;
                     }
@@ -1178,7 +1180,8 @@ public class DatabaseTransactionMgr {
         if (needLog) {
             transactionState.setLastPublishLogTime(now);
             for (String log : logs) {
-                LOG.info("{}. publish times {}", log, 
transactionState.getPublishCount());
+                LOG.info("{}. publish times {}, whole txn publish result {}",
+                        log, transactionState.getPublishCount(), 
publishResult.name());
             }
         }
 
@@ -1793,8 +1796,10 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    private void updateCatalogAfterCommitted(TransactionState 
transactionState, Database db) {
+    private void updateCatalogAfterCommitted(TransactionState 
transactionState, Database db, boolean isReplay) {
         Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
+        List<Replica> tabletSuccReplicas = Lists.newArrayList();
+        List<Replica> tabletFailedReplicas = Lists.newArrayList();
         for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
             long tableId = tableCommitInfo.getTableId();
             OlapTable table = (OlapTable) db.getTableNullable(tableId);
@@ -1817,13 +1822,32 @@ public class DatabaseTransactionMgr {
                 for (MaterializedIndex index : allIndices) {
                     List<Tablet> tablets = index.getTablets();
                     for (Tablet tablet : tablets) {
+                        tabletFailedReplicas.clear();
+                        tabletSuccReplicas.clear();
                         for (Replica replica : tablet.getReplicas()) {
                             if (errorReplicaIds.contains(replica.getId())) {
                                 // TODO(cmy): do we need to update last failed 
version here?
                                 // because in updateCatalogAfterVisible, it 
will be updated again.
                                 
replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
+                                tabletFailedReplicas.add(replica);
+                            } else {
+                                tabletSuccReplicas.add(replica);
                             }
                         }
+                        if (!isReplay && !tabletFailedReplicas.isEmpty()) {
+                            LOG.info("some replicas load data failed for 
committed txn {} on version {}, table {}, "
+                                    + "partition {}, tablet {}, {} replicas 
load data succ: {}, {} replicas load "
+                                    + "data fail: {}",
+                                    transactionState.getTransactionId(), 
partitionCommitInfo.getVersion(),
+                                    tableId, partitionId, tablet.getId(), 
tabletSuccReplicas.size(),
+                                    Joiner.on(", 
").join(tabletSuccReplicas.stream()
+                                            .map(replica -> 
replica.toStringSimple(true))
+                                            .collect(Collectors.toList())),
+                                    tabletFailedReplicas.size(),
+                                    Joiner.on(", 
").join(tabletFailedReplicas.stream()
+                                            .map(replica -> 
replica.toStringSimple(true))
+                                            .collect(Collectors.toList())));
+                        }
                     }
                 }
                 partition.setNextVersion(partition.getNextVersion() + 1);
@@ -2036,7 +2060,7 @@ public class DatabaseTransactionMgr {
             // set transaction status will call txn state change listener
             transactionState.replaySetTransactionStatus();
             if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
-                updateCatalogAfterCommitted(transactionState, db);
+                updateCatalogAfterCommitted(transactionState, db, true);
             } else if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
                 updateCatalogAfterVisible(transactionState, db);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to