This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f1a0ae4b72 [fix](race) fix access colocate group ids race (#36444)
8f1a0ae4b72 is described below

commit 8f1a0ae4b72bb668249c38cea53716e4c30252ac
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Wed Jun 19 09:44:03 2024 +0800

    [fix](race) fix access colocate group ids race (#36444)
    
    ```
    2024-06-17 21:42:30,205 ERROR (colocate group clone checker|390) [Daemon.run():118] daemon thread got exception. name: colocate group clone checker
    java.util.ConcurrentModificationException: null
            at java.util.HashMap$HashIterator.nextNode(HashMap.java:1597) ~[?:?]
            at java.util.HashMap$EntryIterator.next(HashMap.java:1630) ~[?:?]
            at java.util.HashMap$EntryIterator.next(HashMap.java:1628) ~[?:?]
            at com.google.common.collect.AbstractMapBasedMultimap$KeySet$1.next(AbstractMapBasedMultimap.java:964) ~[guava-32.1.2-jre.jar:?]
            at org.apache.doris.clone.ColocateTableCheckerAndBalancer.matchGroups(ColocateTableCheckerAndBalancer.java:482) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.clone.ColocateTableCheckerAndBalancer.runAfterCatalogReady(ColocateTableCheckerAndBalancer.java:339) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.common.util.Daemon.run(Daemon.java:116) ~[doris-fe.jar:1.2-SNAPSHOT]
    ```
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
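    `ColocateTableIndex.getAllGroupIds()` returned the live `keySet()` view of
    `group2Tables`. The read lock was released as soon as the method returned,
    so callers iterated that view without any lock held. If another thread
    modified the group map during that iteration, the colocate group clone
    checker failed in `matchGroups()` with the `ConcurrentModificationException`
    shown above.

    This change:
    - makes `getAllGroupIds()` return a copy of the key set, taken while the
      read lock is held, so callers iterate a stable snapshot;
    - removes the now-redundant copy in `relocateAndBalanceGroups()`;
    - makes callers skip a group, with an info log, when `getGroupSchema()`
      returns null instead of dereferencing it. This is needed because the
      snapshot may still list a group that was removed after it was taken
      (presumably a dropped group).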
---
 .../org/apache/doris/catalog/ColocateTableIndex.java   |  2 +-
 .../doris/clone/ColocateTableCheckerAndBalancer.java   | 18 ++++++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 0675b1fbb5d..28fc0ad55b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -430,7 +430,7 @@ public class ColocateTableIndex implements Writable {
     public Set<GroupId> getAllGroupIds() {
         readLock();
         try {
-            return group2Tables.keySet();
+            return Sets.newHashSet(group2Tables.keySet());
         } finally {
             readUnlock();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 740acd331c3..292013ec05a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -378,7 +378,7 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
      *  A    B    C    D
      */
     private void relocateAndBalanceGroups() {
-        Set<GroupId> groupIds = 
Sets.newHashSet(Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds());
+        Set<GroupId> groupIds = 
Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds();
 
         // balance only inside each group, excluded balance between all groups
         Set<GroupId> changeGroups = relocateAndBalanceGroup(groupIds, false);
@@ -410,6 +410,10 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
             }
 
             ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
             ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
             try {
                 
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
@@ -480,13 +484,18 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
         // check each group
         Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
         for (GroupId groupId : groupIds) {
+            ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
+
             List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
             List<Set<Long>> backendBucketsSeq = 
colocateIndex.getBackendsPerBucketSeqSet(groupId);
             if (backendBucketsSeq.isEmpty()) {
                 continue;
             }
 
-            ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
             ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
             String unstableReason = null;
             OUT:
@@ -588,6 +597,7 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
         for (GroupId groupId : groupIds) {
             ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
             if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
                 continue;
             }
             ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
@@ -718,6 +728,10 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
             GlobalColocateStatistic globalColocateStatistic, List<List<Long>> 
balancedBackendsPerBucketSeq,
             boolean balanceBetweenGroups) {
         ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+        if (groupSchema == null) {
+            LOG.info("Not found colocate group {}, maybe delete", groupId);
+            return false;
+        }
         short replicaNum = 
groupSchema.getReplicaAlloc().getReplicaNumByTag(tag);
         List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(
                 colocateIndex.getBackendsPerBucketSeqByTag(groupId, tag));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to