This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7ee271b98fa [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38044)
7ee271b98fa is described below

commit 7ee271b98fa569389aff4da6d824bb1f094b9ca5
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Jul 18 14:47:32 2024 +0800

    [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38044)
    
    Pick https://github.com/apache/doris/pull/37724
---
 .../doris/transaction/PublishVersionDaemon.java    | 196 ++++++++++++---------
 1 file changed, 112 insertions(+), 84 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index d5d72869117..b83a0974301 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -73,6 +73,12 @@ public class PublishVersionDaemon extends MasterDaemon {
             LOG.warn("some transaction state need to publish, but no backend 
exists");
             return;
         }
+        traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, 
allBackends);
+        tryFinishTxn(readyTransactionStates, infoService, 
globalTransactionMgr);
+    }
+
+    private void 
traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> 
readyTransactionStates,
+            List<Long> allBackends) {
         long createPublishVersionTaskTime = System.currentTimeMillis();
         // every backend-transaction identified a single task
         AgentBatchTask batchTask = new AgentBatchTask();
@@ -81,99 +87,121 @@ public class PublishVersionDaemon extends MasterDaemon {
             if (transactionState.hasSendTask()) {
                 continue;
             }
-            List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
-            for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
-                
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
-            }
-            List<TPartitionVersionInfo> partitionVersionInfos = new 
ArrayList<>(partitionCommitInfos.size());
-            for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
-                TPartitionVersionInfo versionInfo = new 
TPartitionVersionInfo(commitInfo.getPartitionId(),
-                        commitInfo.getVersion(), 0);
-                partitionVersionInfos.add(versionInfo);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("try to publish version info partitionid [{}], 
version [{}]",
-                            commitInfo.getPartitionId(),
-                            commitInfo.getVersion());
-                }
-            }
-            Set<Long> publishBackends = 
transactionState.getPublishVersionTasks().keySet();
-            // public version tasks are not persisted in catalog, so 
publishBackends may be empty.
-            // so we have to try publish to all backends;
-            if (publishBackends.isEmpty()) {
-                // could not just add to it, should new a new object, or the 
back map will destroyed
-                publishBackends = Sets.newHashSet();
-                publishBackends.addAll(allBackends);
+            try {
+                genPublishTask(allBackends, transactionState, 
createPublishVersionTaskTime, batchTask);
+            } catch (Throwable t) {
+                LOG.error("errors while generate publish task for transaction: 
{}", transactionState, t);
             }
-
-            for (long backendId : publishBackends) {
-                PublishVersionTask task = new PublishVersionTask(backendId,
-                        transactionState.getTransactionId(),
-                        transactionState.getDbId(),
-                        partitionVersionInfos,
-                        createPublishVersionTaskTime);
-                // add to AgentTaskQueue for handling finish report.
-                // not check return value, because the add will success
-                AgentTaskQueue.addTask(task);
-                batchTask.addTask(task);
-                transactionState.addPublishVersionTask(backendId, task);
-            }
-            transactionState.setSendedTask();
-            LOG.info("send publish tasks for transaction: {}, db: {}", 
transactionState.getTransactionId(),
-                    transactionState.getDbId());
         }
         if (!batchTask.getAllTasks().isEmpty()) {
             AgentTaskExecutor.submit(batchTask);
         }
+    }
 
-        Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
-        // try to finish the transaction, if failed just retry in next loop
-        for (TransactionState transactionState : readyTransactionStates) {
-            Stream<PublishVersionTask> publishVersionTaskStream = 
transactionState
-                    .getPublishVersionTasks()
-                    .values()
-                    .stream()
-                    .peek(task -> {
-                        if (task.isFinished() && 
CollectionUtils.isEmpty(task.getErrorTablets())) {
-                            Map<Long, Long> tableIdToDeltaNumRows =
-                                    task.getTableIdToDeltaNumRows();
-                            tableIdToDeltaNumRows.forEach((tableId, numRows) 
-> {
-                                tableIdToTotalDeltaNumRows
-                                        .computeIfPresent(tableId, (id, 
orgNumRows) -> orgNumRows + numRows);
-                                
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
-                            });
-                        }
-                    });
-            boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
-                    .anyMatch(task -> !task.isFinished() && 
infoService.checkBackendAlive(task.getBackendId()));
-            
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
-
-            boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || 
transactionState.isPublishTimeout();
-            if (shouldFinishTxn) {
-                try {
-                    // one transaction exception should not affect other 
transaction
-                    
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
-                            transactionState.getTransactionId());
-                } catch (Exception e) {
-                    LOG.warn("error happens when finish transaction {}", 
transactionState.getTransactionId(), e);
-                }
-                if (transactionState.getTransactionStatus() != 
TransactionStatus.VISIBLE) {
-                    // if finish transaction state failed, then update publish 
version time, should check
-                    // to finish after some interval
-                    transactionState.updateSendTaskTime();
-                    LOG.debug("publish version for transaction {} failed", 
transactionState);
-                }
+    private void genPublishTask(List<Long> allBackends, TransactionState 
transactionState,
+            long createPublishVersionTaskTime, AgentBatchTask batchTask) {
+        List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
+        for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
+            
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
+        }
+        List<TPartitionVersionInfo> partitionVersionInfos = new 
ArrayList<>(partitionCommitInfos.size());
+        for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
+            TPartitionVersionInfo versionInfo = new 
TPartitionVersionInfo(commitInfo.getPartitionId(),
+                    commitInfo.getVersion(), 0);
+            partitionVersionInfos.add(versionInfo);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("try to publish version info partitionid [{}], 
version [{}]",
+                        commitInfo.getPartitionId(),
+                        commitInfo.getVersion());
             }
+        }
+        Set<Long> publishBackends = 
transactionState.getPublishVersionTasks().keySet();
+        // public version tasks are not persisted in catalog, so 
publishBackends may be empty.
+        // so we have to try publish to all backends;
+        if (publishBackends.isEmpty()) {
+            // could not just add to it, should new a new object, or the back 
map will destroyed
+            publishBackends = Sets.newHashSet();
+            publishBackends.addAll(allBackends);
+        }
 
-            if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) {
-                    AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
-                }
-                transactionState.pruneAfterVisible();
-                if (MetricRepo.isInit) {
-                    long publishTime = 
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
-                    MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
-                }
+        for (long backendId : publishBackends) {
+            PublishVersionTask task = new PublishVersionTask(backendId,
+                    transactionState.getTransactionId(),
+                    transactionState.getDbId(),
+                    partitionVersionInfos,
+                    createPublishVersionTaskTime);
+            // add to AgentTaskQueue for handling finish report.
+            // not check return value, because the add will success
+            AgentTaskQueue.addTask(task);
+            batchTask.addTask(task);
+            transactionState.addPublishVersionTask(backendId, task);
+        }
+        transactionState.setSendedTask();
+        LOG.info("send publish tasks for transaction: {}, db: {}", 
transactionState.getTransactionId(),
+                transactionState.getDbId());
+    }
+
+    private void tryFinishTxn(List<TransactionState> readyTransactionStates,
+            SystemInfoService infoService, GlobalTransactionMgr 
globalTransactionMgr) {
+        for (TransactionState transactionState : readyTransactionStates) {
+            try {
+                // try to finish the transaction, if failed just retry in next 
loop
+                tryFinishOneTxn(transactionState, infoService, 
globalTransactionMgr);
+            } catch (Throwable t) {
+                LOG.error("errors while finish transaction: {}, publish tasks: 
{}", transactionState,
+                        transactionState.getPublishVersionTasks(), t);
             }
         } // end for readyTransactionStates
     }
+
+    private void tryFinishOneTxn(TransactionState transactionState, 
SystemInfoService infoService,
+            GlobalTransactionMgr globalTransactionMgr) {
+        Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+        Stream<PublishVersionTask> publishVersionTaskStream = transactionState
+                .getPublishVersionTasks()
+                .values()
+                .stream()
+                .peek(task -> {
+                    if (task.isFinished() && 
CollectionUtils.isEmpty(task.getErrorTablets())) {
+                        Map<Long, Long> tableIdToDeltaNumRows =
+                                task.getTableIdToDeltaNumRows();
+                        tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
+                            tableIdToTotalDeltaNumRows
+                                    .computeIfPresent(tableId, (id, 
orgNumRows) -> orgNumRows + numRows);
+                            tableIdToTotalDeltaNumRows.putIfAbsent(tableId, 
numRows);
+                        });
+                    }
+                });
+        boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
+                .anyMatch(task -> !task.isFinished() && 
infoService.checkBackendAlive(task.getBackendId()));
+        
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+
+        boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || 
transactionState.isPublishTimeout();
+        if (shouldFinishTxn) {
+            try {
+                // one transaction exception should not affect other 
transaction
+                
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
+                        transactionState.getTransactionId());
+            } catch (Exception e) {
+                LOG.warn("error happens when finish transaction {}", 
transactionState.getTransactionId(), e);
+            }
+            if (transactionState.getTransactionStatus() != 
TransactionStatus.VISIBLE) {
+                // if finish transaction state failed, then update publish 
version time, should check
+                // to finish after some interval
+                transactionState.updateSendTaskTime();
+                LOG.debug("publish version for transaction {} failed", 
transactionState);
+            }
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) {
+                AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
+            }
+            transactionState.pruneAfterVisible();
+            if (MetricRepo.isInit) {
+                long publishTime = 
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
+                MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to