This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b08865982a6 [improvement](disk balance) Impr disk rebalancer sched when be load r… (#26412)
b08865982a6 is described below

commit b08865982a61737ebaa0c9a17608fe464186873b
Author: deardeng <565620...@qq.com>
AuthorDate: Sat Nov 25 23:35:16 2023 +0800

    [improvement](disk balance) Impr disk rebalancer sched when be load r… (#26412)
---
 .../java/org/apache/doris/catalog/Replica.java     |   5 +
 .../org/apache/doris/clone/BeLoadRebalancer.java   |   1 +
 .../org/apache/doris/clone/DiskRebalancer.java     |  20 +--
 .../apache/doris/clone/PartitionRebalancer.java    |  12 --
 .../java/org/apache/doris/clone/Rebalancer.java    |  28 +++-
 .../org/apache/doris/clone/TabletScheduler.java    |   3 +
 .../org/apache/doris/catalog/CatalogTestUtil.java  |  46 +++++++
 .../doris/clone/DiskReblanceWhenSchedulerIdle.java | 151 +++++++++++++++++++++
 .../apache/doris/utframe/MockedBackendFactory.java |  37 ++++-
 9 files changed, 278 insertions(+), 25 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 434c8854b30..94608308208 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -220,6 +220,11 @@ public class Replica implements Writable {
         return this.backendId;
     }
 
+    // just for ut
+    public void setBackendId(long backendId) {
+        this.backendId = backendId;
+    }
+
     public long getDataSize() {
         return dataSize;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 82079946e0e..e7b6211bd79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -372,4 +372,5 @@ public class BeLoadRebalancer extends Rebalancer {
         throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
                 "beload waiting for dest backend slot");
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index d4ad8769fb7..3ab069d09ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -30,7 +30,6 @@ import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.BalanceType;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
@@ -46,7 +45,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 /*
 
  * This DiskBalancer is different from other Balancers which takes care of cluster-wide data balancing.
@@ -129,14 +127,16 @@ public class DiskRebalancer extends Rebalancer {
         List<BackendLoadStatistic> highBEs = Lists.newArrayList();
         clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
 
-        if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) {
-            PartitionRebalancer rebalancer = (PartitionRebalancer) Env.getCurrentEnv()
-                    .getTabletScheduler().getRebalancer();
-            if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) {
-                midBEs.addAll(lowBEs);
-                midBEs.addAll(highBEs);
-            }
-        } else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
+        Rebalancer rebalancer = FeConstants.runningUnitTest ? null
+                : Env.getCurrentEnv().getTabletScheduler().getRebalancer();
+        if (rebalancer != null && rebalancer.unPickOverLongTime(clusterStat.getTag(), medium)) {
+            midBEs.addAll(lowBEs);
+            midBEs.addAll(highBEs);
+            lowBEs.clear();
+            highBEs.clear();
+        }
+
+        if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
             // the cluster is not balanced
             if (prioBackends.isEmpty()) {
                 LOG.info("cluster is not balanced with medium: {}. skip", medium);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 141863b00d3..a730170b364 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -66,8 +66,6 @@ public class PartitionRebalancer extends Rebalancer {
     private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
     private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
 
-    private long cacheEmptyTimestamp = -1L;
-
     public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
             Map<Long, PathSlot> backendsWorkingSlots) {
         super(infoService, invertedIndex, backendsWorkingSlots);
@@ -234,11 +232,6 @@ public class PartitionRebalancer extends Rebalancer {
         return !bes.contains(move.fromBe) && bes.contains(move.toBe);
     }
 
-    // cache empty for 10 min
-    public boolean checkCacheEmptyForLong() {
-        return cacheEmptyTimestamp > 0 && System.currentTimeMillis() > cacheEmptyTimestamp + 10 * 60 * 1000L;
-    }
-
     @Override
     protected void completeSchedCtx(TabletSchedCtx tabletCtx)
             throws SchedException {
@@ -331,11 +324,6 @@ public class PartitionRebalancer extends Rebalancer {
         movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
         // Perform cache maintenance
         movesCacheMap.maintain();
-        if (movesCacheMap.size() > 0) {
-            cacheEmptyTimestamp = -1;
-        } else if (cacheEmptyTimestamp < 0) {
-            cacheEmptyTimestamp = System.currentTimeMillis();
-        }
         LOG.debug("Move succeeded/total :{}/{}, current {}",
                 counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index f339418055b..8f6b1d229f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -25,8 +25,12 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.thrift.TStorageMedium;
 
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
@@ -46,6 +50,7 @@ import java.util.Map;
  * 2. If you want to make sure the move is succeed, you can assume that it's succeed when getToDeleteReplicaId called.
  */
 public abstract class Rebalancer {
+    private static final Logger LOG = LogManager.getLogger(Rebalancer.class);
     // When Rebalancer init, the statisticMap is usually empty. So it's no need to be an arg.
     // Only use updateLoadStatistic() to load stats.
     protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
@@ -55,6 +60,14 @@ public abstract class Rebalancer {
     // be id -> end time of prio
     protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
 
+    // tag -> (medium, timestamp)
+    private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create();
+
+    // for ut
+    public Table<Tag, TStorageMedium, Long> getLastPickTimeTable() {
+        return lastPickTimeTable;
+    }
+
     public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
             Map<Long, PathSlot> backendsWorkingSlots) {
         this.infoService = infoService;
@@ -66,7 +79,12 @@ public abstract class Rebalancer {
         List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
         for (Map.Entry<Tag, LoadStatisticForTag> entry : statisticMap.entrySet()) {
             for (TStorageMedium medium : TStorageMedium.values()) {
-                alternativeTablets.addAll(selectAlternativeTabletsForCluster(entry.getValue(), medium));
+                List<TabletSchedCtx> candidates =
+                        selectAlternativeTabletsForCluster(entry.getValue(), medium);
+                alternativeTablets.addAll(candidates);
+                if (!candidates.isEmpty()) {
+                    lastPickTimeTable.put(entry.getKey(), medium, System.currentTimeMillis());
+                }
             }
         }
         return alternativeTablets;
@@ -77,6 +95,14 @@ public abstract class Rebalancer {
     protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
             LoadStatisticForTag clusterStat, TStorageMedium medium);
 
+    // 5mins
+    protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) {
+        Long lastPickTime = lastPickTimeTable.get(tag, medium);
+        Long now = System.currentTimeMillis();
+        LOG.debug("tag={}, medium={}, lastPickTime={}, now={}", tag, medium, lastPickTime, now);
+        return lastPickTime == null || now - lastPickTime >= 5 * 60 * 1000L;
+    }
+
     public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
             throws SchedException {
         completeSchedCtx(tabletCtx);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 77b2d44055c..59c62f71906 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -171,6 +171,7 @@ public class TabletScheduler extends MasterDaemon {
         return stat;
     }
 
+    // just return be or partition rebalancer
     public Rebalancer getRebalancer() {
         return rebalancer;
     }
@@ -274,6 +275,8 @@ public class TabletScheduler extends MasterDaemon {
         return AddResult.ADDED;
     }
 
+
+
     public synchronized boolean containsTablet(long tabletId) {
         return allTabletTypes.containsKey(tabletId);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index a05c63b812f..d1cdeba0e3f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -374,4 +374,50 @@ public class CatalogTestUtil {
             olapTable.readUnlock();
         }
     }
+
+    public static long getReplicaPathHash(long tabletId, long backendId) {
+        Env env = Env.getCurrentEnv();
+        TabletInvertedIndex invertedIndex = env.getTabletInvertedIndex();
+        TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+        if (tabletMeta == null) {
+            return -1L;
+        }
+
+        long dbId = tabletMeta.getDbId();
+        long tableId = tabletMeta.getTableId();
+        long partitionId = tabletMeta.getPartitionId();
+        long indexId = tabletMeta.getIndexId();
+        Database db = env.getInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            return -1L;
+        }
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            return -1L;
+        }
+        if (table.getType() != Table.TableType.OLAP) {
+            return -1L;
+        }
+        OlapTable olapTable = (OlapTable) table;
+        olapTable.readLock();
+        try {
+            Partition partition = olapTable.getPartition(partitionId);
+            if (partition == null) {
+                return -1L;
+            }
+            MaterializedIndex materializedIndex = partition.getIndex(indexId);
+            if (materializedIndex == null) {
+                return -1L;
+            }
+            Tablet tablet = materializedIndex.getTablet(tabletId);
+            for (Replica replica : tablet.getReplicas()) {
+                if (replica.getBackendId() == backendId) {
+                    return replica.getPathHash();
+                }
+            }
+        } finally {
+            olapTable.readUnlock();
+        }
+        return -1;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
new file mode 100644
index 00000000000..028a07941c9
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class DiskReblanceWhenSchedulerIdle extends TestWithFeService {
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        Config.enable_round_robin_create_tablet = true;
+        Config.allow_replica_on_same_host = true;
+        Config.tablet_checker_interval_ms = 100;
+        Config.tablet_schedule_interval_ms = 100;
+        Config.schedule_slot_num_per_hdd_path = 1;
+        Config.disable_balance = true;
+        Config.enable_debug_points = true;
+    }
+
+    @Override
+    protected int backendNum() {
+        return 2;
+    }
+
+    @Test
+    public void testDiskReblanceWhenSchedulerIdle() throws Exception {
+        // case start
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+        Assertions.assertEquals(backendNum(), backends.size());
+        for (Backend be : backends) {
+            Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId()));
+        }
+
+        long totalCapacity = 10L << 30;
+
+        for (int i = 0; i < backends.size(); i++) {
+            Map<String, DiskInfo> disks = Maps.newHashMap();
+            for (int j = 0; j < 2; j++) {
+                DiskInfo diskInfo = new DiskInfo("be_" + i + "_disk_" + j);
+                diskInfo.setTotalCapacityB(totalCapacity);
+                diskInfo.setDataUsedCapacityB(1L << 30);
+                diskInfo.setAvailableCapacityB(9L << 30);
+                diskInfo.setPathHash((1000L * (i + 1)) + 10 * j);
+                disks.put(diskInfo.getRootPath(), diskInfo);
+            }
+            backends.get(i).setDisks(ImmutableMap.copyOf(disks));
+        }
+        Backend be0 = backends.get(0);
+
+        createDatabase("test");
+        createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) "
+                + " BUCKETS 4 PROPERTIES ( \"replication_num\" = \"1\","
+                + " \"storage_medium\" = \"HDD\")");
+
+        Assertions.assertEquals(2, invertedIndex.getTabletNumByBackendId(backends.get(0).getId()));
+        Assertions.assertEquals(2, invertedIndex.getTabletNumByBackendId(backends.get(1).getId()));
+
+
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+        Assertions.assertNotNull(tbl);
+        Partition partition = tbl.getPartitions().iterator().next();
+        List<Tablet> tablets = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+                .getTablets();
+
+        DiskInfo diskInfo0 = be0.getDisks().values().asList().get(0);
+        DiskInfo diskInfo1 = be0.getDisks().values().asList().get(1);
+
+        tablets.forEach(tablet -> {
+            Lists.newArrayList(tablet.getReplicas()).forEach(
+                    replica -> {
+                    if (replica.getBackendId() == backends.get(1).getId()) {
+                        replica.updateStat(totalCapacity / 4, 1);
+                        tablet.deleteReplica(replica);
+                        replica.setBackendId(backends.get(0).getId());
+                        replica.setPathHash(diskInfo0.getPathHash());
+                        tablet.addReplica(replica);
+                    } else {
+                        replica.setPathHash(diskInfo0.getPathHash());
+                    }
+                }
+            );
+        });
+
+        diskInfo0.setAvailableCapacityB(0L);
+        diskInfo1.setAvailableCapacityB(totalCapacity);
+        DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", backends.get(1).getId());
+
+        Table<Tag, TStorageMedium, Long> lastPickTimeTable = Env.getCurrentEnv().getTabletScheduler().getRebalancer().getLastPickTimeTable();
+        lastPickTimeTable.put(Tag.DEFAULT_BACKEND_TAG, TStorageMedium.HDD, 0L);
+        Config.disable_balance = false;
+
+
+        try {
+            Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        Map<Long, Integer> gotDiskTabletNums = Maps.newHashMap();
+        tablets.forEach(tablet -> tablet.getReplicas().forEach(replica -> {
+            gotDiskTabletNums.put(replica.getPathHash(), 1 + gotDiskTabletNums.getOrDefault(replica.getPathHash(), 0));
+        }));
+
+
+        Map<Long, Integer> expectTabletNums = Maps.newHashMap();
+        expectTabletNums.put(diskInfo0.getPathHash(), 2);
+        expectTabletNums.put(diskInfo1.getPathHash(), 2);
+
+        Assertions.assertEquals(expectTabletNums, gotDiskTabletNums);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index b2184a5bad0..f96a85253fd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -19,6 +19,7 @@ package org.apache.doris.utframe;
 
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
@@ -26,7 +27,7 @@ import org.apache.doris.proto.PBackendServiceGrpc;
 import org.apache.doris.proto.Types;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
-import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.FrontendService.Client;
 import org.apache.doris.thrift.HeartbeatService;
 import org.apache.doris.thrift.TAgentPublishRequest;
 import org.apache.doris.thrift.TAgentResult;
@@ -64,6 +65,7 @@ import org.apache.doris.thrift.TScanOpenResult;
 import org.apache.doris.thrift.TSnapshotRequest;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMediumMigrateReq;
 import org.apache.doris.thrift.TStreamLoadRecordResult;
 import org.apache.doris.thrift.TTabletInfo;
 import org.apache.doris.thrift.TTabletStatResult;
@@ -171,7 +173,7 @@ public class MockedBackendFactory {
                 public void run() {
                     while (true) {
                         boolean ok = false;
-                        FrontendService.Client client = null;
+                        Client client = null;
                         TNetworkAddress address = null;
                         try {
                             // ATTR: backend.getFeAddress must after taskQueue.take, because fe addr thread race
@@ -197,6 +199,9 @@ public class MockedBackendFactory {
                                 case CLONE:
                                     handleCloneTablet(request, finishTaskRequest);
                                     break;
+                                case STORAGE_MEDIUM_MIGRATE:
+                                    handleStorageMediumMigrate(request, finishTaskRequest);
+                                    break;
                                 default:
                                     break;
                             }
@@ -252,6 +257,34 @@ public class MockedBackendFactory {
                     finishTaskRequest.setFinishTabletInfos(tabletInfos);
                 }
 
+                private void handleStorageMediumMigrate(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
+                    TStorageMediumMigrateReq req = request.getStorageMediumMigrateReq();
+                    long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));
+
+                    long srcDataPath = CatalogTestUtil.getReplicaPathHash(req.tablet_id, backendInFe.getId());
+                    DiskInfo srcDiskInfo = getDisk(srcDataPath);
+                    if (srcDiskInfo != null) {
+                        srcDiskInfo.setDataUsedCapacityB(Math.min(srcDiskInfo.getTotalCapacityB(),
+                                srcDiskInfo.getDataUsedCapacityB() - dataSize));
+                        srcDiskInfo.setAvailableCapacityB(Math.max(0L,
+                                srcDiskInfo.getAvailableCapacityB() + dataSize));
+                        srcDiskInfo.setState(DiskState.ONLINE);
+                    }
+
+                    DiskInfo destDiskInfo = getDisk(req.data_dir);
+                    if (destDiskInfo != null) {
+                        destDiskInfo.setDataUsedCapacityB(Math.min(destDiskInfo.getTotalCapacityB(),
+                                destDiskInfo.getDataUsedCapacityB() + dataSize));
+                        destDiskInfo.setAvailableCapacityB(Math.max(0L,
+                                destDiskInfo.getAvailableCapacityB() - dataSize));
+                        destDiskInfo.setState(DiskState.ONLINE);
+                    }
+                }
+
+                private DiskInfo getDisk(String dataDir) {
+                    return backendInFe.getDisks().get(dataDir);
+                }
+
                 private DiskInfo getDisk(long pathHash) {
                     DiskInfo diskInfo = null;
                     for (DiskInfo tmpDiskInfo : backendInFe.getDisks().values()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to