This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 74c8eed5695 [fix](replica num) Fix the decrease in the number of 
replicas and uneven distribution of replicas among bes (#48704)
74c8eed5695 is described below

commit 74c8eed5695ba32edc93ab56b2edfb478a09420a
Author: deardeng <deng...@selectdb.com>
AuthorDate: Wed Mar 12 10:29:26 2025 +0800

    [fix](replica num) Fix the decrease in the number of replicas and uneven 
    distribution of replicas among bes (#48704)
---
 .../main/java/org/apache/doris/alter/Alter.java    | 118 ++++++++++++++-
 .../java/org/apache/doris/catalog/Replica.java     |  18 +++
 .../org/apache/doris/clone/TabletScheduler.java    |  12 ++
 .../doris/clone/DecreaseReplicationNumTest.java    | 166 +++++++++++++++++++++
 4 files changed, 313 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 87a9b8df0e7..e3c65da4046 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -47,15 +47,18 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.View;
 import org.apache.doris.cloud.alter.CloudSchemaChangeHandler;
 import org.apache.doris.common.AnalysisException;
@@ -82,6 +85,8 @@ import 
org.apache.doris.persist.ModifyTablePropertyOperationLog;
 import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.policy.StoragePolicy;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TOdbcTableType;
 import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TTabletType;
@@ -101,6 +106,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class Alter {
     private static final Logger LOG = LogManager.getLogger(Alter.class);
@@ -988,6 +994,15 @@ public class Alter {
 
         // modify meta here
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        Map<Long, Long> tableBeToReplicaNumMap = Maps.newHashMap();
+        for (String partitionName : partitionNames) {
+            Partition partition = olapTable.getPartition(partitionName, 
isTempPartition);
+            Map<Long, Long> partitionBeToReplicaNumMap = 
getReplicaCountByBackend(partition);
+
+            for (Map.Entry<Long, Long> entry : 
partitionBeToReplicaNumMap.entrySet()) {
+                tableBeToReplicaNumMap.merge(entry.getKey(), entry.getValue(), 
Long::sum);
+            }
+        }
         for (String partitionName : partitionNames) {
             Partition partition = olapTable.getPartition(partitionName, 
isTempPartition);
             // 4. data property
@@ -1040,6 +1055,10 @@ public class Alter {
             }
             // 2. replica allocation
             if (!replicaAlloc.isNotSet()) {
+                if (Config.isNotCloudMode() && !olapTable.isColocateTable()) {
+                    setReplicasToDrop(partition, 
partitionInfo.getReplicaAllocation(partition.getId()),
+                            replicaAlloc, tableBeToReplicaNumMap);
+                }
                 partitionInfo.setReplicaAllocation(partition.getId(), 
replicaAlloc);
             }
             // 3. in memory
@@ -1062,11 +1081,108 @@ public class Alter {
         Env.getCurrentEnv().getEditLog().logBatchModifyPartition(info);
     }
 
+    /**
+     * Marks replicas of {@code partition} for scale-in dropping when the new allocation asks for
+     * fewer replicas than the old one on at least one tag. Does nothing otherwise.
+     *
+     * @param partition partition whose tablets are inspected
+     * @param oldReplicaAlloc allocation currently in effect for the partition
+     * @param newReplicaAlloc allocation about to be applied
+     * @param tableBeToReplicaNumMap backend id to replica count for the partitions being altered;
+     *                               passed on to the marking step, which decrements it
+     */
+    public void setReplicasToDrop(Partition partition,
+                                 ReplicaAllocation oldReplicaAlloc,
+                                 ReplicaAllocation newReplicaAlloc,
+                                 Map<Long, Long> tableBeToReplicaNumMap) {
+        boolean anyTagShrinks = newReplicaAlloc.getAllocMap().entrySet().stream()
+                .anyMatch(alloc -> alloc.getValue() < oldReplicaAlloc.getReplicaNumByTag(alloc.getKey()));
+        if (!anyTagShrinks) {
+            // No tag loses replicas, so there is nothing to mark.
+            return;
+        }
+
+        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+        List<Long> aliveBes = systemInfoService.getAllBackendIds(true);
+        processReplicasInPartition(partition, tableBeToReplicaNumMap, systemInfoService,
+                oldReplicaAlloc, newReplicaAlloc, aliveBes);
+    }
+
+    /**
+     * Walks every tablet of the partition's visible indices and, for each tag whose replica count
+     * shrinks, sorts that tag's replicas by backend load and marks the surplus ones for dropping.
+     * Tablets that are not HEALTHY under the old allocation are left untouched.
+     */
+    private void processReplicasInPartition(Partition partition,
+                                            Map<Long, Long> tableBeToReplicaNumMap,
+                                            SystemInfoService systemInfoService,
+                                            ReplicaAllocation oldReplicaAlloc,
+                                            ReplicaAllocation newReplicaAlloc,
+                                            List<Long> aliveBes) {
+        // Tags for which the new allocation requests fewer replicas than the old one.
+        List<Tag> shrinkingTags = newReplicaAlloc.getAllocMap().entrySet().stream()
+                .filter(alloc -> alloc.getValue() < oldReplicaAlloc.getReplicaNumByTag(alloc.getKey()))
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+        // Per-partition counters; updated as replicas get marked so later tablets see the new load.
+        Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition);
+
+        for (MaterializedIndex index
+                : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
+            for (Tablet tablet : index.getTablets()) {
+                boolean healthy = isTabletHealthy(tablet, systemInfoService, partition,
+                        oldReplicaAlloc, aliveBes);
+                if (!healthy) {
+                    continue;
+                }
+                Map<Tag, List<Replica>> replicasByTag = getReplicasWithTag(tablet);
+                for (Tag tag : shrinkingTags) {
+                    List<Replica> candidates = replicasByTag.get(tag);
+                    if (candidates != null && !candidates.isEmpty()) {
+                        sortReplicasByBackendCount(candidates, tableBeToReplicaNumMap,
+                                partitionBeToReplicaNumMap);
+                        int surplus = oldReplicaAlloc.getReplicaNumByTag(tag)
+                                - newReplicaAlloc.getReplicaNumByTag(tag);
+                        markReplicasForDropping(candidates, surplus,
+                                tableBeToReplicaNumMap, partitionBeToReplicaNumMap);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns true when the tablet's health, evaluated at the partition's visible version against
+     * the old replica allocation and the given alive backends, is {@code HEALTHY}.
+     */
+    private boolean isTabletHealthy(Tablet tablet, SystemInfoService systemInfoService,
+                                    Partition partition, ReplicaAllocation oldReplicaAlloc,
+                                    List<Long> aliveBes) {
+        long visibleVersion = partition.getVisibleVersion();
+        Tablet.TabletStatus currentStatus =
+                tablet.getHealth(systemInfoService, visibleVersion, oldReplicaAlloc, aliveBes).status;
+        return currentStatus == Tablet.TabletStatus.HEALTHY;
+    }
+
+
+    /**
+     * Groups the tablet's replicas by the location tag of the backend that hosts each replica.
+     *
+     * <p>Replicas whose backend id is not registered in the system info service are skipped.
+     * Dereferencing a missing backend, or handing {@code groupingBy} a null key, would throw a
+     * NullPointerException and abort the whole ALTER.
+     *
+     * @param tablet tablet whose replicas are grouped
+     * @return tag to replicas on backends carrying that tag; the lists are mutable and get sorted
+     *         by the caller
+     */
+    private Map<Tag, List<Replica>> getReplicasWithTag(Tablet tablet) {
+        // Resolve the service once instead of once per replica.
+        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+        return tablet.getReplicas().stream()
+                .filter(replica -> systemInfoService.getBackend(replica.getBackendIdWithoutException()) != null)
+                .collect(Collectors.groupingBy(replica -> systemInfoService
+                        .getBackend(replica.getBackendIdWithoutException()).getLocationTag()));
+    }
+
+    /**
+     * Sorts {@code replicas} in place so that replicas on the most loaded backends come first:
+     * descending by the backend's replica count within the partition, ties broken by descending
+     * replica count within the table. Backends missing from a map count as 0.
+     */
+    private void sortReplicasByBackendCount(List<Replica> replicas,
+                                            Map<Long, Long> tableBeToReplicaNumMap,
+                                            Map<Long, Long> partitionBeToReplicaNumMap) {
+        replicas.sort((left, right) -> {
+            long leftBe = left.getBackendIdWithoutException();
+            long rightBe = right.getBackendIdWithoutException();
+            // Arguments are swapped (right before left) to get a descending order.
+            int byPartition = Long.compare(
+                    partitionBeToReplicaNumMap.getOrDefault(rightBe, 0L),
+                    partitionBeToReplicaNumMap.getOrDefault(leftBe, 0L));
+            if (byPartition != 0) {
+                return byPartition;
+            }
+            return Long.compare(
+                    tableBeToReplicaNumMap.getOrDefault(rightBe, 0L),
+                    tableBeToReplicaNumMap.getOrDefault(leftBe, 0L));
+        });
+    }
+
+    /**
+     * Marks the first {@code replicasToDrop} replicas of the (already sorted) list for scale-in
+     * dropping and clears the mark on all remaining ones. For every marked replica, the hosting
+     * backend's counters in both maps are decremented so later sorting sees the updated load.
+     *
+     * @param replicas replicas of one tablet under one tag, most loaded backend first
+     * @param replicasToDrop how many leading replicas to mark
+     * @param tableBeToReplicaNumMap backend id to replica count at table level, updated in place
+     * @param partitionBeToReplicaNumMap backend id to replica count at partition level, updated in place
+     */
+    private void markReplicasForDropping(List<Replica> replicas, int replicasToDrop,
+                                  Map<Long, Long> tableBeToReplicaNumMap,
+                                  Map<Long, Long> partitionBeToReplicaNumMap) {
+        // Read the clock once so every replica marked by this call carries the same timestamp.
+        long dropTimeMs = System.currentTimeMillis();
+        for (int i = 0; i < replicas.size(); i++) {
+            Replica r = replicas.get(i);
+            if (i >= replicasToDrop) {
+                r.setScaleInDropTimeStamp(-1); // Mark for not dropping
+                continue;
+            }
+            r.setScaleInDropTimeStamp(dropTimeMs); // Mark for dropping
+            long beId = r.getBackendIdWithoutException();
+            // computeIfPresent instead of get(beId) - 1: unboxing a missing entry would throw a
+            // NullPointerException if the backend id is absent from a counter map.
+            tableBeToReplicaNumMap.computeIfPresent(beId, (id, count) -> count - 1);
+            partitionBeToReplicaNumMap.computeIfPresent(beId, (id, count) -> count - 1);
+        }
+    }
+
+    /**
+     * Counts, per backend id, how many tablet replicas of the partition's visible indices are
+     * placed on that backend.
+     *
+     * @param partition partition to inspect
+     * @return backend id to number of replicas on that backend
+     */
+    public static Map<Long, Long> getReplicaCountByBackend(Partition partition) {
+        Map<Long, Long> replicaCountByBe = Maps.newHashMap();
+        for (MaterializedIndex index
+                : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
+            for (Tablet tablet : index.getTablets()) {
+                for (Long beId : tablet.getBackendIds()) {
+                    replicaCountByBe.merge(beId, 1L, Long::sum);
+                }
+            }
+        }
+        return replicaCountByBe;
+    }
+
     public void checkNoForceProperty(Map<String, String> properties) throws 
DdlException {
         for (RewriteProperty property : 
PropertyAnalyzer.getInstance().getForceProperties()) {
             if (properties.containsKey(property.key())) {
                 throw new DdlException("Cann't modify property '" + 
property.key() + "'"
-                        + (Config.isCloudMode() ? " in cloud mode" : "") + 
".");
+                    + (Config.isCloudMode() ? " in cloud mode" : "") + ".");
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 22f7128abe3..868b2f199a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -197,6 +197,8 @@ public class Replica {
 
     private long userDropTime = -1;
 
+    private long scaleInDropTime = -1;
+
     private long lastReportVersion = 0;
 
     public Replica() {
@@ -862,6 +864,22 @@ public class Replica {
         return false;
     }
 
+    /**
+     * Sets the scale-in drop timestamp of this replica.
+     *
+     * @param scaleInDropTime time in milliseconds at which the replica was marked for scale-in
+     *                        dropping; a non-positive value (callers pass -1) means not marked
+     */
+    public void setScaleInDropTimeStamp(long scaleInDropTime) {
+        this.scaleInDropTime = scaleInDropTime;
+    }
+
+    /**
+     * Tells whether this replica is currently marked for scale-in dropping.
+     *
+     * <p>A mark is honoured only while it is younger than
+     * {@code Config.manual_drop_replica_valid_second} seconds; an expired mark is reset to -1 as a
+     * side effect of this call.
+     *
+     * @return true if a positive, still valid scale-in drop timestamp is set
+     */
+    public boolean isScaleInDrop() {
+        if (this.scaleInDropTime <= 0) {
+            return false;
+        }
+        long elapsedMs = System.currentTimeMillis() - this.scaleInDropTime;
+        long validMs = Config.manual_drop_replica_valid_second * 1000L;
+        if (elapsedMs < validMs) {
+            return true;
+        }
+        // The mark has expired: forget it.
+        this.scaleInDropTime = -1;
+        return false;
+    }
+
+
     public boolean isAlive() {
         return getState() != ReplicaState.CLONE
                 && getState() != ReplicaState.DECOMMISSION
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index dc07ddb0be4..da13d5c61c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -900,6 +900,7 @@ public class TabletScheduler extends MasterDaemon {
                 || deleteReplicaNotInValidTag(tabletCtx, force)
                 || deleteReplicaChosenByRebalancer(tabletCtx, force)
                 || deleteReplicaOnUrgentHighDisk(tabletCtx, force)
+                || deleteFromScaleInDropReplicas(tabletCtx, force)
                 || deleteReplicaOnHighLoadBackend(tabletCtx, force)) {
             // if we delete at least one redundant replica, we still throw a 
SchedException with status FINISHED
             // to remove this tablet from the pendingTablets(consider it as 
finished)
@@ -1089,6 +1090,17 @@ public class TabletScheduler extends MasterDaemon {
         return deleteFromHighLoadBackend(tabletCtx, tabletCtx.getReplicas(), 
force, statistic);
     }
 
+    /**
+     * Deletes the first replica of the tablet that is marked for scale-in dropping.
+     *
+     * @return true if such a replica was found and handed to {@code deleteReplicaInternal},
+     *         false if no replica carries a valid mark
+     */
+    private boolean deleteFromScaleInDropReplicas(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
+        Replica markedReplica = null;
+        for (Replica candidate : tabletCtx.getReplicas()) {
+            if (candidate.isScaleInDrop()) {
+                markedReplica = candidate;
+                break;
+            }
+        }
+        if (markedReplica == null) {
+            return false;
+        }
+        deleteReplicaInternal(tabletCtx, markedReplica, "scale drop replica", force);
+        return true;
+    }
+
     private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, 
List<Replica> replicas,
             boolean force, LoadStatisticForTag statistic) throws 
SchedException {
         Replica chosenReplica = null;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/DecreaseReplicationNumTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DecreaseReplicationNumTest.java
new file mode 100644
index 00000000000..35054f822ef
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DecreaseReplicationNumTest.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.alter.Alter;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Checks that lowering {@code replication_num} through ALTER TABLE ... MODIFY PARTITION leaves the
+ * remaining replicas evenly spread over the backends once the tablet scheduler has dropped the
+ * surplus ones.
+ */
+public class DecreaseReplicationNumTest extends TestWithFeService {
+
+    private Database db;
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        Config.enable_debug_points = true;
+        Config.disable_balance = true;
+        Config.drop_backend_after_decommission = false;
+        // Short intervals so the checker and scheduler react within the polling loops below.
+        Config.tablet_schedule_interval_ms = 1000L;
+        Config.tablet_checker_interval_ms = 1000L;
+    }
+
+    @Override
+    protected int backendNum() {
+        return 5;
+    }
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        Thread.sleep(1000);
+        createDatabase("test");
+        useDatabase("test");
+        db = Env.getCurrentInternalCatalog().getDbOrMetaException("test");
+    }
+
+    @Override
+    protected void runBeforeEach() throws Exception {
+        // set back to default value
+        Config.max_scheduling_tablets = 2000;
+        for (Table table : db.getTables()) {
+            dropTable(table.getName(), true);
+        }
+        Env.getCurrentEnv().getTabletScheduler().clear();
+        DebugPointUtil.clearDebugPoints();
+        Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)));
+    }
+
+
+    /**
+     * 10 buckets x 5 replicas on 5 backends gives 10 replicas per backend; after lowering to
+     * 3 replicas the 30 remaining replicas must end up as 6 per backend.
+     */
+    @Test
+    public void testDecreaseReplicaNum() throws Exception {
+        createTable("CREATE TABLE tbl1 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 10"
+                + " PROPERTIES ('replication_num' = '5')");
+
+        OlapTable table = (OlapTable) db.getTableOrMetaException("tbl1");
+        Partition partition = table.getPartitions().iterator().next();
+        Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(partition);
+        Assertions.assertEquals(5, beIdToTabletNum.size());
+        Assertions.assertEquals(Lists.newArrayList(10L, 10L, 10L, 10L, 10L),
+                new ArrayList<>(beIdToTabletNum.values()));
+        // Expected value goes first so a failure message reads correctly.
+        beIdToTabletNum.forEach((key, value) -> Assertions.assertEquals(10L, value.longValue()));
+
+        List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG);
+        Assertions.assertEquals(backendNum(), backends.size());
+        Backend highLoadBe = backends.get(0);
+        // NOTE(review): presumably makes the scheduler treat this backend as the high-load one;
+        // the consumer of this debug point is not visible here, confirm.
+        DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", highLoadBe.getId());
+
+        alterTableSync("ALTER TABLE tbl1 MODIFY PARTITION(*) SET ('replication_num' = '3')");
+        boolean succ = false;
+        for (int i = 0; i < 100; i++) {
+            beIdToTabletNum = Alter.getReplicaCountByBackend(partition);
+            Set<Long> afterAlter = new HashSet<>(beIdToTabletNum.values());
+            // wait for scheduler
+            if (afterAlter.size() == 1 && !beIdToTabletNum.containsValue(10L)) {
+                Assertions.assertTrue(afterAlter.contains(6L));
+                Assertions.assertEquals(Lists.newArrayList(6L, 6L, 6L, 6L, 6L),
+                        new ArrayList<>(beIdToTabletNum.values()));
+                succ = true;
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        Assertions.assertTrue(succ);
+    }
+
+    /**
+     * Per partition, 9 buckets x 4 replicas on 5 backends gives 8,7,7,7,7; after lowering to
+     * 2 replicas the 18 remaining replicas must end up as 4,4,4,3,3.
+     */
+    @Test
+    public void testDecreaseMultiPartitionReplicaNum() throws Exception {
+        createTable("create table test_multi(id int, part int) "
+                + "partition by range(part) ("
+                + "  partition p1 values[('1'), ('2')),"
+                + "  partition p2 values[('2'), ('3')),"
+                + "  partition p3 values[('3'), ('4'))"
+                + ") "
+                + "distributed by hash(id) BUCKETS 9 "
+                + "properties ('replication_num'='4')");
+
+        OlapTable table = (OlapTable) db.getTableOrMetaException("test_multi");
+        List<Partition> partitions = table.getAllPartitions();
+        partitions.forEach(p -> {
+            Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(p);
+            Assertions.assertEquals(5, beIdToTabletNum.size());
+            List<Long> sortedValues = new ArrayList<>(beIdToTabletNum.values());
+            sortedValues.sort(Collections.reverseOrder());
+            Assertions.assertEquals(Lists.newArrayList(8L, 7L, 7L, 7L, 7L), sortedValues);
+        });
+
+        List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG);
+        Assertions.assertEquals(backendNum(), backends.size());
+        Backend highLoadBe = backends.get(0);
+        DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", highLoadBe.getId());
+
+        alterTableSync("ALTER TABLE test_multi MODIFY PARTITION(*) SET ('replication_num' = '2')");
+        partitions.forEach(p -> {
+            boolean succ = false;
+            for (int i = 0; i < 100; i++) {
+                Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(p);
+                Set<Long> afterAlter = new HashSet<>(beIdToTabletNum.values());
+                List<Long> sortedValues = new ArrayList<>(beIdToTabletNum.values());
+                sortedValues.sort(Collections.reverseOrder());
+                // wait for scheduler
+                if (afterAlter.size() == 2 && beIdToTabletNum.containsValue(4L)
+                        && beIdToTabletNum.containsValue(3L)) {
+                    Assertions.assertEquals(Lists.newArrayList(4L, 4L, 4L, 3L, 3L), sortedValues);
+                    succ = true;
+                    break;
+                }
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag and stop polling; the assertion below then
+                    // fails the test instead of spinning through the remaining iterations.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            Assertions.assertTrue(succ);
+        });
+
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to