This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 08c9cba6394 [improvement](transaction) reduce publish txn fail log #28277 (#35060)
08c9cba6394 is described below

commit 08c9cba6394b42b35bbe12c8a5fcab7eb02cb16d
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Tue Jun 11 14:46:03 2024 +0800

    [improvement](transaction) reduce publish txn fail log #28277 (#35060)
    
    cherry pick from #28277
---
 .../main/java/org/apache/doris/common/Config.java  |   4 +
 .../doris/transaction/DatabaseTransactionMgr.java  | 317 ++++++++++++---------
 .../apache/doris/transaction/TransactionState.java |  19 ++
 3 files changed, 202 insertions(+), 138 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c219a6f9656..4968231afd4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -466,6 +466,10 @@ public class Config extends ConfigBase {
             + "then the load task will be successful." })
     public static int publish_wait_time_second = 300;
 
+    @ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish 
失败打日志间隔",
+            "print log interval for publish transaction failed interval"})
+    public static long publish_fail_log_interval_second = 5 * 60;
+
     @ConfField(mutable = true, masterOnly = true, description = 
{"提交事务的最大超时时间,单位是秒。"
             + "该参数仅用于事务型 insert 操作中。",
             "Maximal waiting time for all data inserted before one transaction 
to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index f5e77563545..36b32f01069 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -919,19 +919,8 @@ public class DatabaseTransactionMgr {
 
         // add all commit errors and publish errors to a single set
         Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        Map<Long, PublishVersionTask> publishTasks = 
transactionState.getPublishVersionTasks();
-
-        long now = System.currentTimeMillis();
-        long firstPublishVersionTime = 
transactionState.getFirstPublishVersionTime();
-        boolean allowPublishOneSucc = false;
-        if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
-                && now >= firstPublishVersionTime + 
Config.publish_wait_time_second * 1000L) {
-            allowPublishOneSucc = true;
-        }
 
-        List<Replica> tabletSuccReplicas = Lists.newArrayList();
-        List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
-        List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+        List<Pair<OlapTable, Partition>> relatedTblPartitions = 
Lists.newArrayList();
 
         // case 1 If database is dropped, then we just throw 
MetaNotFoundException, because all related tables are
         // already force dropped, we just ignore the transaction with all 
tables been force dropped.
@@ -946,133 +935,12 @@ public class DatabaseTransactionMgr {
         LOG.debug("finish transaction {} with tables {}", transactionId, 
tableIdList);
         List<? extends TableIf> tableList = 
db.getTablesOnIdOrderIfExist(tableIdList);
         tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
-        PublishResult publishResult = PublishResult.QUORUM_SUCC;
+        PublishResult publishResult;
         try {
-            Iterator<TableCommitInfo> tableCommitInfoIterator
-                    = 
transactionState.getIdToTableCommitInfos().values().iterator();
-            while (tableCommitInfoIterator.hasNext()) {
-                TableCommitInfo tableCommitInfo = 
tableCommitInfoIterator.next();
-                long tableId = tableCommitInfo.getTableId();
-                OlapTable table = (OlapTable) db.getTableNullable(tableId);
-                // table maybe dropped between commit and publish, ignore this 
error
-                if (table == null) {
-                    tableCommitInfoIterator.remove();
-                    LOG.warn("table {} is dropped, skip version check and 
remove it from transaction state {}",
-                            tableId,
-                            transactionState);
-                    continue;
-                }
-                PartitionInfo partitionInfo = table.getPartitionInfo();
-                Iterator<PartitionCommitInfo> partitionCommitInfoIterator
-                        = 
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
-                while (partitionCommitInfoIterator.hasNext()) {
-                    PartitionCommitInfo partitionCommitInfo = 
partitionCommitInfoIterator.next();
-                    long partitionId = partitionCommitInfo.getPartitionId();
-                    Partition partition = table.getPartition(partitionId);
-                    // partition maybe dropped between commit and publish 
version, ignore this error
-                    if (partition == null) {
-                        partitionCommitInfoIterator.remove();
-                        LOG.warn("partition {} is dropped, skip version check"
-                                        + " and remove it from transaction 
state {}", partitionId, transactionState);
-                        continue;
-                    }
-                    if (partition.getVisibleVersion() != 
partitionCommitInfo.getVersion() - 1) {
-                        LOG.debug("transactionId {} partition commitInfo 
version {} is not equal with "
-                                        + "partition visible version {} plus 
one, need wait",
-                                transactionId,
-                                partitionCommitInfo.getVersion(),
-                                partition.getVisibleVersion());
-                        String errMsg = String.format("wait for publishing 
partition %d version %d."
-                                        + " self version: %d. table %d", 
partitionId, partition.getVisibleVersion() + 1,
-                                partitionCommitInfo.getVersion(), tableId);
-                        transactionState.setErrorMsg(errMsg);
-                        return;
-                    }
-                    int quorumReplicaNum = 
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
-
-                    List<MaterializedIndex> allIndices;
-                    if (transactionState.getLoadedTblIndexes().isEmpty()) {
-                        allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
-                    } else {
-                        allIndices = Lists.newArrayList();
-                        for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
-                            MaterializedIndex index = 
partition.getIndex(indexId);
-                            if (index != null) {
-                                allIndices.add(index);
-                            }
-                        }
-                    }
-
-                    // check success replica number for each tablet.
-                    // a success replica means:
-                    //  1. Not in errorReplicaIds: succeed in both commit and 
publish phase
-                    //  2. last failed version < 0: is a health replica before
-                    //  3. version catch up: not with a stale version
-                    // Here we only check number, the replica version will be 
updated in updateCatalogAfterVisible()
-                    for (MaterializedIndex index : allIndices) {
-                        for (Tablet tablet : index.getTablets()) {
-                            tabletSuccReplicas.clear();
-                            tabletWriteFailedReplicas.clear();
-                            tabletVersionFailedReplicas.clear();
-                            for (Replica replica : tablet.getReplicas()) {
-                                
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
-                                        partitionCommitInfo.getVersion(), 
publishTasks.get(replica.getBackendId()),
-                                        errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
-                                        tabletVersionFailedReplicas);
-                            }
-
-                            int healthReplicaNum = tabletSuccReplicas.size();
-                            if (healthReplicaNum >= quorumReplicaNum) {
-                                if (!tabletWriteFailedReplicas.isEmpty() || 
!tabletVersionFailedReplicas.isEmpty()) {
-                                    String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas,
-                                            tabletWriteFailedReplicas, 
tabletVersionFailedReplicas);
-                                    LOG.info("publish version quorum succ for 
transaction {} on tablet {} with version"
-                                            + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
-                                            + " tablet detail: {}",
-                                            transactionState, tablet.getId(), 
partitionCommitInfo.getVersion(),
-                                            quorumReplicaNum, tableId, 
partitionId, writeDetail);
-                                }
-                                continue;
-                            }
-
-                            String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
-                                    tabletVersionFailedReplicas);
-                            if (allowPublishOneSucc && healthReplicaNum > 0) {
-                                if (publishResult == 
PublishResult.QUORUM_SUCC) {
-                                    publishResult = PublishResult.TIMEOUT_SUCC;
-                                }
-                                // We can not do any thing except retrying,
-                                // because publish task is assigned a version,
-                                // and doris does not permit discontinuous
-                                // versions.
-                                //
-                                // If a timeout happens, it means that the 
rowset
-                                // that are being publised exists on a few 
replicas we should go
-                                // ahead, otherwise data may be lost and thre
-                                // publish task hangs forever.
-                                LOG.info("publish version timeout succ for 
transaction {} on tablet {} with version"
-                                        + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
-                                        + " tablet detail: {}",
-                                        transactionState, tablet.getId(), 
partitionCommitInfo.getVersion(),
-                                        quorumReplicaNum, tableId, 
partitionId, writeDetail);
-                            } else {
-                                publishResult = PublishResult.FAILED;
-                                String errMsg = String.format("publish on 
tablet %d failed."
-                                                + " succeed replica num %d 
less than quorum %d."
-                                                + " table: %d, partition: %d, 
publish version: %d",
-                                        tablet.getId(), healthReplicaNum, 
quorumReplicaNum, tableId,
-                                        partitionId, 
partition.getVisibleVersion() + 1);
-                                transactionState.setErrorMsg(errMsg);
-                                LOG.info("publish version failed for 
transaction {} on tablet {} with version"
-                                        + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
-                                        + " tablet detail: {}",
-                                        transactionState, tablet.getId(), 
partitionCommitInfo.getVersion(),
-                                        quorumReplicaNum, tableId, 
partitionId, writeDetail);
-                            }
-                        }
-                    }
-                }
+            if (!finishCheckPartitionVersion(transactionState, db, 
relatedTblPartitions)) {
+                return;
             }
+            publishResult = finishCheckQuorumReplicas(transactionState, 
relatedTblPartitions, errorReplicaIds);
             if (publishResult == PublishResult.FAILED) {
                 return;
             }
@@ -1107,7 +975,180 @@ public class DatabaseTransactionMgr {
         // Otherwise, there is no way for stream load to query the result 
right after loading finished,
         // even if we call "sync" before querying.
         transactionState.countdownVisibleLatch();
-        LOG.info("finish transaction {} successfully, publish result: {}", 
transactionState, publishResult.name());
+        LOG.info("finish transaction {} successfully, publish times {}, 
publish result {}",
+                transactionState, transactionState.getPublishCount(), 
publishResult.name());
+    }
+
+    private boolean finishCheckPartitionVersion(TransactionState 
transactionState, Database db,
+            List<Pair<OlapTable, Partition>> relatedTblPartitions) {
+        Iterator<TableCommitInfo> tableCommitInfoIterator
+                = 
transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // table maybe dropped between commit and publish, ignore this 
error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip version check and remove 
it from transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator
+                    = 
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = 
partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // partition maybe dropped between commit and publish version, 
ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip version check"
+                                    + " and remove it from transaction state 
{}", partitionId, transactionState);
+                    continue;
+                }
+                if (partition.getVisibleVersion() != 
partitionCommitInfo.getVersion() - 1) {
+                    LOG.debug("for table {} partition {}, transactionId {} 
partition commitInfo version {} is not"
+                            + " equal with partition visible version {} plus 
one, need wait",
+                            table.getId(), partition.getId(), 
transactionState.getTransactionId(),
+                            partitionCommitInfo.getVersion(), 
partition.getVisibleVersion());
+                    String errMsg = String.format("wait for publishing 
partition %d version %d."
+                                    + " self version: %d. table %d", 
partitionId, partition.getVisibleVersion() + 1,
+                            partitionCommitInfo.getVersion(), tableId);
+                    transactionState.setErrorMsg(errMsg);
+                    return false;
+                }
+
+                relatedTblPartitions.add(Pair.of(table, partition));
+            }
+        }
+
+        return true;
+    }
+
+    private PublishResult finishCheckQuorumReplicas(TransactionState 
transactionState,
+            List<Pair<OlapTable, Partition>> relatedTblPartitions,
+            Set<Long> errorReplicaIds) {
+        long now = System.currentTimeMillis();
+        long firstPublishVersionTime = 
transactionState.getFirstPublishVersionTime();
+        boolean allowPublishOneSucc = false;
+        if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+                && now >= firstPublishVersionTime + 
Config.publish_wait_time_second * 1000L) {
+            allowPublishOneSucc = true;
+        }
+
+        List<Replica> tabletSuccReplicas = Lists.newArrayList();
+        List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+        List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+        List<String> logs = Lists.newArrayList();
+
+        Map<Long, PublishVersionTask> publishTasks = 
transactionState.getPublishVersionTasks();
+        PublishResult publishResult = PublishResult.QUORUM_SUCC;
+        for (Pair<OlapTable, Partition> pair : relatedTblPartitions) {
+            OlapTable table = pair.key();
+            Partition partition = pair.value();
+            long tableId = table.getId();
+            long partitionId = partition.getId();
+            long newVersion = partition.getVisibleVersion() + 1;
+            int loadRequiredReplicaNum = table.getPartitionInfo()
+                    .getReplicaAllocation(partitionId).getTotalReplicaNum() / 
2 + 1;
+            List<MaterializedIndex> allIndices;
+            if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+            } else {
+                allIndices = Lists.newArrayList();
+                for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                    MaterializedIndex index = partition.getIndex(indexId);
+                    if (index != null) {
+                        allIndices.add(index);
+                    }
+                }
+            }
+
+            // check success replica number for each tablet.
+            // a success replica means:
+            //  1. Not in errorReplicaIds: succeed in both commit and publish 
phase
+            //  2. last failed version < 0: is a health replica before
+            //  3. version catch up: not with a stale version
+            // Here we only check number, the replica version will be updated 
in updateCatalogAfterVisible()
+            for (MaterializedIndex index : allIndices) {
+                for (Tablet tablet : index.getTablets()) {
+                    tabletSuccReplicas.clear();
+                    tabletWriteFailedReplicas.clear();
+                    tabletVersionFailedReplicas.clear();
+                    for (Replica replica : tablet.getReplicas()) {
+                        checkReplicaContinuousVersionSucc(tablet.getId(), 
replica,
+                                newVersion, 
publishTasks.get(replica.getBackendId()),
+                                errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
+                                tabletVersionFailedReplicas);
+                    }
+
+                    int healthReplicaNum = tabletSuccReplicas.size();
+                    if (healthReplicaNum >= loadRequiredReplicaNum) {
+                        boolean hasFailedReplica = 
!tabletWriteFailedReplicas.isEmpty()
+                                || !tabletVersionFailedReplicas.isEmpty();
+                        if (hasFailedReplica) {
+                            String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas,
+                                    tabletWriteFailedReplicas, 
tabletVersionFailedReplicas);
+                            logs.add(String.format("publish version quorum 
succ for transaction %s on tablet %s"
+                                    + " with version %s, and has failed 
replicas, load require replica num %s. "
+                                    + "table %s, partition %s, tablet detail: 
%s",
+                                    transactionState, tablet.getId(), 
newVersion,
+                                    loadRequiredReplicaNum, tableId, 
partitionId, writeDetail));
+                        }
+                        continue;
+                    }
+
+                    String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+                            tabletVersionFailedReplicas);
+                    if (allowPublishOneSucc && healthReplicaNum > 0) {
+                        if (publishResult == PublishResult.QUORUM_SUCC) {
+                            publishResult = PublishResult.TIMEOUT_SUCC;
+                        }
+                        // We can not do any thing except retrying,
+                        // because publish task is assigned a version,
+                        // and doris does not permit discontinuous
+                        // versions.
+                        //
+                        // If a timeout happens, it means that the rowset
+                        // that are being publised exists on a few replicas we 
should go
+                        // ahead, otherwise data may be lost and thre
+                        // publish task hangs forever.
+                        logs.add(String.format("publish version timeout succ 
for transaction %s on tablet %s "
+                                + "with version %s, and has failed replicas, 
load require replica num %s. "
+                                + "table %s, partition %s, tablet detail: %s",
+                                transactionState, tablet.getId(), newVersion,
+                                loadRequiredReplicaNum, tableId, partitionId, 
writeDetail));
+                    } else {
+                        publishResult = PublishResult.FAILED;
+                        String errMsg = String.format("publish on tablet %d 
failed."
+                                        + " succeed replica num %d < load 
required replica num %d."
+                                        + " table: %d, partition: %d, publish 
version: %d",
+                                tablet.getId(), healthReplicaNum, 
loadRequiredReplicaNum, tableId,
+                                partitionId, newVersion);
+                        transactionState.setErrorMsg(errMsg);
+                        logs.add(String.format("publish version failed for 
transaction %s on tablet %s with version"
+                                + " %s, and has failed replicas, load required 
replica num %s. table %s, "
+                                + "partition %s, tablet detail: %s",
+                                transactionState, tablet.getId(), newVersion,
+                                loadRequiredReplicaNum, tableId, partitionId, 
writeDetail));
+                    }
+                }
+            }
+        }
+
+        boolean needLog = publishResult != PublishResult.FAILED
+                || now - transactionState.getLastPublishLogTime() > 
Config.publish_fail_log_interval_second * 1000L;
+        if (needLog) {
+            transactionState.setLastPublishLogTime(now);
+            for (String log : logs) {
+                LOG.info("{}. publish times {}", log, 
transactionState.getPublishCount());
+            }
+        }
+
+        return publishResult;
     }
 
     private void checkReplicaContinuousVersionSucc(long tabletId, Replica 
replica, long version,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 5eed8c655c9..f9a094eceb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -230,6 +230,12 @@ public class TransactionState implements Writable {
 
     private long lastPublishVersionTime = -1;
 
+    private long publishCount = 0;
+
+    // txn may try finish many times and generate a lot of log.
+    // use lastPublishLogTime to reduce log.
+    private long lastPublishLogTime = 0;
+
     @SerializedName(value = "callbackId")
     private long callbackId = -1;
 
@@ -347,6 +353,7 @@ public class TransactionState implements Writable {
     }
 
     public void updateSendTaskTime() {
+        this.publishCount++;
         this.lastPublishVersionTime = System.currentTimeMillis();
         if (this.firstPublishVersionTime <= 0) {
             this.firstPublishVersionTime = lastPublishVersionTime;
@@ -361,6 +368,10 @@ public class TransactionState implements Writable {
         return this.lastPublishVersionTime;
     }
 
+    public long getPublishCount() {
+        return publishCount;
+    }
+
     public boolean hasSendTask() {
         return this.hasSendTask;
     }
@@ -429,6 +440,14 @@ public class TransactionState implements Writable {
         return errorLogUrl;
     }
 
+    public long getLastPublishLogTime() {
+        return lastPublishLogTime;
+    }
+
+    public void setLastPublishLogTime(long lastPublishLogTime) {
+        this.lastPublishLogTime = lastPublishLogTime;
+    }
+
     public void setTransactionStatus(TransactionStatus transactionStatus) {
         // status changed
         this.preStatus = this.transactionStatus;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to