This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 7ee271b98fa [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38044) 7ee271b98fa is described below commit 7ee271b98fa569389aff4da6d824bb1f094b9ca5 Author: meiyi <myime...@gmail.com> AuthorDate: Thu Jul 18 14:47:32 2024 +0800 [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38044) Pick https://github.com/apache/doris/pull/37724 --- .../doris/transaction/PublishVersionDaemon.java | 196 ++++++++++++--------- 1 file changed, 112 insertions(+), 84 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index d5d72869117..b83a0974301 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -73,6 +73,12 @@ public class PublishVersionDaemon extends MasterDaemon { LOG.warn("some transaction state need to publish, but no backend exists"); return; } + traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends); + tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr); + } + + private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates, + List<Long> allBackends) { long createPublishVersionTaskTime = System.currentTimeMillis(); // every backend-transaction identified a single task AgentBatchTask batchTask = new AgentBatchTask(); @@ -81,99 +87,121 @@ public class PublishVersionDaemon extends MasterDaemon { if (transactionState.hasSendTask()) { continue; } - List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values()); - } - List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size()); - for (PartitionCommitInfo commitInfo : partitionCommitInfos) { - TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(), - commitInfo.getVersion(), 0); - partitionVersionInfos.add(versionInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("try to publish version info partitionid [{}], version [{}]", - commitInfo.getPartitionId(), - commitInfo.getVersion()); - } - } - Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet(); - // public version tasks are not persisted in catalog, so publishBackends may be empty. - // so we have to try publish to all backends; - if (publishBackends.isEmpty()) { - // could not just add to it, should new a new object, or the back map will destroyed - publishBackends = Sets.newHashSet(); - publishBackends.addAll(allBackends); + try { + genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, batchTask); + } catch (Throwable t) { + LOG.error("errors while generate publish task for transaction: {}", transactionState, t); } - - for (long backendId : publishBackends) { - PublishVersionTask task = new PublishVersionTask(backendId, - transactionState.getTransactionId(), - transactionState.getDbId(), - partitionVersionInfos, - createPublishVersionTaskTime); - // add to AgentTaskQueue for handling finish report. - // not check return value, because the add will success - AgentTaskQueue.addTask(task); - batchTask.addTask(task); - transactionState.addPublishVersionTask(backendId, task); - } - transactionState.setSendedTask(); - LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(), - transactionState.getDbId()); } if (!batchTask.getAllTasks().isEmpty()) { AgentTaskExecutor.submit(batchTask); } + } - Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap(); - // try to finish the transaction, if failed just retry in next loop - for (TransactionState transactionState : readyTransactionStates) { - Stream<PublishVersionTask> publishVersionTaskStream = transactionState - .getPublishVersionTasks() - .values() - .stream() - .peek(task -> { - if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { - Map<Long, Long> tableIdToDeltaNumRows = - task.getTableIdToDeltaNumRows(); - tableIdToDeltaNumRows.forEach((tableId, numRows) -> { - tableIdToTotalDeltaNumRows - .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); - tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); - }); - } - }); - boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream - .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); - transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); - - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout(); - if (shouldFinishTxn) { - try { - // one transaction exception should not affect other transaction - globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId()); - } catch (Exception e) { - LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); - } - if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { - // if finish transaction state failed, then update publish version time, should check - // to finish after some interval - transactionState.updateSendTaskTime(); - LOG.debug("publish version for transaction {} failed", transactionState); - } + private void genPublishTask(List<Long> allBackends, TransactionState transactionState, + long createPublishVersionTaskTime, AgentBatchTask batchTask) { + List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>(); + for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { + partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values()); + } + List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size()); + for (PartitionCommitInfo commitInfo : partitionCommitInfos) { + TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(), + commitInfo.getVersion(), 0); + partitionVersionInfos.add(versionInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("try to publish version info partitionid [{}], version [{}]", + commitInfo.getPartitionId(), + commitInfo.getVersion()); } + } + Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet(); + // public version tasks are not persisted in catalog, so publishBackends may be empty. + // so we have to try publish to all backends; + if (publishBackends.isEmpty()) { + // could not just add to it, should new a new object, or the back map will destroyed + publishBackends = Sets.newHashSet(); + publishBackends.addAll(allBackends); + } - if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - transactionState.pruneAfterVisible(); - if (MetricRepo.isInit) { - long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); - MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); - } + for (long backendId : publishBackends) { + PublishVersionTask task = new PublishVersionTask(backendId, + transactionState.getTransactionId(), + transactionState.getDbId(), + partitionVersionInfos, + createPublishVersionTaskTime); + // add to AgentTaskQueue for handling finish report. + // not check return value, because the add will success + AgentTaskQueue.addTask(task); + batchTask.addTask(task); + transactionState.addPublishVersionTask(backendId, task); + } + transactionState.setSendedTask(); + LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(), + transactionState.getDbId()); + } + + private void tryFinishTxn(List<TransactionState> readyTransactionStates, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr) { + for (TransactionState transactionState : readyTransactionStates) { + try { + // try to finish the transaction, if failed just retry in next loop + tryFinishOneTxn(transactionState, infoService, globalTransactionMgr); + } catch (Throwable t) { + LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState, + transactionState.getPublishVersionTasks(), t); } } // end for readyTransactionStates } + + private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService, + GlobalTransactionMgr globalTransactionMgr) { + Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap(); + Stream<PublishVersionTask> publishVersionTaskStream = transactionState + .getPublishVersionTasks() + .values() + .stream() + .peek(task -> { + if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { + Map<Long, Long> tableIdToDeltaNumRows = + task.getTableIdToDeltaNumRows(); + tableIdToDeltaNumRows.forEach((tableId, numRows) -> { + tableIdToTotalDeltaNumRows + .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); + tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); + }); + } + }); + boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream + .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout(); + if (shouldFinishTxn) { + try { + // one transaction exception should not affect other transaction + globalTransactionMgr.finishTransaction(transactionState.getDbId(), + transactionState.getTransactionId()); + } catch (Exception e) { + LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); + } + if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { + // if finish transaction state failed, then update publish version time, should check + // to finish after some interval + transactionState.updateSendTaskTime(); + LOG.debug("publish version for transaction {} failed", transactionState); + } + } + + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); + } + transactionState.pruneAfterVisible(); + if (MetricRepo.isInit) { + long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); + MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org