This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7ed116be9cd [improvement](decommission be) decommission check replica num #32748 (#34038)
7ed116be9cd is described below

commit 7ed116be9cdb43bdf0dd1e54dcee2da53d72da45
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Fri May 10 21:27:48 2024 +0800

    [improvement](decommission be) decommission check replica num #32748 (#34038)
---
 .../java/org/apache/doris/alter/SystemHandler.java | 81 +++++++++++++++++++++-
 .../org/apache/doris/regression/suite/Suite.groovy |  3 +
 regression-test/pipeline/p0/conf/fe.conf           |  2 +
 regression-test/pipeline/p1/conf/be.conf           |  1 +
 regression-test/pipeline/p1/conf/fe.conf           |  2 +
 .../test_decommission_with_replica_num_fail.groovy | 55 +++++++++++++++
 regression-test/suites/node_p0/test_backend.groovy | 40 +++++++++++
 7 files changed, 183 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index d4aae2d7dc0..8be8d4a2153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -33,24 +33,34 @@ import org.apache.doris.analysis.ModifyBrokerClause;
 import org.apache.doris.analysis.ModifyFrontendHostNameClause;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MysqlCompatibleDatabase;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.system.SystemInfoService.HostInfo;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /*
  * SystemHandler is for
@@ -220,12 +230,81 @@ public class SystemHandler extends AlterHandler {
             decommissionBackends.add(backend);
         }
 
-        // TODO(cmy): check if replication num can be met
+        checkDecommissionWithReplicaAllocation(decommissionBackends);
+
         // TODO(cmy): check remaining space
 
         return decommissionBackends;
     }
 
+    private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
+            throws DdlException {
+        if (decommissionBackends.isEmpty()
+                || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
+            return;
+        }
+
+        Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag())
+                .collect(Collectors.toSet());
+        Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
+        for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+            long beId = backend.getId();
+            if (!backend.isScheduleAvailable()
+                    || decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) {
+                continue;
+            }
+
+            Tag tag = backend.getLocationTag();
+            if (tag != null) {
+                tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1);
+            }
+        }
+
+        Env env = Env.getCurrentEnv();
+        List<Long> dbIds = env.getInternalCatalog().getDbIds();
+        for (Long dbId : dbIds) {
+            Database db = env.getInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            if (db instanceof MysqlCompatibleDatabase) {
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                table.readLock();
+                try {
+                    if (!table.isManagedTable()) {
+                        continue;
+                    }
+
+                    OlapTable tbl = (OlapTable) table;
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+                        for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
+                            Tag tag = entry.getKey();
+                            if (!decommissionTags.contains(tag)) {
+                                continue;
+                            }
+                            int replicaNum = (int) entry.getValue();
+                            int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
+                            if (replicaNum > backendNum) {
+                                throw new DdlException("After decommission, partition " + partition.getName()
+                                        + " of table " + db.getFullName() + "." + tbl.getName()
+                                        + " 's replication allocation { " + replicaAlloc
+                                        + " } > available backend num " + backendNum + " on tag " + tag
+                                        + ", otherwise need to decrease the partition's replication num.");
+                            }
+                        }
+                    }
+                } finally {
+                    table.readUnlock();
+                }
+            }
+        }
+    }
+
     @Override
     public synchronized void cancel(CancelStmt stmt) throws DdlException {
         CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
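
For context, the check above surfaces at DECOMMISSION time: the statement is now rejected up front if, after removing the chosen backends, some partition's replica allocation on an affected tag could no longer be satisfied by the remaining schedulable backends. Below is a minimal Groovy sketch of how a regression suite might assert this, assuming the framework's sql and test/exception helpers used in the suites later in this commit; the backend address is hypothetical.

    // Hypothetical backend address; real suites resolve it from `show backends`.
    def be = 'be_host:9050'

    // With the new check in place, the decommission fails fast and names the
    // offending partition instead of leaving unsatisfiable replicas behind.
    test {
        sql "ALTER SYSTEM DECOMMISSION BACKEND '${be}'"
        exception "otherwise need to decrease the partition's replication num"
    }

    // Nothing was decommissioned, but cancelling keeps the cluster state clean.
    sql "CANCEL DECOMMISSION BACKEND '${be}'"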
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 7575c185d54..f0ca33486da 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -570,6 +570,9 @@ class Suite implements GroovyInterceptable {
         assert p.exitValue() == 0
     }
 
+    List<String> getFrontendIpHttpPort() {
+        return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
+    }
 
     void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
         List<List<Object>> backends = sql("show backends");
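
The new getFrontendIpHttpPort() helper mirrors the existing backend helpers and returns one "host:http_port" string per frontend. A short illustrative sketch of how a suite could consume it (the loop body is only an example, not part of this change):

    // Collect every FE HTTP endpoint reported by `show frontends`.
    def frontends = getFrontendIpHttpPort()
    for (def fe : frontends) {
        logger.info("frontend http endpoint: ${fe}")
        // Split back into host and port when a per-FE HTTP call is needed.
        def parts = fe.split(':')
        def host = parts[0]
        def httpPort = parts[1]
    }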
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index b301d04a88e..1ec05be9fd9 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -78,6 +78,8 @@ enable_map_type=true
 enable_struct_type=true
 enable_feature_binlog=true
 
+enable_debug_points=true
+
 # enable mtmv
 enable_mtmv = true
 
diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf
index a97e528a6b8..ae95e4f65a8 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -71,3 +71,4 @@ enable_set_in_bitmap_value=true
 enable_feature_binlog=true
 max_sys_mem_available_low_water_mark_bytes=69206016
 enable_merge_on_write_correctness_check=false
+enable_debug_points=true
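
enable_debug_points=true is added to the pipeline FE and BE configs (including the p1 fe.conf below) so that suites can toggle named debug points at runtime; the replica-num check above is skipped while SystemHandler.decommission_no_check_replica_num is active. A minimal sketch of the toggle pattern, using the GetDebugPoint() helper exactly as the suites in this commit do:

    // Enable the debug point on every FE, run the statements that should skip
    // the replica-num check, then always disable it again.
    try {
        GetDebugPoint().enableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num')
        // ... ALTER SYSTEM DECOMMISSION BACKEND statements go here ...
    } finally {
        GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num')
    }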
diff --git a/regression-test/pipeline/p1/conf/fe.conf b/regression-test/pipeline/p1/conf/fe.conf
index adc042357ca..10b13131e49 100644
--- a/regression-test/pipeline/p1/conf/fe.conf
+++ b/regression-test/pipeline/p1/conf/fe.conf
@@ -75,6 +75,8 @@ remote_fragment_exec_timeout_ms=60000
 fuzzy_test_type=p1
 use_fuzzy_session_variable=true
 
+enable_debug_points=true
+
 # enable mtmv
 enable_mtmv = true
 
diff --git a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
new file mode 100644
index 00000000000..d7941591eec
--- /dev/null
+++ b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('test_decommission_with_replica_num_fail') {
+    def tbl = 'test_decommission_with_replica_num_fail'
+    def backends = sql_return_maparray('show backends')
+    def replicaNum = 0
+    def targetBackend = null
+    for (def be : backends) {
+        def alive = be.Alive.toBoolean()
+        def decommissioned = be.SystemDecommissioned.toBoolean()
+        if (alive && !decommissioned) {
+            replicaNum++
+            targetBackend = be
+        }
+    }
+    assertTrue(replicaNum > 0)
+
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    sql """
+        CREATE TABLE ${tbl}
+        (
+            k1 int,
+            k2 int
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 6
+        PROPERTIES
+        (
+            "replication_num" = "${replicaNum}"
+        );
+    """
+    try {
+        test {
+            sql "ALTER SYSTEM DECOMMISSION BACKEND 
'${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
+            exception "otherwise need to decrease the partition's replication 
num"
+        }
+    } finally {
+        sql "CANCEL DECOMMISSION BACKEND 
'${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
+    }
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+}
diff --git a/regression-test/suites/node_p0/test_backend.groovy b/regression-test/suites/node_p0/test_backend.groovy
index 5de31b1f964..4c85ff1b54a 100644
--- a/regression-test/suites/node_p0/test_backend.groovy
+++ b/regression-test/suites/node_p0/test_backend.groovy
@@ -39,4 +39,44 @@ suite("test_backend") {
         result = sql """SHOW BACKENDS;"""
         logger.info("result:${result}")
     }
+
+    if (context.config.jdbcUser.equals("root")) {
+        def decommissionBe = null
+        try {
+            GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
+            try_sql """admin set frontend config("drop_backend_after_decommission" = "false")"""
+            def result = sql_return_maparray """SHOW BACKENDS;"""
+            logger.info("show backends result:${result}")
+            for (def res : result) {
+                decommissionBe = res
+                break
+            }
+            sql """CANCEL DECOMMISSION BACKEND 
"${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
+            result = sql """ALTER SYSTEM DECOMMISSION BACKEND 
"${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
+            logger.info("ALTER SYSTEM DECOMMISSION BACKEND ${result}")
+            result = sql_return_maparray """SHOW BACKENDS;"""
+            for (def res : result) {
+                if (res.BackendId == "${decommissionBe.BackendId}") {
+                    assertTrue(res.SystemDecommissioned.toBoolean())
+                }
+            }
+        } finally {
+            try {
+                if (decommissionBe != null) {
+                    def result = sql """CANCEL DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
+                    logger.info("CANCEL DECOMMISSION BACKEND ${result}")
+
+                    result = sql_return_maparray """SHOW BACKENDS;"""
+                    for (def res : result) {
+                        if (res.BackendId == "${decommissionBe.BackendId}") {
+                            assertFalse(res.SystemDecommissioned.toBoolean())
+                        }
+                    }
+                }
+            } finally {
+                GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num');
+                try_sql """admin set frontend config("drop_backend_after_decommission" = "true")"""
+            }
+        }
+    }
 }

