This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 74c8eed5695 [fix](replica num) Fix the decrease in the number of replicas and une… (#48704) 74c8eed5695 is described below commit 74c8eed5695ba32edc93ab56b2edfb478a09420a Author: deardeng <deng...@selectdb.com> AuthorDate: Wed Mar 12 10:29:26 2025 +0800 [fix](replica num) Fix the decrease in the number of replicas and une… (#48704) …ven distribution of replicas among bes --- .../main/java/org/apache/doris/alter/Alter.java | 118 ++++++++++++++- .../java/org/apache/doris/catalog/Replica.java | 18 +++ .../org/apache/doris/clone/TabletScheduler.java | 12 ++ .../doris/clone/DecreaseReplicationNumTest.java | 166 +++++++++++++++++++++ 4 files changed, 313 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 87a9b8df0e7..e3c65da4046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -47,15 +47,18 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.View; import org.apache.doris.cloud.alter.CloudSchemaChangeHandler; import org.apache.doris.common.AnalysisException; @@ -82,6 +85,8 @@ import 
org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TOdbcTableType; import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TTabletType; @@ -101,6 +106,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; public class Alter { private static final Logger LOG = LogManager.getLogger(Alter.class); @@ -988,6 +994,15 @@ public class Alter { // modify meta here PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Map<Long, Long> tableBeToReplicaNumMap = Maps.newHashMap(); /* Replica count per backend id, summed over every partition in partitionNames; markReplicasForDropping decrements it as replicas are marked, so partitions processed later see the reduced counts. */ + for (String partitionName : partitionNames) { + Partition partition = olapTable.getPartition(partitionName, isTempPartition); + Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition); + + for (Map.Entry<Long, Long> entry : partitionBeToReplicaNumMap.entrySet()) { + tableBeToReplicaNumMap.merge(entry.getKey(), entry.getValue(), Long::sum); + } + } for (String partitionName : partitionNames) { Partition partition = olapTable.getPartition(partitionName, isTempPartition); // 4. data property @@ -1040,6 +1055,10 @@ public class Alter { } // 2. replica allocation if (!replicaAlloc.isNotSet()) { + if (Config.isNotCloudMode() && !olapTable.isColocateTable()) { /* Skipped in cloud mode and for colocate tables. The previous allocation of the partition is read here, before setReplicaAllocation below replaces it. */ + setReplicasToDrop(partition, partitionInfo.getReplicaAllocation(partition.getId()), + replicaAlloc, tableBeToReplicaNumMap); + } partitionInfo.setReplicaAllocation(partition.getId(), replicaAlloc); } // 3.
in memory @@ -1062,11 +1081,108 @@ public class Alter { Env.getCurrentEnv().getEditLog().logBatchModifyPartition(info); } + /** Marks surplus replicas for scale-in dropping when newReplicaAlloc gives at least one tag fewer replicas than oldReplicaAlloc; otherwise returns without touching any replica. NOTE(review): only tags present in the alloc map of newReplicaAlloc are compared, so a tag removed from the allocation entirely is not considered here. */ public void setReplicasToDrop(Partition partition, + ReplicaAllocation oldReplicaAlloc, + ReplicaAllocation newReplicaAlloc, + Map<Long, Long> tableBeToReplicaNumMap) { + if (newReplicaAlloc.getAllocMap().entrySet().stream().noneMatch( + entry -> entry.getValue() < oldReplicaAlloc.getReplicaNumByTag(entry.getKey()))) { + return; + } + + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + List<Long> aliveBes = systemInfoService.getAllBackendIds(true); + + processReplicasInPartition(partition, + tableBeToReplicaNumMap, systemInfoService, oldReplicaAlloc, newReplicaAlloc, aliveBes); + } + + /** For every tablet of the VISIBLE indexes that is HEALTHY under oldReplicaAlloc, and for every tag whose count shrinks, marks (old - new) replicas of that tag for dropping, preferring replicas on the backends that hold the most replicas. */ private void processReplicasInPartition(Partition partition, + Map<Long, Long> tableBeToReplicaNumMap, SystemInfoService systemInfoService, + ReplicaAllocation oldReplicaAlloc, ReplicaAllocation newReplicaAlloc, + List<Long> aliveBes) { + List<Tag> changeTags = newReplicaAlloc.getAllocMap().entrySet().stream() + .filter(entry -> entry.getValue() < oldReplicaAlloc.getReplicaNumByTag(entry.getKey())) + .map(Map.Entry::getKey).collect(Collectors.toList()); + Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition); + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) { + for (Tablet tablet : index.getTablets()) { + if (!isTabletHealthy(tablet, systemInfoService, partition, oldReplicaAlloc, aliveBes)) { + continue; + } + Map<Tag, List<Replica>> tagToReplicaMap = getReplicasWithTag(tablet); + for (Tag tag : changeTags) { + List<Replica> toDealReplicas = tagToReplicaMap.get(tag); + if (toDealReplicas == null || toDealReplicas.isEmpty()) { + continue; + } + sortReplicasByBackendCount(toDealReplicas, tableBeToReplicaNumMap, partitionBeToReplicaNumMap); /* NOTE(review): sorts, in place, a list produced by Collectors.groupingBy; Collectors.toList() documents no mutability guarantee (an ArrayList in practice). */ + int replicasToDrop = oldReplicaAlloc.getReplicaNumByTag(tag) + - newReplicaAlloc.getReplicaNumByTag(tag); +
markReplicasForDropping(toDealReplicas, replicasToDrop, + tableBeToReplicaNumMap, partitionBeToReplicaNumMap); + } + } + } + } + + private boolean isTabletHealthy(Tablet tablet, SystemInfoService systemInfoService, + Partition partition, ReplicaAllocation oldReplicaAlloc, + List<Long> aliveBes) { + return tablet.getHealth(systemInfoService, partition.getVisibleVersion(), oldReplicaAlloc, aliveBes) + .status == Tablet.TabletStatus.HEALTHY; + } + + + private Map<Tag, List<Replica>> getReplicasWithTag(Tablet tablet) { + return tablet.getReplicas().stream() + .collect(Collectors.groupingBy(replica -> Env.getCurrentSystemInfo() + .getBackend(replica.getBackendIdWithoutException()).getLocationTag())); /* NOTE(review): throws NPE if getBackend(...) can return null for the backend id of a replica; presumably ruled out by the HEALTHY check the caller runs first - confirm. */ + } + + /** Sorts in place, descending by the partition-level replica count of the backend of each replica, ties broken by the table-wide count (also descending); backends missing from a map count as 0. */ private void sortReplicasByBackendCount(List<Replica> replicas, + Map<Long, Long> tableBeToReplicaNumMap, + Map<Long, Long> partitionBeToReplicaNumMap) { + replicas.sort((Replica r1, Replica r2) -> { + long countPartition1 = partitionBeToReplicaNumMap.getOrDefault(r1.getBackendIdWithoutException(), 0L); + long countPartition2 = partitionBeToReplicaNumMap.getOrDefault(r2.getBackendIdWithoutException(), 0L); + if (countPartition1 != countPartition2) { + return Long.compare(countPartition2, countPartition1); + } + long countTable1 = tableBeToReplicaNumMap.getOrDefault(r1.getBackendIdWithoutException(), 0L); + long countTable2 = tableBeToReplicaNumMap.getOrDefault(r2.getBackendIdWithoutException(), 0L); + return Long.compare(countTable2, countTable1); // desc sort + }); + } + + /** Stamps the first replicasToDrop replicas with the current time (marked for scale-in drop), resets the remaining ones to -1, and decrements both count maps for every marked backend. */ private void markReplicasForDropping(List<Replica> replicas, int replicasToDrop, + Map<Long, Long> tableBeToReplicaNumMap, + Map<Long, Long> partitionBeToReplicaNumMap) { + for (int i = 0; i < replicas.size(); i++) { + Replica r = replicas.get(i); + long beId = r.getBackendIdWithoutException(); + if (i >= replicasToDrop) { + r.setScaleInDropTimeStamp(-1); // Mark for not dropping + } else { + r.setScaleInDropTimeStamp(System.currentTimeMillis()); // Mark for dropping +
tableBeToReplicaNumMap.put(beId, tableBeToReplicaNumMap.get(beId) - 1); + partitionBeToReplicaNumMap.put(beId, partitionBeToReplicaNumMap.get(beId) - 1); /* NOTE(review): both decrements unbox Map.get(beId) and would throw NPE if beId were absent; presumably safe because both maps were built from the backend ids of the same tablets - confirm. */ + } + } + } + + /** Counts how often each backend id appears in getBackendIds() of every tablet in the VISIBLE indexes of the partition. */ public static Map<Long, Long> getReplicaCountByBackend(Partition partition) { + return partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE).stream() + .flatMap(index -> index.getTablets().stream()) + .flatMap(tablet -> tablet.getBackendIds().stream()) + .collect(Collectors.groupingBy(id -> id, Collectors.counting())); + } + public void checkNoForceProperty(Map<String, String> properties) throws DdlException { for (RewriteProperty property : PropertyAnalyzer.getInstance().getForceProperties()) { if (properties.containsKey(property.key())) { throw new DdlException("Cann't modify property '" + property.key() + "'" - + (Config.isCloudMode() ? " in cloud mode" : "") + "."); + + (Config.isCloudMode() ? " in cloud mode" : "") + "."); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 22f7128abe3..868b2f199a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -197,6 +197,8 @@ public class Replica { private long userDropTime = -1; + private long scaleInDropTime = -1; /* Epoch millis at which the replica was marked for scale-in drop; -1 means not marked. */ + private long lastReportVersion = 0; public Replica() { @@ -862,6 +864,22 @@ public class Replica { return false; } + public void setScaleInDropTimeStamp(long scaleInDropTime) { + this.scaleInDropTime = scaleInDropTime; + } + + /** True while the mark is younger than Config.manual_drop_replica_valid_second seconds; an expired mark is reset to -1 as a side effect. NOTE(review): the validity window reuses the manual-drop config instead of a dedicated one. */ public boolean isScaleInDrop() { + if (this.scaleInDropTime > 0) { + if (System.currentTimeMillis() - this.scaleInDropTime + < Config.manual_drop_replica_valid_second * 1000L) { + return true; + } + this.scaleInDropTime = -1; + } + return false; + } + + public boolean isAlive() { return getState() != ReplicaState.CLONE && getState() != ReplicaState.DECOMMISSION diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index dc07ddb0be4..da13d5c61c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -900,6 +900,7 @@ public class TabletScheduler extends MasterDaemon { || deleteReplicaNotInValidTag(tabletCtx, force) || deleteReplicaChosenByRebalancer(tabletCtx, force) || deleteReplicaOnUrgentHighDisk(tabletCtx, force) + || deleteFromScaleInDropReplicas(tabletCtx, force) /* Reached only when the deletions above removed nothing; tried before high-load-backend deletion. */ || deleteReplicaOnHighLoadBackend(tabletCtx, force)) { // if we delete at least one redundant replica, we still throw a SchedException with status FINISHED // to remove this tablet from the pendingTablets(consider it as finished) @@ -1089,6 +1090,17 @@ public class TabletScheduler extends MasterDaemon { return deleteFromHighLoadBackend(tabletCtx, tabletCtx.getReplicas(), force, statistic); } + /** Deletes the first replica of the tablet whose scale-in drop mark is still valid (Replica.isScaleInDrop) and returns true; returns false when no replica is marked. */ private boolean deleteFromScaleInDropReplicas(TabletSchedCtx tabletCtx, boolean force) throws SchedException { + // Check if there are any scale drop replicas + for (Replica replica : tabletCtx.getReplicas()) { + if (replica.isScaleInDrop()) { + deleteReplicaInternal(tabletCtx, replica, "scale drop replica", force); + return true; + } + } + return false; + } + private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, List<Replica> replicas, boolean force, LoadStatisticForTag statistic) throws SchedException { Replica chosenReplica = null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecreaseReplicationNumTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecreaseReplicationNumTest.java new file mode 100644 index 00000000000..35054f822ef --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecreaseReplicationNumTest.java @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.alter.Alter; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** Verifies that lowering replication_num drops replicas so that the remaining ones stay evenly spread over the 5 test backends. */ public class DecreaseReplicationNumTest extends TestWithFeService { + + private Database db; + + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.enable_debug_points = true; + Config.disable_balance = true; /* presumably keeps the balancer from moving replicas while the test measures the spread */ + Config.drop_backend_after_decommission = false; + Config.tablet_schedule_interval_ms = 1000L; + Config.tablet_checker_interval_ms = 1000L; + } + + @Override + protected int backendNum() { + return 5; + } + + @Override + protected void
runBeforeAll() throws Exception { + Thread.sleep(1000); + createDatabase("test"); + useDatabase("test"); + db = Env.getCurrentInternalCatalog().getDbOrMetaException("test"); + } + + @Override + protected void runBeforeEach() throws Exception { + // set back to default value + Config.max_scheduling_tablets = 2000; + for (Table table : db.getTables()) { + dropTable(table.getName(), true); + } + Env.getCurrentEnv().getTabletScheduler().clear(); + DebugPointUtil.clearDebugPoints(); + Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG))); + } + + + @Test + public void testDecreaseReplicaNum() throws Exception { + createTable("CREATE TABLE tbl1 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 10" + + " PROPERTIES ('replication_num' = '5')"); + + OlapTable table = (OlapTable) db.getTableOrMetaException("tbl1"); + Partition partition = table.getPartitions().iterator().next(); + Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(partition); + Assertions.assertEquals(5, beIdToTabletNum.size()); + Assertions.assertEquals(Lists.newArrayList(10L, 10L, 10L, 10L, 10L), new ArrayList<>(beIdToTabletNum.values())); /* 10 tablets x 5 replicas on 5 backends = 10 per backend */ + beIdToTabletNum.forEach((key, value) -> Assertions.assertEquals(value, 10L)); /* NOTE(review): arguments are (actual, expected); JUnit expects assertEquals(expected, actual), which only affects the failure message. */ + + List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG); + Assertions.assertEquals(backendNum(), backends.size()); + Backend highLoadBe = backends.get(0); + DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", highLoadBe.getId()); /* presumably makes the high-load deletion path target this backend, so an even final spread shows the scale-in marks took precedence - verify against the FE.HIGH_LOAD_BE_ID handling */ + + alterTableSync("ALTER TABLE tbl1 MODIFY PARTITION(*) SET ('replication_num' = '3')"); + boolean succ = false; + for (int i = 0; i < 100; i++) { + beIdToTabletNum = Alter.getReplicaCountByBackend(partition); + Set<Long> afterAlter = new HashSet<>(beIdToTabletNum.values()); + // wait for scheduler + if (afterAlter.size() == 1 && !beIdToTabletNum.containsValue(10L)) { /* 10 tablets x 3 replicas spread evenly over 5 backends = 6 per backend */ + Assertions.assertTrue(afterAlter.contains(6L)); +
Assertions.assertEquals(Lists.newArrayList(6L, 6L, 6L, 6L, 6L), new ArrayList<>(beIdToTabletNum.values())); + succ = true; + break; + } + Thread.sleep(1000); + } + Assertions.assertTrue(succ); + } + + @Test + public void testDecreaseMultiPartitionReplicaNum() throws Exception { + createTable("create table test_multi(id int, part int) " + + "partition by range(part) (" + + " partition p1 values[('1'), ('2'))," + + " partition p2 values[('2'), ('3'))," + + " partition p3 values[('3'), ('4'))" + + ") " + + "distributed by hash(id) BUCKETS 9 " + + "properties ('replication_num'='4')"); + + OlapTable table = (OlapTable) db.getTableOrMetaException("test_multi"); + List<Partition> partitions = table.getAllPartitions(); + partitions.forEach(p -> { + Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(p); + Assertions.assertEquals(5, beIdToTabletNum.size()); + List<Long> sortedValues = new ArrayList<>(beIdToTabletNum.values()); + sortedValues.sort(Collections.reverseOrder()); + Assertions.assertEquals(Lists.newArrayList(8L, 7L, 7L, 7L, 7L), sortedValues); /* 9 tablets x 4 replicas = 36 over 5 backends */ + }); + + List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG); + Assertions.assertEquals(backendNum(), backends.size()); + Backend highLoadBe = backends.get(0); + DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", highLoadBe.getId()); + + alterTableSync("ALTER TABLE test_multi MODIFY PARTITION(*) SET ('replication_num' = '2')"); + partitions.forEach(p -> { + boolean succ = false; + for (int i = 0; i < 100; i++) { + Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(p); + Set<Long> afterAlter = new HashSet<>(beIdToTabletNum.values()); + List<Long> sortedValues = new ArrayList<>(beIdToTabletNum.values()); + sortedValues.sort(Collections.reverseOrder()); + // wait for scheduler + if (afterAlter.size() == 2 && beIdToTabletNum.containsValue(4L) && beIdToTabletNum.containsValue(3L)) { /* 9 tablets x 2 replicas = 18 over 5 backends, i.e. 4, 4, 4, 3, 3 */ + Assertions.assertEquals(Lists.newArrayList(4L, 4L, 4L, 3L, 3L),
sortedValues); + succ = true; + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + System.out.println(ignored); /* NOTE(review): swallows the interrupt; calling Thread.currentThread().interrupt() here would preserve the interrupt status of the thread. */ + } + } + Assertions.assertTrue(succ); + }); + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org