This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 08c9cba6394 [improvement](transaction) reduce publish txn fail log #28277 (#35060) 08c9cba6394 is described below commit 08c9cba6394b42b35bbe12c8a5fcab7eb02cb16d Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Tue Jun 11 14:46:03 2024 +0800 [improvement](transaction) reduce publish txn fail log #28277 (#35060) cherry pick from #28277 --- .../main/java/org/apache/doris/common/Config.java | 4 + .../doris/transaction/DatabaseTransactionMgr.java | 317 ++++++++++++--------- .../apache/doris/transaction/TransactionState.java | 19 ++ 3 files changed, 202 insertions(+), 138 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c219a6f9656..4968231afd4 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -466,6 +466,10 @@ public class Config extends ConfigBase { + "then the load task will be successful." }) public static int publish_wait_time_second = 300; + @ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish 失败打日志间隔", + "print log interval for publish transaction failed interval"}) + public static long publish_fail_log_interval_second = 5 * 60; + @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。" + "该参数仅用于事务型 insert 操作中。", "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index f5e77563545..36b32f01069 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -919,19 +919,8 @@ public class DatabaseTransactionMgr { // add all commit errors and publish errors to a single set Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); - Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks(); - - long now = System.currentTimeMillis(); - long firstPublishVersionTime = transactionState.getFirstPublishVersionTime(); - boolean allowPublishOneSucc = false; - if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0 - && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) { - allowPublishOneSucc = true; - } - List<Replica> tabletSuccReplicas = Lists.newArrayList(); - List<Replica> tabletWriteFailedReplicas = Lists.newArrayList(); - List<Replica> tabletVersionFailedReplicas = Lists.newArrayList(); + List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList(); // case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are // already force dropped, we just ignore the transaction with all tables been force dropped. @@ -946,133 +935,12 @@ public class DatabaseTransactionMgr { LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList); List<? extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList); tableList = MetaLockUtils.writeLockTablesIfExist(tableList); - PublishResult publishResult = PublishResult.QUORUM_SUCC; + PublishResult publishResult; try { - Iterator<TableCommitInfo> tableCommitInfoIterator - = transactionState.getIdToTableCommitInfos().values().iterator(); - while (tableCommitInfoIterator.hasNext()) { - TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next(); - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTableNullable(tableId); - // table maybe dropped between commit and publish, ignore this error - if (table == null) { - tableCommitInfoIterator.remove(); - LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}", - tableId, - transactionState); - continue; - } - PartitionInfo partitionInfo = table.getPartitionInfo(); - Iterator<PartitionCommitInfo> partitionCommitInfoIterator - = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); - while (partitionCommitInfoIterator.hasNext()) { - PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next(); - long partitionId = partitionCommitInfo.getPartitionId(); - Partition partition = table.getPartition(partitionId); - // partition maybe dropped between commit and publish version, ignore this error - if (partition == null) { - partitionCommitInfoIterator.remove(); - LOG.warn("partition {} is dropped, skip version check" - + " and remove it from transaction state {}", partitionId, transactionState); - continue; - } - if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) { - LOG.debug("transactionId {} partition commitInfo version {} is not equal with " - + "partition visible version {} plus one, need wait", - transactionId, - partitionCommitInfo.getVersion(), - partition.getVisibleVersion()); - String errMsg = String.format("wait for publishing partition %d version %d." - + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1, - partitionCommitInfo.getVersion(), tableId); - transactionState.setErrorMsg(errMsg); - return; - } - int quorumReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1; - - List<MaterializedIndex> allIndices; - if (transactionState.getLoadedTblIndexes().isEmpty()) { - allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); - } else { - allIndices = Lists.newArrayList(); - for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) { - MaterializedIndex index = partition.getIndex(indexId); - if (index != null) { - allIndices.add(index); - } - } - } - - // check success replica number for each tablet. - // a success replica means: - // 1. Not in errorReplicaIds: succeed in both commit and publish phase - // 2. last failed version < 0: is a health replica before - // 3. version catch up: not with a stale version - // Here we only check number, the replica version will be updated in updateCatalogAfterVisible() - for (MaterializedIndex index : allIndices) { - for (Tablet tablet : index.getTablets()) { - tabletSuccReplicas.clear(); - tabletWriteFailedReplicas.clear(); - tabletVersionFailedReplicas.clear(); - for (Replica replica : tablet.getReplicas()) { - checkReplicaContinuousVersionSucc(tablet.getId(), replica, - partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()), - errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, - tabletVersionFailedReplicas); - } - - int healthReplicaNum = tabletSuccReplicas.size(); - if (healthReplicaNum >= quorumReplicaNum) { - if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) { - String writeDetail = getTabletWriteDetail(tabletSuccReplicas, - tabletWriteFailedReplicas, tabletVersionFailedReplicas); - LOG.info("publish version quorum succ for transaction {} on tablet {} with version" - + " {}, and has failed replicas, quorum num {}. table {}, partition {}," - + " tablet detail: {}", - transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - quorumReplicaNum, tableId, partitionId, writeDetail); - } - continue; - } - - String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, - tabletVersionFailedReplicas); - if (allowPublishOneSucc && healthReplicaNum > 0) { - if (publishResult == PublishResult.QUORUM_SUCC) { - publishResult = PublishResult.TIMEOUT_SUCC; - } - // We can not do any thing except retrying, - // because publish task is assigned a version, - // and doris does not permit discontinuous - // versions. - // - // If a timeout happens, it means that the rowset - // that are being publised exists on a few replicas we should go - // ahead, otherwise data may be lost and thre - // publish task hangs forever. - LOG.info("publish version timeout succ for transaction {} on tablet {} with version" - + " {}, and has failed replicas, quorum num {}. table {}, partition {}," - + " tablet detail: {}", - transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - quorumReplicaNum, tableId, partitionId, writeDetail); - } else { - publishResult = PublishResult.FAILED; - String errMsg = String.format("publish on tablet %d failed." - + " succeed replica num %d less than quorum %d." - + " table: %d, partition: %d, publish version: %d", - tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId, - partitionId, partition.getVisibleVersion() + 1); - transactionState.setErrorMsg(errMsg); - LOG.info("publish version failed for transaction {} on tablet {} with version" - + " {}, and has failed replicas, quorum num {}. table {}, partition {}," - + " tablet detail: {}", - transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - quorumReplicaNum, tableId, partitionId, writeDetail); - } - } - } - } + if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) { + return; } + publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds); if (publishResult == PublishResult.FAILED) { return; } @@ -1107,7 +975,180 @@ public class DatabaseTransactionMgr { // Otherwise, there is no way for stream load to query the result right after loading finished, // even if we call "sync" before querying. transactionState.countdownVisibleLatch(); - LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name()); + LOG.info("finish transaction {} successfully, publish times {}, publish result {}", + transactionState, transactionState.getPublishCount(), publishResult.name()); + } + + private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db, + List<Pair<OlapTable, Partition>> relatedTblPartitions) { + Iterator<TableCommitInfo> tableCommitInfoIterator + = transactionState.getIdToTableCommitInfos().values().iterator(); + while (tableCommitInfoIterator.hasNext()) { + TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next(); + long tableId = tableCommitInfo.getTableId(); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + // table maybe dropped between commit and publish, ignore this error + if (table == null) { + tableCommitInfoIterator.remove(); + LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}", + tableId, + transactionState); + continue; + } + + Iterator<PartitionCommitInfo> partitionCommitInfoIterator + = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); + while (partitionCommitInfoIterator.hasNext()) { + PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next(); + long partitionId = partitionCommitInfo.getPartitionId(); + Partition partition = table.getPartition(partitionId); + // partition maybe dropped between commit and publish version, ignore this error + if (partition == null) { + partitionCommitInfoIterator.remove(); + LOG.warn("partition {} is dropped, skip version check" + + " and remove it from transaction state {}", partitionId, transactionState); + continue; + } + if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) { + LOG.debug("for table {} partition {}, transactionId {} partition commitInfo version {} is not" + + " equal with partition visible version {} plus one, need wait", + table.getId(), partition.getId(), transactionState.getTransactionId(), + partitionCommitInfo.getVersion(), partition.getVisibleVersion()); + String errMsg = String.format("wait for publishing partition %d version %d." + + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1, + partitionCommitInfo.getVersion(), tableId); + transactionState.setErrorMsg(errMsg); + return false; + } + + relatedTblPartitions.add(Pair.of(table, partition)); + } + } + + return true; + } + + private PublishResult finishCheckQuorumReplicas(TransactionState transactionState, + List<Pair<OlapTable, Partition>> relatedTblPartitions, + Set<Long> errorReplicaIds) { + long now = System.currentTimeMillis(); + long firstPublishVersionTime = transactionState.getFirstPublishVersionTime(); + boolean allowPublishOneSucc = false; + if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0 + && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) { + allowPublishOneSucc = true; + } + + List<Replica> tabletSuccReplicas = Lists.newArrayList(); + List<Replica> tabletWriteFailedReplicas = Lists.newArrayList(); + List<Replica> tabletVersionFailedReplicas = Lists.newArrayList(); + List<String> logs = Lists.newArrayList(); + + Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks(); + PublishResult publishResult = PublishResult.QUORUM_SUCC; + for (Pair<OlapTable, Partition> pair : relatedTblPartitions) { + OlapTable table = pair.key(); + Partition partition = pair.value(); + long tableId = table.getId(); + long partitionId = partition.getId(); + long newVersion = partition.getVisibleVersion() + 1; + int loadRequiredReplicaNum = table.getPartitionInfo() + .getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1; + List<MaterializedIndex> allIndices; + if (transactionState.getLoadedTblIndexes().isEmpty()) { + allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); + } else { + allIndices = Lists.newArrayList(); + for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) { + MaterializedIndex index = partition.getIndex(indexId); + if (index != null) { + allIndices.add(index); + } + } + } + + // check success replica number for each tablet. + // a success replica means: + // 1. Not in errorReplicaIds: succeed in both commit and publish phase + // 2. last failed version < 0: is a health replica before + // 3. version catch up: not with a stale version + // Here we only check number, the replica version will be updated in updateCatalogAfterVisible() + for (MaterializedIndex index : allIndices) { + for (Tablet tablet : index.getTablets()) { + tabletSuccReplicas.clear(); + tabletWriteFailedReplicas.clear(); + tabletVersionFailedReplicas.clear(); + for (Replica replica : tablet.getReplicas()) { + checkReplicaContinuousVersionSucc(tablet.getId(), replica, + newVersion, publishTasks.get(replica.getBackendId()), + errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); + } + + int healthReplicaNum = tabletSuccReplicas.size(); + if (healthReplicaNum >= loadRequiredReplicaNum) { + boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty() + || !tabletVersionFailedReplicas.isEmpty(); + if (hasFailedReplica) { + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, + tabletWriteFailedReplicas, tabletVersionFailedReplicas); + logs.add(String.format("publish version quorum succ for transaction %s on tablet %s" + + " with version %s, and has failed replicas, load require replica num %s. " + + "table %s, partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } + continue; + } + + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); + if (allowPublishOneSucc && healthReplicaNum > 0) { + if (publishResult == PublishResult.QUORUM_SUCC) { + publishResult = PublishResult.TIMEOUT_SUCC; + } + // We can not do any thing except retrying, + // because publish task is assigned a version, + // and doris does not permit discontinuous + // versions. + // + // If a timeout happens, it means that the rowset + // that are being publised exists on a few replicas we should go + // ahead, otherwise data may be lost and thre + // publish task hangs forever. + logs.add(String.format("publish version timeout succ for transaction %s on tablet %s " + + "with version %s, and has failed replicas, load require replica num %s. " + + "table %s, partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } else { + publishResult = PublishResult.FAILED; + String errMsg = String.format("publish on tablet %d failed." + + " succeed replica num %d < load required replica num %d." + + " table: %d, partition: %d, publish version: %d", + tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId, + partitionId, newVersion); + transactionState.setErrorMsg(errMsg); + logs.add(String.format("publish version failed for transaction %s on tablet %s with version" + + " %s, and has failed replicas, load required replica num %s. table %s, " + + "partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } + } + } + } + + boolean needLog = publishResult != PublishResult.FAILED + || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L; + if (needLog) { + transactionState.setLastPublishLogTime(now); + for (String log : logs) { + LOG.info("{}. publish times {}", log, transactionState.getPublishCount()); + } + } + + return publishResult; } private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, long version, diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 5eed8c655c9..f9a094eceb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -230,6 +230,12 @@ public class TransactionState implements Writable { private long lastPublishVersionTime = -1; + private long publishCount = 0; + + // txn may try finish many times and generate a lot of log. + // use lastPublishLogTime to reduce log. + private long lastPublishLogTime = 0; + @SerializedName(value = "callbackId") private long callbackId = -1; @@ -347,6 +353,7 @@ public class TransactionState implements Writable { } public void updateSendTaskTime() { + this.publishCount++; this.lastPublishVersionTime = System.currentTimeMillis(); if (this.firstPublishVersionTime <= 0) { this.firstPublishVersionTime = lastPublishVersionTime; @@ -361,6 +368,10 @@ public class TransactionState implements Writable { return this.lastPublishVersionTime; } + public long getPublishCount() { + return publishCount; + } + public boolean hasSendTask() { return this.hasSendTask; } @@ -429,6 +440,14 @@ public class TransactionState implements Writable { return errorLogUrl; } + public long getLastPublishLogTime() { + return lastPublishLogTime; + } + + public void setLastPublishLogTime(long lastPublishLogTime) { + this.lastPublishLogTime = lastPublishLogTime; + } + public void setTransactionStatus(TransactionStatus transactionStatus) { // status changed this.preStatus = this.transactionStatus; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org