This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 496befd2926 [improvement](decommission be) decommission check replica num (#32748) 496befd2926 is described below commit 496befd292698102e842a27d523daaf70b46a328 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Mar 28 09:37:54 2024 +0800 [improvement](decommission be) decommission check replica num (#32748) --- .../java/org/apache/doris/alter/SystemHandler.java | 84 +++++++++++++++++++++- .../test_decommission_with_replica_num_fail.groovy | 59 +++++++++++++++ regression-test/suites/node_p0/test_backend.groovy | 26 ++++--- 3 files changed, 158 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index e503e093787..57e00f5ab14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -33,24 +33,34 @@ import org.apache.doris.analysis.ModifyBrokerClause; import org.apache.doris.analysis.ModifyFrontendHostNameClause; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MysqlCompatibleDatabase; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.lang3.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /* * SystemHandler is for @@ -235,7 +245,8 @@ public class SystemHandler extends AlterHandler { decommissionBackends.add(backend); } - // TODO(cmy): check if replication num can be met + checkDecommissionWithReplicaAllocation(decommissionBackends); + // TODO(cmy): check remaining space return decommissionBackends; @@ -258,12 +269,81 @@ public class SystemHandler extends AlterHandler { decommissionBackends.add(backend); } - // TODO(cmy): check if replication num can be met + checkDecommissionWithReplicaAllocation(decommissionBackends); + // TODO(cmy): check remaining space return decommissionBackends; } + private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends) + throws DdlException { + if (Config.isCloudMode() || decommissionBackends.isEmpty() + || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) { + return; + } + + Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag()) + .collect(Collectors.toSet()); + Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { + long beId = backend.getId(); + if (!backend.isScheduleAvailable() + || decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) { + continue; + } + + Tag tag = backend.getLocationTag(); + if (tag != null) { + tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1); + } + } + + Env env = Env.getCurrentEnv(); + List<Long> dbIds = env.getInternalCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + + if (db instanceof MysqlCompatibleDatabase) { + continue; + } + + for (Table table : db.getTables()) { + table.readLock(); + try { + if (!table.needSchedule()) { + continue; + } + + OlapTable tbl = (OlapTable) table; + for (Partition partition : tbl.getAllPartitions()) { + ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); + for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) { + Tag tag = entry.getKey(); + if (!decommissionTags.contains(tag)) { + continue; + } + int replicaNum = (int) entry.getValue(); + int backendNum = tagAvailBackendNums.getOrDefault(tag, 0); + if (replicaNum > backendNum) { + throw new DdlException("After decommission, partition " + partition.getName() + + " of table " + db.getName() + "." + tbl.getName() + + " 's replication allocation { " + replicaAlloc + + " } > available backend num " + backendNum + " on tag " + tag + + ", otherwise need to decrease the partition's replication num."); + } + } + } + } finally { + table.readUnlock(); + } + } + } + } + @Override public synchronized void cancel(CancelStmt stmt) throws DdlException { CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt; diff --git a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy new file mode 100644 index 00000000000..ff19adae27d --- /dev/null +++ b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('test_decommission_with_replica_num_fail') { + if (isCloudMode()) { + return + } + + def tbl = 'test_decommission_with_replica_num_fail' + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + sql "DROP TABLE IF EXISTS ${tbl} FORCE" + sql """ + CREATE TABLE ${tbl} + ( + k1 int, + k2 int + ) + DISTRIBUTED BY HASH(k1) BUCKETS 6 + PROPERTIES + ( + "replication_num" = "${replicaNum}" + ); + """ + try { + test { + sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'" + exception "otherwise need to decrease the partition's replication num" + } + } finally { + sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'" + } + sql "DROP TABLE IF EXISTS ${tbl} FORCE" +} diff --git a/regression-test/suites/node_p0/test_backend.groovy b/regression-test/suites/node_p0/test_backend.groovy index 1fe6f802e90..cce111b0a19 100644 --- a/regression-test/suites/node_p0/test_backend.groovy +++ b/regression-test/suites/node_p0/test_backend.groovy @@ -41,11 +41,12 @@ suite("test_backend", "nonConcurrent") { } if (context.config.jdbcUser.equals("root")) { + def beId1 = null try { + GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num"); try_sql """admin set frontend config("drop_backend_after_decommission" = "false")""" def result = sql_return_maparray """SHOW BACKENDS;""" logger.info("show backends result:${result}") - def beId1 = null for (def res : result) { beId1 = res.BackendId break @@ -58,16 +59,23 @@ suite("test_backend", "nonConcurrent") { assertTrue(res.SystemDecommissioned.toBoolean()) } } - result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """ - logger.info("CANCEL DECOMMISSION BACKEND ${result}") - result = sql_return_maparray """SHOW BACKENDS;""" - for (def res : result) { - if (res.BackendId == "${beId1}") { - assertFalse(res.SystemDecommissioned.toBoolean()) + } finally { + try { + if (beId1 != null) { + def result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """ + logger.info("CANCEL DECOMMISSION BACKEND ${result}") + + result = sql_return_maparray """SHOW BACKENDS;""" + for (def res : result) { + if (res.BackendId == "${beId1}") { + assertFalse(res.SystemDecommissioned.toBoolean()) + } + } } + } finally { + GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num'); + try_sql """admin set frontend config("drop_backend_after_decommission" = "true")""" } - } finally { - try_sql """admin set frontend config("drop_backend_after_decommission" = "true")""" } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org