This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 7dd01b1fa9b [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38042) 7dd01b1fa9b is described below commit 7dd01b1fa9b3e3def95f8a62b67fe452b16e40d3 Author: meiyi <myime...@gmail.com> AuthorDate: Thu Jul 18 14:15:49 2024 +0800 [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38042) Pick https://github.com/apache/doris/pull/37724 --- .../doris/transaction/PublishVersionDaemon.java | 98 ++++++++++++---------- 1 file changed, 54 insertions(+), 44 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index a1861fb7f4d..ac1cbe9154c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -108,58 +108,68 @@ public class PublishVersionDaemon extends MasterDaemon { if (transactionState.hasSendTask()) { continue; } - List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values()); - - try { - beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo)); - } catch (MetaNotFoundException e) { - LOG.warn("exception occur when trying to get rollup tablets info", e); - } + try { + genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds, + batchTask); + } catch (Throwable t) { + LOG.error("errors while generate publish task for transaction: {}", transactionState, t); } + } + if (!batchTask.getAllTasks().isEmpty()) { + AgentTaskExecutor.submit(batchTask); + } + } - List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size()); - for (PartitionCommitInfo commitInfo : partitionCommitInfos) { - TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(), - commitInfo.getVersion(), 0); - partitionVersionInfos.add(versionInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("try to publish version info partitionid [{}], version [{}]", - commitInfo.getPartitionId(), - commitInfo.getVersion()); - } - } + private void genPublishTask(List<Long> allBackends, TransactionState transactionState, + long createPublishVersionTaskTime, Map<Long, Set<Long>> beIdToBaseTabletIds, AgentBatchTask batchTask) { + List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>(); + for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { + partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values()); - Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet(); - // public version tasks are not persisted in catalog, so publishBackends may be empty. - // so we have to try publish to all backends; - if (publishBackends.isEmpty()) { - // could not just add to it, should new a new object, or the back map will destroyed - publishBackends = Sets.newHashSet(); - publishBackends.addAll(allBackends); + try { + beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo)); + } catch (MetaNotFoundException e) { + LOG.warn("exception occur when trying to get rollup tablets info", e); } + } - for (long backendId : publishBackends) { - PublishVersionTask task = new PublishVersionTask(backendId, - transactionState.getTransactionId(), - transactionState.getDbId(), - partitionVersionInfos, - createPublishVersionTaskTime); - task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet())); - // add to AgentTaskQueue for handling finish report. - // not check return value, because the add will success - AgentTaskQueue.addTask(task); - batchTask.addTask(task); - transactionState.addPublishVersionTask(backendId, task); + List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size()); + for (PartitionCommitInfo commitInfo : partitionCommitInfos) { + TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(), + commitInfo.getVersion(), 0); + partitionVersionInfos.add(versionInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("try to publish version info partitionid [{}], version [{}]", + commitInfo.getPartitionId(), + commitInfo.getVersion()); } - transactionState.setSendedTask(); - LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(), - transactionState.getDbId()); } - if (!batchTask.getAllTasks().isEmpty()) { - AgentTaskExecutor.submit(batchTask); + + Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet(); + // public version tasks are not persisted in catalog, so publishBackends may be empty. + // so we have to try publish to all backends; + if (publishBackends.isEmpty()) { + // could not just add to it, should new a new object, or the back map will destroyed + publishBackends = Sets.newHashSet(); + publishBackends.addAll(allBackends); + } + + for (long backendId : publishBackends) { + PublishVersionTask task = new PublishVersionTask(backendId, + transactionState.getTransactionId(), + transactionState.getDbId(), + partitionVersionInfos, + createPublishVersionTaskTime); + task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet())); + // add to AgentTaskQueue for handling finish report. + // not check return value, because the add will success + AgentTaskQueue.addTask(task); + batchTask.addTask(task); + transactionState.addPublishVersionTask(backendId, task); } + transactionState.setSendedTask(); + LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(), + transactionState.getDbId()); } private void tryFinishTxn(List<TransactionState> readyTransactionStates, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org