This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e083dc26a00 [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix 
multiple cluster group commit BE select strategy (#38644)" (#39010)
e083dc26a00 is described below

commit e083dc26a00c3a109de8eb51ee3af31b242edbfa
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed Aug 7 22:07:30 2024 +0800

    [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix multiple cluster 
group commit BE select strategy (#38644)" (#39010)
    
    ## Proposed changes
    
    Pick #38644
    
    <!--Describe your changes.-->
---
 .../org/apache/doris/load/GroupCommitManager.java  | 30 ++++++++++++++--------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 1ec6a06179e..b6cf6cbb0a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -55,8 +55,8 @@ public class GroupCommitManager {
 
     // Table id to BE id map. Only for group commit.
     private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
-    // BE id to pressure map. Only for group commit.
-    private Map<Long, SlidingWindowCounter> tablePressureMap = new 
ConcurrentHashMap<>();
+    // Table id to pressure map. Only for group commit.
+    private Map<Long, SlidingWindowCounter> tableToPressureMap = new 
ConcurrentHashMap<>();
 
     public boolean isBlock(long tableId) {
         return blockedTableIds.contains(tableId);
@@ -236,8 +236,8 @@ public class GroupCommitManager {
     }
 
     private long selectBackendForLocalGroupCommitInternal(long tableId) throws 
LoadException {
-        LOG.debug("group commit select be info, tableToBeMap {}, 
tablePressureMap {}", tableToBeMap.toString(),
-                tablePressureMap.toString());
+        LOG.debug("group commit select be info, tableToBeMap {}, 
tableToPressureMap {}", tableToBeMap.toString(),
+                tableToPressureMap.toString());
         Long cachedBackendId = getCachedBackend(tableId);
         if (cachedBackendId != null) {
             return cachedBackendId;
@@ -264,8 +264,18 @@ public class GroupCommitManager {
     private Long getCachedBackend(long tableId) {
         OlapTable table = (OlapTable) 
Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
         if (tableToBeMap.containsKey(tableId)) {
-            if (tablePressureMap.get(tableId).get() < 
table.getGroupCommitDataBytes()) {
-                Backend backend = 
Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
+            if (tableToPressureMap.get(tableId).get() < 
table.getGroupCommitDataBytes()) {
+                // There are multiple threads getting cached backends for the 
same table.
+                // Maybe one thread removes the tableId from the tableToBeMap.
+                // Another thread gets the same tableId but can not find this 
tableId.
+                // So another thread needs to get the random backend.
+                Long backendId = tableToBeMap.get(tableId);
+                Backend backend;
+                if (backendId != null) {
+                    backend = Env.getCurrentSystemInfo().getBackend(backendId);
+                } else {
+                    return null;
+                }
                 if (backend.isAlive() && !backend.isDecommissioned()) {
                     return backend.getId();
                 } else {
@@ -285,7 +295,7 @@ public class GroupCommitManager {
         for (Backend backend : backends) {
             if (backend.isAlive() && !backend.isDecommissioned()) {
                 tableToBeMap.put(tableId, backend.getId());
-                tablePressureMap.put(tableId,
+                tableToPressureMap.put(tableId,
                         new 
SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
                 return backend.getId();
             }
@@ -315,10 +325,10 @@ public class GroupCommitManager {
     }
 
     public void updateLoadDataInternal(long tableId, long receiveData) {
-        if (tablePressureMap.containsKey(tableId)) {
-            tablePressureMap.get(tableId).add(receiveData);
+        if (tableToPressureMap.containsKey(tableId)) {
+            tableToPressureMap.get(tableId).add(receiveData);
             LOG.info("Update load data for table{}, receiveData {}, 
tablePressureMap {}", tableId, receiveData,
-                    tablePressureMap.toString());
+                    tableToPressureMap.toString());
         } else {
             LOG.warn("can not find backend id: {}", tableId);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to