This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7dd01b1fa9b [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38042)
7dd01b1fa9b is described below

commit 7dd01b1fa9b3e3def95f8a62b67fe452b16e40d3
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Jul 18 14:15:49 2024 +0800

    [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38042)
    
    Pick https://github.com/apache/doris/pull/37724
---
 .../doris/transaction/PublishVersionDaemon.java    | 98 ++++++++++++----------
 1 file changed, 54 insertions(+), 44 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index a1861fb7f4d..ac1cbe9154c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -108,58 +108,68 @@ public class PublishVersionDaemon extends MasterDaemon {
             if (transactionState.hasSendTask()) {
                 continue;
             }
-            List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
-            for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
-                
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
-
-                try {
-                    
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, 
tableCommitInfo));
-                } catch (MetaNotFoundException e) {
-                    LOG.warn("exception occur when trying to get rollup 
tablets info", e);
-                }
+            try {
+                genPublishTask(allBackends, transactionState, 
createPublishVersionTaskTime, beIdToBaseTabletIds,
+                        batchTask);
+            } catch (Throwable t) {
+                LOG.error("errors while generate publish task for transaction: 
{}", transactionState, t);
             }
+        }
+        if (!batchTask.getAllTasks().isEmpty()) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
 
-            List<TPartitionVersionInfo> partitionVersionInfos = new 
ArrayList<>(partitionCommitInfos.size());
-            for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
-                TPartitionVersionInfo versionInfo = new 
TPartitionVersionInfo(commitInfo.getPartitionId(),
-                        commitInfo.getVersion(), 0);
-                partitionVersionInfos.add(versionInfo);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("try to publish version info partitionid [{}], 
version [{}]",
-                            commitInfo.getPartitionId(),
-                            commitInfo.getVersion());
-                }
-            }
+    private void genPublishTask(List<Long> allBackends, TransactionState 
transactionState,
+            long createPublishVersionTaskTime, Map<Long, Set<Long>> 
beIdToBaseTabletIds, AgentBatchTask batchTask) {
+        List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
+        for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
+            
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
 
-            Set<Long> publishBackends = 
transactionState.getPublishVersionTasks().keySet();
-            // public version tasks are not persisted in catalog, so 
publishBackends may be empty.
-            // so we have to try publish to all backends;
-            if (publishBackends.isEmpty()) {
-                // could not just add to it, should new a new object, or the 
back map will destroyed
-                publishBackends = Sets.newHashSet();
-                publishBackends.addAll(allBackends);
+            try {
+                
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, 
tableCommitInfo));
+            } catch (MetaNotFoundException e) {
+                LOG.warn("exception occur when trying to get rollup tablets 
info", e);
             }
+        }
 
-            for (long backendId : publishBackends) {
-                PublishVersionTask task = new PublishVersionTask(backendId,
-                        transactionState.getTransactionId(),
-                        transactionState.getDbId(),
-                        partitionVersionInfos,
-                        createPublishVersionTaskTime);
-                
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, 
Collections.emptySet()));
-                // add to AgentTaskQueue for handling finish report.
-                // not check return value, because the add will success
-                AgentTaskQueue.addTask(task);
-                batchTask.addTask(task);
-                transactionState.addPublishVersionTask(backendId, task);
+        List<TPartitionVersionInfo> partitionVersionInfos = new 
ArrayList<>(partitionCommitInfos.size());
+        for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
+            TPartitionVersionInfo versionInfo = new 
TPartitionVersionInfo(commitInfo.getPartitionId(),
+                    commitInfo.getVersion(), 0);
+            partitionVersionInfos.add(versionInfo);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("try to publish version info partitionid [{}], 
version [{}]",
+                        commitInfo.getPartitionId(),
+                        commitInfo.getVersion());
             }
-            transactionState.setSendedTask();
-            LOG.info("send publish tasks for transaction: {}, db: {}", 
transactionState.getTransactionId(),
-                    transactionState.getDbId());
         }
-        if (!batchTask.getAllTasks().isEmpty()) {
-            AgentTaskExecutor.submit(batchTask);
+
+        Set<Long> publishBackends = 
transactionState.getPublishVersionTasks().keySet();
+        // public version tasks are not persisted in catalog, so 
publishBackends may be empty.
+        // so we have to try publish to all backends;
+        if (publishBackends.isEmpty()) {
+            // could not just add to it, should new a new object, or the back 
map will destroyed
+            publishBackends = Sets.newHashSet();
+            publishBackends.addAll(allBackends);
+        }
+
+        for (long backendId : publishBackends) {
+            PublishVersionTask task = new PublishVersionTask(backendId,
+                    transactionState.getTransactionId(),
+                    transactionState.getDbId(),
+                    partitionVersionInfos,
+                    createPublishVersionTaskTime);
+            task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, 
Collections.emptySet()));
+            // add to AgentTaskQueue for handling finish report.
+            // not check return value, because the add will success
+            AgentTaskQueue.addTask(task);
+            batchTask.addTask(task);
+            transactionState.addPublishVersionTask(backendId, task);
         }
+        transactionState.setSendedTask();
+        LOG.info("send publish tasks for transaction: {}, db: {}", 
transactionState.getTransactionId(),
+                transactionState.getDbId());
     }
 
     private void tryFinishTxn(List<TransactionState> readyTransactionStates,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to