This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 77ec0a23b3e [fix](auto bucket) fix auto buckets calc using the first k partition (#41675) 77ec0a23b3e is described below commit 77ec0a23b3ec2ff5fee0bceae212da6f52557d82 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Mon Oct 14 21:58:24 2024 +0800 [fix](auto bucket) fix auto buckets calc using the first k partition (#41675) If the data sizes of the first k (at most 7) partitions are ascending, the result will be partition_size[k-1] + ema(deltas of the first k partitions). This is a bug: the calculation should use the last k partitions, not the first k partitions. --- .../org/apache/doris/clone/BeLoadRebalancer.java | 10 ++++- .../doris/clone/DynamicPartitionScheduler.java | 15 +++---- .../doris/catalog/DynamicPartitionTableTest.java | 49 +++++++++++++++++++--- .../apache/doris/utframe/TestWithFeService.java | 4 +- .../org/apache/doris/utframe/UtFrameUtils.java | 4 +- 5 files changed, 63 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 53e8ecf9119..e1460c269c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -113,7 +113,8 @@ public class BeLoadRebalancer extends Rebalancer { numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum(); } } - LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); + LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, isUrgent {}", + numOfLowPaths, medium, clusterStat.getTag(), isUrgent); List<String> alternativeTabletInfos = Lists.newArrayList(); int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); @@ -121,6 +122,8 @@ public class BeLoadRebalancer extends Rebalancer { .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) 
.collect(Collectors.toList()); + boolean hasCandidateTablet = false; + // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, // so we need to traverse it from last to first @@ -222,6 +225,8 @@ public class BeLoadRebalancer extends Rebalancer { continue; } + hasCandidateTablet = true; + // for urgent disk, pick tablets order by size, // then it may always pick tablets that was on the low backends. if (!lowBETablets.isEmpty() @@ -270,6 +275,9 @@ public class BeLoadRebalancer extends Rebalancer { if (!alternativeTablets.isEmpty()) { LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}", medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos); + } else if (isUrgent && !hasCandidateTablet) { + LOG.info("urgent balance cann't found candidate tablets. medium: {}, tag: {}", + medium, clusterStat.getTag()); } return alternativeTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index c0a65966fd9..37a4b922023 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -170,22 +170,19 @@ public class DynamicPartitionScheduler extends MasterDaemon { return historyPartitionsSize.get(0); } - int size = historyPartitionsSize.size() > 7 ? 
7 : historyPartitionsSize.size(); - boolean isAscending = true; - for (int i = 1; i < size; i++) { - if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) { + ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>(); + for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) { + long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1); + if (delta < 0) { isAscending = false; break; } + ascendingDeltaSize.add(delta); } if (isAscending) { - ArrayList<Long> historyDeltaSize = Lists.newArrayList(); - for (int i = 1; i < size; i++) { - historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1)); - } - return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7); + return historyPartitionsSize.get(historyPartitionsSize.size() - 1) + ema(ascendingDeltaSize, 7); } else { return ema(historyPartitionsSize, 7); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 79093d6ed4b..2ae051e4f25 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1736,6 +1736,8 @@ public class DynamicPartitionTableTest { + " PROPERTIES (\n" + " \"dynamic_partition.enable\" = \"true\",\n" + " \"dynamic_partition.time_unit\" = \"YEAR\",\n" + + " \"dynamic_partition.start\" = \"-50\",\n" + + " \"dynamic_partition.create_history_partition\" = \"true\",\n" + " \"dynamic_partition.end\" = \"1\",\n" + " \"dynamic_partition.prefix\" = \"p\",\n" + " \"replication_allocation\" = \"tag.location.default: 1\"\n" @@ -1744,22 +1746,59 @@ public class DynamicPartitionTableTest { Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("test"); OlapTable table = (OlapTable) 
db.getTableOrAnalysisException("test_autobucket_dynamic_partition"); List<Partition> partitions = Lists.newArrayList(table.getAllPartitions()); - Assert.assertEquals(2, partitions.size()); + Assert.assertEquals(52, partitions.size()); for (Partition partition : partitions) { Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum()); partition.setVisibleVersionAndTime(2L, System.currentTimeMillis()); } RebalancerTestUtil.updateReplicaDataSize(1, 1, 1); - String alterStmt = + String alterStmt1 = "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')"; - ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt)); + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1)); List<Pair<Long, Long>> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId())); Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); partitions = Lists.newArrayList(table.getAllPartitions()); partitions.sort(Comparator.comparing(Partition::getId)); - Assert.assertEquals(3, partitions.size()); - Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum()); + Assert.assertEquals(53, partitions.size()); + Assert.assertEquals(1, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); + + table.readLock(); + try { + // first 40 partitions with size 0, then 13 partitions with size 100GB(10GB * 10 buckets) + for (int i = 0; i < 52; i++) { + Partition partition = partitions.get(i); + partition.updateVisibleVersion(2L); + for (MaterializedIndex idx : partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.VISIBLE)) { + Assert.assertEquals(10, idx.getTablets().size()); + for (Tablet tablet : idx.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + replica.updateVersion(2L); + replica.setDataSize(i < 40 ? 
0L : 10L << 30); + replica.setRowCount(1000L); + } + } + } + if (i >= 40) { + // first 52 partitions are 10 buckets(FeConstants.default_bucket_num) + Assert.assertEquals(10 * (10L << 30), partition.getAllDataSize(true)); + } + } + } finally { + table.readUnlock(); + } + + String alterStmt2 = + "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '3')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2)); + Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); + + partitions = Lists.newArrayList(table.getAllPartitions()); + partitions.sort(Comparator.comparing(Partition::getId)); + Assert.assertEquals(54, partitions.size()); + // 100GB total, 1GB per bucket, should 100 buckets. + Assert.assertEquals(100, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 407c1544a4b..8e25efdfada 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -517,8 +517,8 @@ public abstract class TestWithFeService { Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setPathHash(be.getId()); - diskInfo1.setTotalCapacityB(10L << 30); - diskInfo1.setAvailableCapacityB(5L << 30); + diskInfo1.setTotalCapacityB(10L << 40); + diskInfo1.setAvailableCapacityB(5L << 40); diskInfo1.setDataUsedCapacityB(480000); diskInfo1.setPathHash(be.getId()); Map<String, DiskInfo> disks = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 
d09860351bf..22fa581391f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -317,8 +317,8 @@ public class UtFrameUtils { Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); Map<String, DiskInfo> disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); - diskInfo1.setTotalCapacityB(10L << 30); - diskInfo1.setAvailableCapacityB(5L << 30); + diskInfo1.setTotalCapacityB(10L << 40); + diskInfo1.setAvailableCapacityB(5L << 40); diskInfo1.setDataUsedCapacityB(480000); diskInfo1.setPathHash(be.getId()); disks.put(diskInfo1.getRootPath(), diskInfo1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org