This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77ec0a23b3e [fix](auto bucket) fix auto buckets calc using the first k 
partition (#41675)
77ec0a23b3e is described below

commit 77ec0a23b3ec2ff5fee0bceae212da6f52557d82
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Mon Oct 14 21:58:24 2024 +0800

    [fix](auto bucket) fix auto buckets calc using the first k partition 
(#41675)
    
    If the data sizes of the first k (at most 7) partitions are ascending, the
    result will be partition_size[k-1] + ema(deltas of the first k partitions).
    
    This is a bug: the calculation should use the last k partitions, not the
    first k partitions.
---
 .../org/apache/doris/clone/BeLoadRebalancer.java   | 10 ++++-
 .../doris/clone/DynamicPartitionScheduler.java     | 15 +++----
 .../doris/catalog/DynamicPartitionTableTest.java   | 49 +++++++++++++++++++---
 .../apache/doris/utframe/TestWithFeService.java    |  4 +-
 .../org/apache/doris/utframe/UtFrameUtils.java     |  4 +-
 5 files changed, 63 insertions(+), 19 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 53e8ecf9119..e1460c269c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -113,7 +113,8 @@ public class BeLoadRebalancer extends Rebalancer {
                 numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
             }
         }
-        LOG.info("get number of low load paths: {}, with medium: {}", 
numOfLowPaths, medium);
+        LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, 
isUrgent {}",
+                numOfLowPaths, medium, clusterStat.getTag(), isUrgent);
 
         List<String> alternativeTabletInfos = Lists.newArrayList();
         int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
@@ -121,6 +122,8 @@ public class BeLoadRebalancer extends Rebalancer {
                 .map(beStat -> 
Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
                 .collect(Collectors.toList());
 
+        boolean hasCandidateTablet = false;
+
         // choose tablets from high load backends.
         // BackendLoadStatistic is sorted by load score in ascend order,
         // so we need to traverse it from last to first
@@ -222,6 +225,8 @@ public class BeLoadRebalancer extends Rebalancer {
                         continue;
                     }
 
+                    hasCandidateTablet = true;
+
                     // for urgent disk, pick tablets order by size,
                     // then it may always pick tablets that was on the low 
backends.
                     if (!lowBETablets.isEmpty()
@@ -270,6 +275,9 @@ public class BeLoadRebalancer extends Rebalancer {
         if (!alternativeTablets.isEmpty()) {
             LOG.info("select alternative tablets, medium: {}, is urgent: {}, 
num: {}, detail: {}",
                     medium, isUrgent, alternativeTablets.size(), 
alternativeTabletInfos);
+        } else if (isUrgent && !hasCandidateTablet) {
+            LOG.info("urgent balance cann't found candidate tablets. medium: 
{}, tag: {}",
+                    medium, clusterStat.getTag());
         }
         return alternativeTablets;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index c0a65966fd9..37a4b922023 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -170,22 +170,19 @@ public class DynamicPartitionScheduler extends 
MasterDaemon {
             return historyPartitionsSize.get(0);
         }
 
-        int size = historyPartitionsSize.size() > 7 ? 7 : 
historyPartitionsSize.size();
-
         boolean isAscending = true;
-        for (int i = 1; i < size; i++) {
-            if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 
1)) {
+        ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>();
+        for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < 
historyPartitionsSize.size(); i++) {
+            long delta = historyPartitionsSize.get(i) - 
historyPartitionsSize.get(i - 1);
+            if (delta < 0) {
                 isAscending = false;
                 break;
             }
+            ascendingDeltaSize.add(delta);
         }
 
         if (isAscending) {
-            ArrayList<Long> historyDeltaSize = Lists.newArrayList();
-            for (int i = 1; i < size; i++) {
-                historyDeltaSize.add(historyPartitionsSize.get(i) - 
historyPartitionsSize.get(i - 1));
-            }
-            return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 
7);
+            return historyPartitionsSize.get(historyPartitionsSize.size() - 1) 
+ ema(ascendingDeltaSize, 7);
         } else {
             return ema(historyPartitionsSize, 7);
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index 79093d6ed4b..2ae051e4f25 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -1736,6 +1736,8 @@ public class DynamicPartitionTableTest {
                 + " PROPERTIES (\n"
                 + " \"dynamic_partition.enable\" = \"true\",\n"
                 + " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+                + " \"dynamic_partition.start\" = \"-50\",\n"
+                + " \"dynamic_partition.create_history_partition\" = 
\"true\",\n"
                 + " \"dynamic_partition.end\" = \"1\",\n"
                 + " \"dynamic_partition.prefix\" = \"p\",\n"
                 + " \"replication_allocation\" = \"tag.location.default: 1\"\n"
@@ -1744,22 +1746,59 @@ public class DynamicPartitionTableTest {
         Database db = 
Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
         OlapTable table = (OlapTable) 
db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
         List<Partition> partitions = 
Lists.newArrayList(table.getAllPartitions());
-        Assert.assertEquals(2, partitions.size());
+        Assert.assertEquals(52, partitions.size());
         for (Partition partition : partitions) {
             Assert.assertEquals(FeConstants.default_bucket_num, 
partition.getDistributionInfo().getBucketNum());
             partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
         }
         RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
 
-        String alterStmt =
+        String alterStmt1 =
                 "alter table test.test_autobucket_dynamic_partition set 
('dynamic_partition.end' = '2')";
-        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1));
         List<Pair<Long, Long>> tempDynamicPartitionTableInfo = 
Lists.newArrayList(Pair.of(db.getId(), table.getId()));
         
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
 false);
 
         partitions = Lists.newArrayList(table.getAllPartitions());
         partitions.sort(Comparator.comparing(Partition::getId));
-        Assert.assertEquals(3, partitions.size());
-        Assert.assertEquals(1, 
partitions.get(2).getDistributionInfo().getBucketNum());
+        Assert.assertEquals(53, partitions.size());
+        Assert.assertEquals(1, partitions.get(partitions.size() - 
1).getDistributionInfo().getBucketNum());
+
+        table.readLock();
+        try {
+            // first 40 partitions with size 0,  then 13 partitions with size 
100GB(10GB * 10 buckets)
+            for (int i = 0; i < 52; i++) {
+                Partition partition = partitions.get(i);
+                partition.updateVisibleVersion(2L);
+                for (MaterializedIndex idx : partition.getMaterializedIndices(
+                        MaterializedIndex.IndexExtState.VISIBLE)) {
+                    Assert.assertEquals(10, idx.getTablets().size());
+                    for (Tablet tablet : idx.getTablets()) {
+                        for (Replica replica : tablet.getReplicas()) {
+                            replica.updateVersion(2L);
+                            replica.setDataSize(i < 40 ? 0L : 10L << 30);
+                            replica.setRowCount(1000L);
+                        }
+                    }
+                }
+                if (i >= 40) {
+                    // first 52 partitions are 10 
buckets(FeConstants.default_bucket_num)
+                    Assert.assertEquals(10 * (10L << 30), 
partition.getAllDataSize(true));
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+
+        String alterStmt2 =
+                "alter table test.test_autobucket_dynamic_partition set 
('dynamic_partition.end' = '3')";
+        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2));
+        
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
 false);
+
+        partitions = Lists.newArrayList(table.getAllPartitions());
+        partitions.sort(Comparator.comparing(Partition::getId));
+        Assert.assertEquals(54, partitions.size());
+        // 100GB total, 1GB per bucket, should 100 buckets.
+        Assert.assertEquals(100, partitions.get(partitions.size() - 
1).getDistributionInfo().getBucketNum());
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 407c1544a4b..8e25efdfada 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -517,8 +517,8 @@ public abstract class TestWithFeService {
         Backend be = new Backend(Env.getCurrentEnv().getNextId(), 
backend.getHost(), backend.getHeartbeatPort());
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
         diskInfo1.setPathHash(be.getId());
-        diskInfo1.setTotalCapacityB(10L << 30);
-        diskInfo1.setAvailableCapacityB(5L << 30);
+        diskInfo1.setTotalCapacityB(10L << 40);
+        diskInfo1.setAvailableCapacityB(5L << 40);
         diskInfo1.setDataUsedCapacityB(480000);
         diskInfo1.setPathHash(be.getId());
         Map<String, DiskInfo> disks = Maps.newHashMap();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index d09860351bf..22fa581391f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -317,8 +317,8 @@ public class UtFrameUtils {
         Backend be = new Backend(Env.getCurrentEnv().getNextId(), 
backend.getHost(), backend.getHeartbeatPort());
         Map<String, DiskInfo> disks = Maps.newHashMap();
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
-        diskInfo1.setTotalCapacityB(10L << 30);
-        diskInfo1.setAvailableCapacityB(5L << 30);
+        diskInfo1.setTotalCapacityB(10L << 40);
+        diskInfo1.setAvailableCapacityB(5L << 40);
         diskInfo1.setDataUsedCapacityB(480000);
         diskInfo1.setPathHash(be.getId());
         disks.put(diskInfo1.getRootPath(), diskInfo1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to