This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 496befd2926 [improvement](decommission be) decommission check replica num (#32748)
496befd2926 is described below

commit 496befd292698102e842a27d523daaf70b46a328
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Mar 28 09:37:54 2024 +0800

    [improvement](decommission be) decommission check replica num (#32748)
---
 .../java/org/apache/doris/alter/SystemHandler.java | 84 +++++++++++++++++++++-
 .../test_decommission_with_replica_num_fail.groovy | 59 +++++++++++++++
 regression-test/suites/node_p0/test_backend.groovy | 26 ++++---
 3 files changed, 158 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index e503e093787..57e00f5ab14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -33,24 +33,34 @@ import org.apache.doris.analysis.ModifyBrokerClause;
 import org.apache.doris.analysis.ModifyFrontendHostNameClause;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MysqlCompatibleDatabase;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.system.SystemInfoService.HostInfo;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /*
  * SystemHandler is for
@@ -235,7 +245,8 @@ public class SystemHandler extends AlterHandler {
             decommissionBackends.add(backend);
         }
 
-        // TODO(cmy): check if replication num can be met
+        checkDecommissionWithReplicaAllocation(decommissionBackends);
+
         // TODO(cmy): check remaining space
 
         return decommissionBackends;
@@ -258,12 +269,81 @@ public class SystemHandler extends AlterHandler {
             decommissionBackends.add(backend);
         }
 
-        // TODO(cmy): check if replication num can be met
+        checkDecommissionWithReplicaAllocation(decommissionBackends);
+
         // TODO(cmy): check remaining space
 
         return decommissionBackends;
     }
 
+    /**
+     * Rejects a decommission request that would leave some partition unable to satisfy its
+     * replica allocation.
+     *
+     * <p>For every location tag carried by a backend being decommissioned, counts the backends
+     * that remain schedule-available on that tag and compares the count with the replica number
+     * each partition requests on the tag. The check is skipped in cloud mode, when the list is
+     * empty, or when the debug point
+     * {@code SystemHandler.decommission_no_check_replica_num} is enabled.
+     *
+     * @param decommissionBackends backends the caller is about to decommission
+     * @throws DdlException if a partition asks for more replicas on a tag than backends remain on it
+     */
+    private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
+            throws DdlException {
+        if (Config.isCloudMode() || decommissionBackends.isEmpty()
+                || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
+            return;
+        }
+
+        Set<Tag> decommissionTags = decommissionBackends.stream().map(Backend::getLocationTag)
+                .collect(Collectors.toSet());
+        // Collect the ids once so the loop below does a set lookup instead of rescanning the list.
+        Set<Long> decommissionBeIds = decommissionBackends.stream().map(Backend::getId)
+                .collect(Collectors.toSet());
+
+        // Per tag: number of backends that are schedule-available and are not being decommissioned.
+        Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
+        for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+            if (!backend.isScheduleAvailable() || decommissionBeIds.contains(backend.getId())) {
+                continue;
+            }
+
+            Tag tag = backend.getLocationTag();
+            if (tag != null) {
+                tagAvailBackendNums.merge(tag, 1, Integer::sum);
+            }
+        }
+
+        Env env = Env.getCurrentEnv();
+        List<Long> dbIds = env.getInternalCatalog().getDbIds();
+        for (Long dbId : dbIds) {
+            Database db = env.getInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            if (db instanceof MysqlCompatibleDatabase) {
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                table.readLock();
+                try {
+                    if (!table.needSchedule()) {
+                        continue;
+                    }
+
+                    // NOTE(review): the cast relies on needSchedule() being true only for OlapTable - confirm.
+                    OlapTable tbl = (OlapTable) table;
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+                        for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
+                            Tag tag = entry.getKey();
+                            // Tags that lose no backend cannot be affected by this decommission.
+                            if (!decommissionTags.contains(tag)) {
+                                continue;
+                            }
+                            int replicaNum = (int) entry.getValue();
+                            int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
+                            if (replicaNum > backendNum) {
+                                throw new DdlException("After decommission, partition " + partition.getName()
+                                        + " of table " + db.getName() + "." + tbl.getName()
+                                        + " 's replication allocation { " + replicaAlloc
+                                        + " } > available backend num " + backendNum + " on tag " + tag
+                                        + ", otherwise need to decrease the partition's replication num.");
+                            }
+                        }
+                    }
+                } finally {
+                    table.readUnlock();
+                }
+            }
+        }
+    }
+
     @Override
     public synchronized void cancel(CancelStmt stmt) throws DdlException {
         CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) 
stmt;
diff --git a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
new file mode 100644
index 00000000000..ff19adae27d
--- /dev/null
+++ b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Checks that decommissioning a backend is rejected when a table's replication_num
+// equals the number of alive, non-decommissioned backends.
+suite('test_decommission_with_replica_num_fail') {
+    if (isCloudMode()) {
+        return
+    }
+
+    def tbl = 'test_decommission_with_replica_num_fail'
+    def backends = sql_return_maparray('show backends')
+    // Count the alive, non-decommissioned backends; the last one seen becomes the target.
+    def replicaNum = 0
+    def targetBackend = null
+    for (def be : backends) {
+        def alive = be.Alive.toBoolean()
+        def decommissioned = be.SystemDecommissioned.toBoolean()
+        if (alive && !decommissioned) {
+            replicaNum++
+            targetBackend = be
+        }
+    }
+    assertTrue(replicaNum > 0)
+
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    // One replica per counted backend, so removing any of them leaves too few.
+    sql """
+        CREATE TABLE ${tbl}
+        (
+            k1 int,
+            k2 int
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 6
+        PROPERTIES
+        (
+            "replication_num" = "${replicaNum}"
+        );
+    """
+    try {
+        test {
+            sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
+            exception "otherwise need to decrease the partition's replication num"
+        }
+    } finally {
+        // Issue the cancel even if the expectation above failed.
+        sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
+    }
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+}
diff --git a/regression-test/suites/node_p0/test_backend.groovy b/regression-test/suites/node_p0/test_backend.groovy
index 1fe6f802e90..cce111b0a19 100644
--- a/regression-test/suites/node_p0/test_backend.groovy
+++ b/regression-test/suites/node_p0/test_backend.groovy
@@ -41,11 +41,12 @@ suite("test_backend", "nonConcurrent") {
     }
 
     if (context.config.jdbcUser.equals("root")) {
+        def beId1 = null
         try {
+            
GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
             try_sql """admin set frontend 
config("drop_backend_after_decommission" = "false")"""
             def result = sql_return_maparray """SHOW BACKENDS;"""
             logger.info("show backends result:${result}")
-            def beId1 = null
             for (def res : result) {
                 beId1 = res.BackendId
                 break
@@ -58,16 +59,23 @@ suite("test_backend", "nonConcurrent") {
                     assertTrue(res.SystemDecommissioned.toBoolean())
                 }
             }
-            result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """
-            logger.info("CANCEL DECOMMISSION BACKEND ${result}")
-            result = sql_return_maparray """SHOW BACKENDS;"""
-            for (def res : result) {
-                if (res.BackendId == "${beId1}") {
-                    assertFalse(res.SystemDecommissioned.toBoolean())
+        } finally {
+            try {
+                if (beId1 != null) {
+                    def result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" 
"""
+                    logger.info("CANCEL DECOMMISSION BACKEND ${result}")
+
+                    result = sql_return_maparray """SHOW BACKENDS;"""
+                    for (def res : result) {
+                        if (res.BackendId == "${beId1}") {
+                            assertFalse(res.SystemDecommissioned.toBoolean())
+                        }
+                    }
                 }
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num');
+                try_sql """admin set frontend 
config("drop_backend_after_decommission" = "true")"""
             }
-        } finally {
-            try_sql """admin set frontend 
config("drop_backend_after_decommission" = "true")"""
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to