This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4fba649cb [Fix](group commit) Fix multiple cluster group commit BE select  strategy (#38644)
ef4fba649cb is described below

commit ef4fba649cb563efe64874addcbac11204925cfc
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Sun Aug 4 18:33:55 2024 +0800

    [Fix](group commit) Fix multiple cluster group commit BE select  strategy (#38644)
---
 .../org/apache/doris/load/GroupCommitManager.java  | 64 ++++++++++++++--------
 1 file changed, 41 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 20f7b9ed9be..1009c4257b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -58,10 +58,10 @@ public class GroupCommitManager {
 
     private Set<Long> blockedTableIds = new HashSet<>();
 
-    // Table id to BE id map. Only for group commit.
-    private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
-    // BE id to pressure map. Only for group commit.
-    private Map<Long, SlidingWindowCounter> tablePressureMap = new 
ConcurrentHashMap<>();
+    // Encoded <Cluster and Table id> to BE id map. Only for group commit.
+    private final Map<String, Long> tableToBeMap = new ConcurrentHashMap<>();
+    // Table id to pressure map. Only for group commit.
+    private final Map<Long, SlidingWindowCounter> tableToPressureMap = new 
ConcurrentHashMap<>();
 
     public boolean isBlock(long tableId) {
         return blockedTableIds.contains(tableId);
@@ -243,13 +243,13 @@ public class GroupCommitManager {
 
     private long selectBackendForCloudGroupCommitInternal(long tableId, String 
cluster)
             throws DdlException, LoadException {
-        LOG.debug("cloud group commit select be info, tableToBeMap {}, 
tablePressureMap {}", tableToBeMap.toString(),
-                tablePressureMap.toString());
+        LOG.debug("cloud group commit select be info, tableToBeMap {}, 
tablePressureMap {}",
+                tableToBeMap.toString(), tableToPressureMap.toString());
         if (Strings.isNullOrEmpty(cluster)) {
             ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
         }
 
-        Long cachedBackendId = getCachedBackend(tableId);
+        Long cachedBackendId = getCachedBackend(cluster, tableId);
         if (cachedBackendId != null) {
             return cachedBackendId;
         }
@@ -261,7 +261,7 @@ public class GroupCommitManager {
             throw new LoadException("No alive backend");
         }
         // If the cached backend is not active or decommissioned, select a 
random new backend.
-        Long randomBackendId = getRandomBackend(tableId, backends);
+        Long randomBackendId = getRandomBackend(cluster, tableId, backends);
         if (randomBackendId != null) {
             return randomBackendId;
         }
@@ -274,8 +274,8 @@ public class GroupCommitManager {
 
     private long selectBackendForLocalGroupCommitInternal(long tableId) throws 
LoadException {
         LOG.debug("group commit select be info, tableToBeMap {}, 
tablePressureMap {}", tableToBeMap.toString(),
-                tablePressureMap.toString());
-        Long cachedBackendId = getCachedBackend(tableId);
+                tableToPressureMap.toString());
+        Long cachedBackendId = getCachedBackend(null, tableId);
         if (cachedBackendId != null) {
             return cachedBackendId;
         }
@@ -293,7 +293,7 @@ public class GroupCommitManager {
         }
 
         // If the cached backend is not active or decommissioned, select a 
random new backend.
-        Long randomBackendId = getRandomBackend(tableId, backends);
+        Long randomBackendId = getRandomBackend(null, tableId, backends);
         if (randomBackendId != null) {
             return randomBackendId;
         }
@@ -305,31 +305,41 @@ public class GroupCommitManager {
     }
 
     @Nullable
-    private Long getCachedBackend(long tableId) {
+    private Long getCachedBackend(String cluster, long tableId) {
         OlapTable table = (OlapTable) 
Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
-        if (tableToBeMap.containsKey(tableId)) {
-            if (tablePressureMap.get(tableId).get() < 
table.getGroupCommitDataBytes()) {
-                Backend backend = 
Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
+        if (tableToBeMap.containsKey(encode(cluster, tableId))) {
+            if (tableToPressureMap.get(tableId).get() < 
table.getGroupCommitDataBytes()) {
+                // There are multiple threads getting cached backends for the 
same table.
+                // Maybe one thread removes the tableId from the tableToBeMap.
+                // Another thread gets the same tableId but can not find this 
tableId.
+                // So another thread needs to get the random backend.
+                Long backendId = tableToBeMap.get(encode(cluster, tableId));
+                Backend backend;
+                if (backendId != null) {
+                    backend = Env.getCurrentSystemInfo().getBackend(backendId);
+                } else {
+                    return null;
+                }
                 if (backend.isActive() && !backend.isDecommissioned()) {
                     return backend.getId();
                 } else {
-                    tableToBeMap.remove(tableId);
+                    tableToBeMap.remove(encode(cluster, tableId));
                 }
             } else {
-                tableToBeMap.remove(tableId);
+                tableToBeMap.remove(encode(cluster, tableId));
             }
         }
         return null;
     }
 
     @Nullable
-    private Long getRandomBackend(long tableId, List<Backend> backends) {
+    private Long getRandomBackend(String cluster, long tableId, List<Backend> 
backends) {
         OlapTable table = (OlapTable) 
Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
         Collections.shuffle(backends);
         for (Backend backend : backends) {
             if (backend.isActive() && !backend.isDecommissioned()) {
-                tableToBeMap.put(tableId, backend.getId());
-                tablePressureMap.put(tableId,
+                tableToBeMap.put(encode(cluster, tableId), backend.getId());
+                tableToPressureMap.put(tableId,
                         new 
SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
                 return backend.getId();
             }
@@ -337,6 +347,14 @@ public class GroupCommitManager {
         return null;
     }
 
+    private String encode(String cluster, long tableId) {
+        if (cluster == null) {
+            return String.valueOf(tableId);
+        } else {
+            return cluster + tableId;
+        }
+    }
+
     public void updateLoadData(long tableId, long receiveData) {
         if (tableId == -1) {
             LOG.warn("invalid table id: " + tableId);
@@ -359,10 +377,10 @@ public class GroupCommitManager {
     }
 
     public void updateLoadDataInternal(long tableId, long receiveData) {
-        if (tablePressureMap.containsKey(tableId)) {
-            tablePressureMap.get(tableId).add(receiveData);
+        if (tableToPressureMap.containsKey(tableId)) {
+            tableToPressureMap.get(tableId).add(receiveData);
             LOG.info("Update load data for table{}, receiveData {}, 
tablePressureMap {}", tableId, receiveData,
-                    tablePressureMap.toString());
+                    tableToPressureMap.toString());
         } else {
             LOG.warn("can not find backend id: {}", tableId);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to