This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8a0d940914e [fix](publish) Pick Fix publish failed because task is null (#37546) 8a0d940914e is described below commit 8a0d940914ee8fd41acfe5ce08061fabc77da40a Author: meiyi <myime...@gmail.com> AuthorDate: Thu Jul 11 15:22:04 2024 +0800 [fix](publish) Pick Fix publish failed because task is null (#37546) ## Proposed changes Pick https://github.com/apache/doris/pull/37531 This PR catches the exception so that a failed txn does not block the other txns. --- .../doris/transaction/PublishVersionDaemon.java | 114 +++++++++++++-------- 1 file changed, 69 insertions(+), 45 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 22ca57f2399..a1861fb7f4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -91,6 +91,13 @@ public class PublishVersionDaemon extends MasterDaemon { LOG.warn("some transaction state need to publish, but no backend exists"); return; } + traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends); + tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr, partitionVisibleVersions, + backendPartitions); + } + + private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates, + List<Long> allBackends) { long createPublishVersionTaskTime = System.currentTimeMillis(); // every backend-transaction identified a single task AgentBatchTask batchTask = new AgentBatchTask(); @@ -153,60 +160,77 @@ public class PublishVersionDaemon extends MasterDaemon { if (!batchTask.getAllTasks().isEmpty()) { AgentTaskExecutor.submit(batchTask); } + } - Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap(); + private void 
tryFinishTxn(List<TransactionState> readyTransactionStates, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, + Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) { // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - Stream<PublishVersionTask> publishVersionTaskStream = transactionState - .getPublishVersionTasks() - .values() - .stream() - .peek(task -> { - if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { - Map<Long, Long> tableIdToDeltaNumRows = - task.getTableIdToDeltaNumRows(); - tableIdToDeltaNumRows.forEach((tableId, numRows) -> { - tableIdToTotalDeltaNumRows - .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); - tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); - }); - } - }); - boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream - .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); - transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + try { + // try to finish the transaction, if failed just retry in next loop + tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions, + backendPartitions); + } catch (Throwable t) { + LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState, + transactionState.getPublishVersionTasks(), t); + } + } // end for readyTransactionStates + } - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() - || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); - if (shouldFinishTxn) { - try { - // one transaction exception should not affect other transaction - globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); - } catch (Exception e) { - 
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); - } - if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { - // if finish transaction state failed, then update publish version time, should check - // to finish after some interval - transactionState.updateSendTaskTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("publish version for transaction {} failed", transactionState); + private void tryFinishOneTxn(TransactionState transactionState, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, + Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) { + Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap(); + Stream<PublishVersionTask> publishVersionTaskStream = transactionState + .getPublishVersionTasks() + .values() + .stream() + .peek(task -> { + if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { + Map<Long, Long> tableIdToDeltaNumRows = + task.getTableIdToDeltaNumRows(); + tableIdToDeltaNumRows.forEach((tableId, numRows) -> { + tableIdToTotalDeltaNumRows + .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); + tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); + }); } + }); + boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream + .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() + || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); + if (shouldFinishTxn) { + try { + // one transaction exception should not affect other transaction + globalTransactionMgr.finishTransaction(transactionState.getDbId(), + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); + } catch (Exception e) { + LOG.warn("error 
happens when finish transaction {}", transactionState.getTransactionId(), e); + } + if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { + // if finish transaction state failed, then update publish version time, should check + // to finish after some interval + transactionState.updateSendTaskTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("publish version for transaction {} failed", transactionState); } } + } - if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - transactionState.pruneAfterVisible(); - if (MetricRepo.isInit) { - long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); - MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); - } + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); } - } // end for readyTransactionStates + transactionState.pruneAfterVisible(); + if (MetricRepo.isInit) { + long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); + MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); + } + } } private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState transactionState, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org