This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8a0d940914e [fix](publish) Pick Fix publish failed because 
task is null (#37546)
8a0d940914e is described below

commit 8a0d940914ee8fd41acfe5ce08061fabc77da40a
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Jul 11 15:22:04 2024 +0800

    [fix](publish) Pick Fix publish failed because task is null (#37546)
    
    ## Proposed changes
    
    Pick https://github.com/apache/doris/pull/37531
    
    This PR catches the exception so that a failed txn does not block the
    other txns.
---
 .../doris/transaction/PublishVersionDaemon.java    | 114 +++++++++++++--------
 1 file changed, 69 insertions(+), 45 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 22ca57f2399..a1861fb7f4d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -91,6 +91,13 @@ public class PublishVersionDaemon extends MasterDaemon {
             LOG.warn("some transaction state need to publish, but no backend 
exists");
             return;
         }
+        traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, 
allBackends);
+        tryFinishTxn(readyTransactionStates, infoService, 
globalTransactionMgr, partitionVisibleVersions,
+                backendPartitions);
+    }
+
+    private void 
traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> 
readyTransactionStates,
+            List<Long> allBackends) {
         long createPublishVersionTaskTime = System.currentTimeMillis();
         // every backend-transaction identified a single task
         AgentBatchTask batchTask = new AgentBatchTask();
@@ -153,60 +160,77 @@ public class PublishVersionDaemon extends MasterDaemon {
         if (!batchTask.getAllTasks().isEmpty()) {
             AgentTaskExecutor.submit(batchTask);
         }
+    }
 
-        Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+    private void tryFinishTxn(List<TransactionState> readyTransactionStates,
+            SystemInfoService infoService, GlobalTransactionMgr 
globalTransactionMgr,
+            Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> 
backendPartitions) {
         // try to finish the transaction, if failed just retry in next loop
         for (TransactionState transactionState : readyTransactionStates) {
-            Stream<PublishVersionTask> publishVersionTaskStream = 
transactionState
-                    .getPublishVersionTasks()
-                    .values()
-                    .stream()
-                    .peek(task -> {
-                        if (task.isFinished() && 
CollectionUtils.isEmpty(task.getErrorTablets())) {
-                            Map<Long, Long> tableIdToDeltaNumRows =
-                                    task.getTableIdToDeltaNumRows();
-                            tableIdToDeltaNumRows.forEach((tableId, numRows) 
-> {
-                                tableIdToTotalDeltaNumRows
-                                        .computeIfPresent(tableId, (id, 
orgNumRows) -> orgNumRows + numRows);
-                                
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
-                            });
-                        }
-                    });
-            boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
-                    .anyMatch(task -> !task.isFinished() && 
infoService.checkBackendAlive(task.getBackendId()));
-            
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+            try {
+                // try to finish the transaction, if failed just retry in next 
loop
+                tryFinishOneTxn(transactionState, infoService, 
globalTransactionMgr, partitionVisibleVersions,
+                        backendPartitions);
+            } catch (Throwable t) {
+                LOG.error("errors while finish transaction: {}, publish tasks: 
{}", transactionState,
+                        transactionState.getPublishVersionTasks(), t);
+            }
+        } // end for readyTransactionStates
+    }
 
-            boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || 
transactionState.isPublishTimeout()
-                    || 
DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
-            if (shouldFinishTxn) {
-                try {
-                    // one transaction exception should not affect other 
transaction
-                    
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
-                            transactionState.getTransactionId(), 
partitionVisibleVersions, backendPartitions);
-                } catch (Exception e) {
-                    LOG.warn("error happens when finish transaction {}", 
transactionState.getTransactionId(), e);
-                }
-                if (transactionState.getTransactionStatus() != 
TransactionStatus.VISIBLE) {
-                    // if finish transaction state failed, then update publish 
version time, should check
-                    // to finish after some interval
-                    transactionState.updateSendTaskTime();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("publish version for transaction {} failed", 
transactionState);
+    private void tryFinishOneTxn(TransactionState transactionState,
+            SystemInfoService infoService, GlobalTransactionMgr 
globalTransactionMgr,
+            Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> 
backendPartitions) {
+        Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+        Stream<PublishVersionTask> publishVersionTaskStream = transactionState
+                .getPublishVersionTasks()
+                .values()
+                .stream()
+                .peek(task -> {
+                    if (task.isFinished() && 
CollectionUtils.isEmpty(task.getErrorTablets())) {
+                        Map<Long, Long> tableIdToDeltaNumRows =
+                                task.getTableIdToDeltaNumRows();
+                        tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
+                            tableIdToTotalDeltaNumRows
+                                    .computeIfPresent(tableId, (id, 
orgNumRows) -> orgNumRows + numRows);
+                            tableIdToTotalDeltaNumRows.putIfAbsent(tableId, 
numRows);
+                        });
                     }
+                });
+        boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
+                .anyMatch(task -> !task.isFinished() && 
infoService.checkBackendAlive(task.getBackendId()));
+        
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+
+        boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || 
transactionState.isPublishTimeout()
+                || 
DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
+        if (shouldFinishTxn) {
+            try {
+                // one transaction exception should not affect other 
transaction
+                
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
+                        transactionState.getTransactionId(), 
partitionVisibleVersions, backendPartitions);
+            } catch (Exception e) {
+                LOG.warn("error happens when finish transaction {}", 
transactionState.getTransactionId(), e);
+            }
+            if (transactionState.getTransactionStatus() != 
TransactionStatus.VISIBLE) {
+                // if finish transaction state failed, then update publish 
version time, should check
+                // to finish after some interval
+                transactionState.updateSendTaskTime();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("publish version for transaction {} failed", 
transactionState);
                 }
             }
+        }
 
-            if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) {
-                    AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
-                }
-                transactionState.pruneAfterVisible();
-                if (MetricRepo.isInit) {
-                    long publishTime = 
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
-                    MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
-                }
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) {
+                AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
             }
-        } // end for readyTransactionStates
+            transactionState.pruneAfterVisible();
+            if (MetricRepo.isInit) {
+                long publishTime = 
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
+                MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
+            }
+        }
     }
 
     private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState 
transactionState,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to