This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b08865982a6 [improvement](disk balance) Impr disk rebalancer sched when be load r… (#26412) b08865982a6 is described below commit b08865982a61737ebaa0c9a17608fe464186873b Author: deardeng <565620...@qq.com> AuthorDate: Sat Nov 25 23:35:16 2023 +0800 [improvement](disk balance) Impr disk rebalancer sched when be load r… (#26412) --- .../java/org/apache/doris/catalog/Replica.java | 5 + .../org/apache/doris/clone/BeLoadRebalancer.java | 1 + .../org/apache/doris/clone/DiskRebalancer.java | 20 +-- .../apache/doris/clone/PartitionRebalancer.java | 12 -- .../java/org/apache/doris/clone/Rebalancer.java | 28 +++- .../org/apache/doris/clone/TabletScheduler.java | 3 + .../org/apache/doris/catalog/CatalogTestUtil.java | 46 +++++++ .../doris/clone/DiskReblanceWhenSchedulerIdle.java | 151 +++++++++++++++++++++ .../apache/doris/utframe/MockedBackendFactory.java | 37 ++++- 9 files changed, 278 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 434c8854b30..94608308208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -220,6 +220,11 @@ public class Replica implements Writable { return this.backendId; } + // just for ut + public void setBackendId(long backendId) { + this.backendId = backendId; + } + public long getDataSize() { return dataSize; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 82079946e0e..e7b6211bd79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -372,4 +372,5 @@ public class BeLoadRebalancer extends Rebalancer { throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, "beload waiting for dest backend slot"); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index d4ad8769fb7..3ab069d09ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -30,7 +30,6 @@ import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.BalanceType; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; -import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -46,7 +45,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; - /* * This DiskBalancer is different from other Balancers which takes care of cluster-wide data balancing. @@ -129,14 +127,16 @@ public class DiskRebalancer extends Rebalancer { List<BackendLoadStatistic> highBEs = Lists.newArrayList(); clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); - if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) { - PartitionRebalancer rebalancer = (PartitionRebalancer) Env.getCurrentEnv() - .getTabletScheduler().getRebalancer(); - if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) { - midBEs.addAll(lowBEs); - midBEs.addAll(highBEs); - } - } else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) { + Rebalancer rebalancer = FeConstants.runningUnitTest ? null + : Env.getCurrentEnv().getTabletScheduler().getRebalancer(); + if (rebalancer != null && rebalancer.unPickOverLongTime(clusterStat.getTag(), medium)) { + midBEs.addAll(lowBEs); + midBEs.addAll(highBEs); + lowBEs.clear(); + highBEs.clear(); + } + + if (!(lowBEs.isEmpty() && highBEs.isEmpty())) { // the cluster is not balanced if (prioBackends.isEmpty()) { LOG.info("cluster is not balanced with medium: {}. skip", medium); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 141863b00d3..a730170b364 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -66,8 +66,6 @@ public class PartitionRebalancer extends Rebalancer { private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0); private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0); - private long cacheEmptyTimestamp = -1L; - public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, Map<Long, PathSlot> backendsWorkingSlots) { super(infoService, invertedIndex, backendsWorkingSlots); @@ -234,11 +232,6 @@ public class PartitionRebalancer extends Rebalancer { return !bes.contains(move.fromBe) && bes.contains(move.toBe); } - // cache empty for 10 min - public boolean checkCacheEmptyForLong() { - return cacheEmptyTimestamp > 0 && System.currentTimeMillis() > cacheEmptyTimestamp + 10 * 60 * 1000L; - } - @Override protected void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { @@ -331,11 +324,6 @@ public class PartitionRebalancer extends Rebalancer { movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access); // Perform cache maintenance movesCacheMap.maintain(); - if (movesCacheMap.size() > 0) { - cacheEmptyTimestamp = -1; - } else if (cacheEmptyTimestamp < 0) { - cacheEmptyTimestamp = System.currentTimeMillis(); - } LOG.debug("Move succeeded/total :{}/{}, current {}", counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java index f339418055b..8f6b1d229f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java @@ -25,8 +25,12 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTask; import org.apache.doris.thrift.TStorageMedium; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; @@ -46,6 +50,7 @@ import java.util.Map; * 2. If you want to make sure the move is succeed, you can assume that it's succeed when getToDeleteReplicaId called. */ public abstract class Rebalancer { + private static final Logger LOG = LogManager.getLogger(Rebalancer.class); // When Rebalancer init, the statisticMap is usually empty. So it's no need to be an arg. // Only use updateLoadStatistic() to load stats. protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap(); @@ -55,6 +60,14 @@ public abstract class Rebalancer { // be id -> end time of prio protected Map<Long, Long> prioBackends = Maps.newConcurrentMap(); + // tag -> (medium, timestamp) + private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create(); + + // for ut + public Table<Tag, TStorageMedium, Long> getLastPickTimeTable() { + return lastPickTimeTable; + } + public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, Map<Long, PathSlot> backendsWorkingSlots) { this.infoService = infoService; @@ -66,7 +79,12 @@ public abstract class Rebalancer { List<TabletSchedCtx> alternativeTablets = Lists.newArrayList(); for (Map.Entry<Tag, LoadStatisticForTag> entry : statisticMap.entrySet()) { for (TStorageMedium medium : TStorageMedium.values()) { - alternativeTablets.addAll(selectAlternativeTabletsForCluster(entry.getValue(), medium)); + List<TabletSchedCtx> candidates = + selectAlternativeTabletsForCluster(entry.getValue(), medium); + alternativeTablets.addAll(candidates); + if (!candidates.isEmpty()) { + lastPickTimeTable.put(entry.getKey(), medium, System.currentTimeMillis()); + } } } return alternativeTablets; @@ -77,6 +95,14 @@ public abstract class Rebalancer { protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster( LoadStatisticForTag clusterStat, TStorageMedium medium); + // 5mins + protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) { + Long lastPickTime = lastPickTimeTable.get(tag, medium); + Long now = System.currentTimeMillis(); + LOG.debug("tag={}, medium={}, lastPickTime={}, now={}", tag, medium, lastPickTime, now); + return lastPickTime == null || now - lastPickTime >= 5 * 60 * 1000L; + } + public AgentTask createBalanceTask(TabletSchedCtx tabletCtx) throws SchedException { completeSchedCtx(tabletCtx); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 77b2d44055c..59c62f71906 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -171,6 +171,7 @@ public class TabletScheduler extends MasterDaemon { return stat; } + // just return be or partition rebalancer public Rebalancer getRebalancer() { return rebalancer; } @@ -274,6 +275,8 @@ public class TabletScheduler extends MasterDaemon { return AddResult.ADDED; } + + public synchronized boolean containsTablet(long tabletId) { return allTabletTypes.containsKey(tabletId); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index a05c63b812f..d1cdeba0e3f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -374,4 +374,50 @@ public class CatalogTestUtil { olapTable.readUnlock(); } } + + public static long getReplicaPathHash(long tabletId, long backendId) { + Env env = Env.getCurrentEnv(); + TabletInvertedIndex invertedIndex = env.getTabletInvertedIndex(); + TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); + if (tabletMeta == null) { + return -1L; + } + + long dbId = tabletMeta.getDbId(); + long tableId = tabletMeta.getTableId(); + long partitionId = tabletMeta.getPartitionId(); + long indexId = tabletMeta.getIndexId(); + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + return -1L; + } + Table table = db.getTableNullable(tableId); + if (table == null) { + return -1L; + } + if (table.getType() != Table.TableType.OLAP) { + return -1L; + } + OlapTable olapTable = (OlapTable) table; + olapTable.readLock(); + try { + Partition partition = olapTable.getPartition(partitionId); + if (partition == null) { + return -1L; + } + MaterializedIndex materializedIndex = partition.getIndex(indexId); + if (materializedIndex == null) { + return -1L; + } + Tablet tablet = materializedIndex.getTablet(tabletId); + for (Replica replica : tablet.getReplicas()) { + if (replica.getBackendId() == backendId) { + return replica.getPathHash(); + } + } + } finally { + olapTable.readUnlock(); + } + return -1; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java new file mode 100644 index 00000000000..028a07941c9 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +public class DiskReblanceWhenSchedulerIdle extends TestWithFeService { + + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.enable_round_robin_create_tablet = true; + Config.allow_replica_on_same_host = true; + Config.tablet_checker_interval_ms = 100; + Config.tablet_schedule_interval_ms = 100; + Config.schedule_slot_num_per_hdd_path = 1; + Config.disable_balance = true; + Config.enable_debug_points = true; + } + + @Override + protected int backendNum() { + return 2; + } + + @Test + public void testDiskReblanceWhenSchedulerIdle() throws Exception { + // case start + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends(); + Assertions.assertEquals(backendNum(), backends.size()); + for (Backend be : backends) { + Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId())); + } + + long totalCapacity = 10L << 30; + + for (int i = 0; i < backends.size(); i++) { + Map<String, DiskInfo> disks = Maps.newHashMap(); + for (int j = 0; j < 2; j++) { + DiskInfo diskInfo = new DiskInfo("be_" + i + "_disk_" + j); + diskInfo.setTotalCapacityB(totalCapacity); + diskInfo.setDataUsedCapacityB(1L << 30); + diskInfo.setAvailableCapacityB(9L << 30); + diskInfo.setPathHash((1000L * (i + 1)) + 10 * j); + disks.put(diskInfo.getRootPath(), diskInfo); + } + backends.get(i).setDisks(ImmutableMap.copyOf(disks)); + } + Backend be0 = backends.get(0); + + createDatabase("test"); + createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) " + + " BUCKETS 4 PROPERTIES ( \"replication_num\" = \"1\"," + + " \"storage_medium\" = \"HDD\")"); + + Assertions.assertEquals(2, invertedIndex.getTabletNumByBackendId(backends.get(0).getId())); + Assertions.assertEquals(2, invertedIndex.getTabletNumByBackendId(backends.get(1).getId())); + + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1"); + Assertions.assertNotNull(tbl); + Partition partition = tbl.getPartitions().iterator().next(); + List<Tablet> tablets = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets(); + + DiskInfo diskInfo0 = be0.getDisks().values().asList().get(0); + DiskInfo diskInfo1 = be0.getDisks().values().asList().get(1); + + tablets.forEach(tablet -> { + Lists.newArrayList(tablet.getReplicas()).forEach( + replica -> { + if (replica.getBackendId() == backends.get(1).getId()) { + replica.updateStat(totalCapacity / 4, 1); + tablet.deleteReplica(replica); + replica.setBackendId(backends.get(0).getId()); + replica.setPathHash(diskInfo0.getPathHash()); + tablet.addReplica(replica); + } else { + replica.setPathHash(diskInfo0.getPathHash()); + } + } + ); + }); + + diskInfo0.setAvailableCapacityB(0L); + diskInfo1.setAvailableCapacityB(totalCapacity); + DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", backends.get(1).getId()); + + Table<Tag, TStorageMedium, Long> lastPickTimeTable = Env.getCurrentEnv().getTabletScheduler().getRebalancer().getLastPickTimeTable(); + lastPickTimeTable.put(Tag.DEFAULT_BACKEND_TAG, TStorageMedium.HDD, 0L); + Config.disable_balance = false; + + + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map<Long, Integer> gotDiskTabletNums = Maps.newHashMap(); + tablets.forEach(tablet -> tablet.getReplicas().forEach(replica -> { + gotDiskTabletNums.put(replica.getPathHash(), 1 + gotDiskTabletNums.getOrDefault(replica.getPathHash(), 0)); + })); + + + Map<Long, Integer> expectTabletNums = Maps.newHashMap(); + expectTabletNums.put(diskInfo0.getPathHash(), 2); + expectTabletNums.put(diskInfo1.getPathHash(), 2); + + Assertions.assertEquals(expectTabletNums, gotDiskTabletNums); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index b2184a5bad0..f96a85253fd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -19,6 +19,7 @@ package org.apache.doris.utframe; import org.apache.doris.catalog.CatalogTestUtil; import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.DiskInfo.DiskState; import org.apache.doris.common.ClientPool; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; @@ -26,7 +27,7 @@ import org.apache.doris.proto.PBackendServiceGrpc; import org.apache.doris.proto.Types; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; -import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.FrontendService.Client; import org.apache.doris.thrift.HeartbeatService; import org.apache.doris.thrift.TAgentPublishRequest; import org.apache.doris.thrift.TAgentResult; @@ -64,6 +65,7 @@ import org.apache.doris.thrift.TScanOpenResult; import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TStorageMediumMigrateReq; import org.apache.doris.thrift.TStreamLoadRecordResult; import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTabletStatResult; @@ -171,7 +173,7 @@ public class MockedBackendFactory { public void run() { while (true) { boolean ok = false; - FrontendService.Client client = null; + Client client = null; TNetworkAddress address = null; try { // ATTR: backend.getFeAddress must after taskQueue.take, because fe addr thread race @@ -197,6 +199,9 @@ public class MockedBackendFactory { case CLONE: handleCloneTablet(request, finishTaskRequest); break; + case STORAGE_MEDIUM_MIGRATE: + handleStorageMediumMigrate(request, finishTaskRequest); + break; default: break; } @@ -252,6 +257,34 @@ public class MockedBackendFactory { finishTaskRequest.setFinishTabletInfos(tabletInfos); } + private void handleStorageMediumMigrate(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + TStorageMediumMigrateReq req = request.getStorageMediumMigrateReq(); + long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); + + long srcDataPath = CatalogTestUtil.getReplicaPathHash(req.tablet_id, backendInFe.getId()); + DiskInfo srcDiskInfo = getDisk(srcDataPath); + if (srcDiskInfo != null) { + srcDiskInfo.setDataUsedCapacityB(Math.min(srcDiskInfo.getTotalCapacityB(), + srcDiskInfo.getDataUsedCapacityB() - dataSize)); + srcDiskInfo.setAvailableCapacityB(Math.max(0L, + srcDiskInfo.getAvailableCapacityB() + dataSize)); + srcDiskInfo.setState(DiskState.ONLINE); + } + + DiskInfo destDiskInfo = getDisk(req.data_dir); + if (destDiskInfo != null) { + destDiskInfo.setDataUsedCapacityB(Math.min(destDiskInfo.getTotalCapacityB(), + destDiskInfo.getDataUsedCapacityB() + dataSize)); + destDiskInfo.setAvailableCapacityB(Math.max(0L, + destDiskInfo.getAvailableCapacityB() - dataSize)); + destDiskInfo.setState(DiskState.ONLINE); + } + } + + private DiskInfo getDisk(String dataDir) { + return backendInFe.getDisks().get(dataDir); + } + private DiskInfo getDisk(long pathHash) { DiskInfo diskInfo = null; for (DiskInfo tmpDiskInfo : backendInFe.getDisks().values()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org