This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 71eecdfc6e1 [fix](cloud) fix corner case when warm up data larger than cache capacity (#49050)
71eecdfc6e1 is described below

commit 71eecdfc6e1a6528582e9457391fc861aa8b988d
Author: zhengyu <zhangzhen...@selectdb.com>
AuthorDate: Mon Mar 31 10:42:23 2025 +0800

    [fix](cloud) fix corner case when warm up data larger than cache capacity (#49050)
    
    When the warm-up target table's first partition is larger than the cache
    capacity of the cluster/compute group, the deleted code would break out of
    the loop and leave warmUpTotalFileCache at zero, resulting in no error or
    warning but a batch size of 0.
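
    In short: the old loop checked capacity before accumulating, so a first
    partition larger than the cache skipped accumulation entirely, while the
    new loop accumulates first and breaks afterwards. Below is a minimal
    standalone sketch of the two orderings (the class name and the sizes in
    main are illustrative, not from the patch; the variable names follow it):

        class WarmUpLoopSketch {
            // Old ordering: check-then-add. If the first partition alone
            // exceeds the capacity, the loop breaks immediately and the
            // accumulated size stays 0 -- a silent no-op.
            static long oldOrdering(long[] partitionSizes, long totalFileCache) {
                long warmUpTotalFileCache = 0L;
                for (long size : partitionSizes) {
                    if (warmUpTotalFileCache + size > totalFileCache) {
                        break;
                    }
                    warmUpTotalFileCache += size;
                }
                return warmUpTotalFileCache;
            }

            // New ordering: add-then-check. The oversized partition is still
            // accumulated, so the overflow stays visible to the caller, which
            // can then warn or fail instead of doing nothing.
            static long newOrdering(long[] partitionSizes, long totalFileCache) {
                long warmUpTotalFileCache = 0L;
                for (long size : partitionSizes) {
                    warmUpTotalFileCache += size;
                    if (warmUpTotalFileCache > totalFileCache) {
                        break;
                    }
                }
                return warmUpTotalFileCache;
            }

            public static void main(String[] args) {
                long[] sizes = {10_000_000L}; // single partition, hypothetical size
                long capacity = 100L;         // cache capacity smaller than the partition
                System.out.println(oldOrdering(sizes, capacity)); // 0: overflow invisible
                System.out.println(newOrdering(sizes, capacity)); // 10000000: overflow visible
            }
        }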
    
    This commit warns the user about this situation and allows the user to use
    a FORCE warm-up to load partial data into the file cache.
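
    For reference, a hedged sketch of the capacity check this enables (the
    exception message matches the new test below; the real check sits in
    warmUpNewClusterByTable beyond the quoted hunks, and the System.err
    warning here stands in for the FE's LOG-based warning):

        class CapacityCheckSketch {
            // If the accumulated warm-up size overflows the cache capacity and
            // FORCE was not given, fail loudly instead of silently submitting a
            // zero-sized batch; under FORCE, warn and load only what fits.
            static void checkCapacity(long warmUpTotalFileCache, long totalFileCache,
                    boolean isForce, String clusterName) {
                if (warmUpTotalFileCache > totalFileCache) {
                    if (!isForce) {
                        throw new RuntimeException(
                                "The cluster " + clusterName + " cache size is not enough");
                    }
                    System.err.println("WARN: warm up size " + warmUpTotalFileCache
                            + " exceeds cache capacity " + totalFileCache
                            + "; loading partial data under FORCE");
                }
            }
        }
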
---
 .../apache/doris/cloud/CacheHotspotManager.java    |  83 +++++++++-----
 .../doris/cloud/cache/CacheHotspotManagerTest.java | 124 +++++++++++++++++++++
 2 files changed, 182 insertions(+), 25 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index f4c7392eb75..a05518a6ee2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -330,7 +330,7 @@ public class CacheHotspotManager extends MasterDaemon {
         return responseList;
     }
 
-    private Long getFileCacheCapacity(String clusterName) throws RuntimeException {
+    Long getFileCacheCapacity(String clusterName) throws RuntimeException {
         List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                                         .getBackendsByClusterName(clusterName);
         Long totalFileCache = 0L;
@@ -516,56 +516,89 @@ public class CacheHotspotManager extends MasterDaemon {
         }
     }
 
-    private Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
+    public List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
+        String dbName = tableTriple.getLeft();
+        String tableName = tableTriple.getMiddle();
+        String partitionName = tableTriple.getRight();
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
+        OlapTable table = (OlapTable) db.getTableNullable(tableName);
+        List<Partition> partitions = new ArrayList<>();
+        if (partitionName.length() != 0) {
+            partitions.add(table.getPartition(partitionName));
+        } else {
+            partitions.addAll(table.getPartitions());
+        }
+        return partitions;
+    }
+
+    public List<Backend> getBackendsFromCluster(String dstClusterName) {
+        return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+        .getBackendsByClusterName(dstClusterName);
+    }
+
+    public Set<Long> getTabletIdsFromBe(long beId) {
+        return ((CloudEnv) Env.getCurrentEnv())
+                                        .getCloudTabletRebalancer()
+                                        .getSnapshotTabletsInPrimaryByBeId(beId);
+    }
+
+    public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
+        List<Tablet> tablets = new ArrayList<>();
+        for (MaterializedIndex index : indexes) {
+            tablets.addAll(index.getTablets());
+        }
+        return tablets;
+    }
+
+    public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
             List<Triple<String, String, String>> tables,
             boolean isForce) throws RuntimeException {
         Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
         Long totalFileCache = getFileCacheCapacity(dstClusterName);
         Long warmUpTotalFileCache = 0L;
+        LOG.info("Start warm up job {}, cluster {}, total cache size: {}",
+                jobId, dstClusterName, totalFileCache);
         for (Triple<String, String, String> tableTriple : tables) {
             if (warmUpTotalFileCache > totalFileCache) {
+                LOG.info("Warm up size {} exceeds total cache size {}, 
breaking loop",
+                        warmUpTotalFileCache, totalFileCache);
                 break;
             }
-            String dbName = tableTriple.getLeft();
-            String tableName = tableTriple.getMiddle();
-            String partitionName = tableTriple.getRight();
-            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
-            OlapTable table = (OlapTable) db.getTableNullable(tableName);
-            List<Partition> partitions = new ArrayList<>();
-            if (partitionName.length() != 0) {
-                partitions.add(table.getPartition(partitionName));
-            } else {
-                partitions.addAll(table.getPartitions());
-            }
-            List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                                            .getBackendsByClusterName(dstClusterName);
+
+            List<Partition> partitions = getPartitionsFromTriple(tableTriple);
+            LOG.info("Got {} partitions for table {}.{}.{}", partitions.size(),
+                    tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
+            List<Backend> backends = getBackendsFromCluster(dstClusterName);
+            LOG.info("Got {} backends for cluster {}", backends.size(), 
dstClusterName);
             List<Partition> warmUpPartitions = new ArrayList<>();
             for (Partition partition : partitions) {
                 Long partitionSize = partition.getDataSize(true);
-                if ((warmUpTotalFileCache + partitionSize) > totalFileCache) {
-                    break;
-                }
                 warmUpTotalFileCache += partitionSize;
                 warmUpPartitions.add(partition);
+                if (warmUpTotalFileCache > totalFileCache) {
+                    LOG.info("Warm up size {} exceeds total cache size {}, 
current partition size {}",
+                            warmUpTotalFileCache, totalFileCache, 
partitionSize);
+                    break;
+                }
             }
             List<MaterializedIndex> indexes = new ArrayList<>();
             for (Partition partition : warmUpPartitions) {
                 indexes.addAll(partition.getMaterializedIndices(IndexExtState.VISIBLE));
             }
-            List<Tablet> tablets = new ArrayList<>();
-            for (MaterializedIndex index : indexes) {
-                tablets.addAll(index.getTablets());
-            }
+            LOG.info("Got {} materialized indexes for table {}.{}.{}", 
indexes.size(),
+                    tableTriple.getLeft(), tableTriple.getMiddle(), 
tableTriple.getRight());
+            List<Tablet> tablets = getTabletsFromIndexs(indexes);
+            LOG.info("Got {} tablets for table {}.{}.{}", tablets.size(),
+                    tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
             for (Backend backend : backends) {
-                Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
-                                        .getCloudTabletRebalancer()
-                                        .getSnapshotTabletsInPrimaryByBeId(backend.getId());
+                Set<Long> beTabletIds = getTabletIdsFromBe(backend.getId());
                 List<Tablet> warmUpTablets = new ArrayList<>();
                 for (Tablet tablet : tablets) {
                     if (beTabletIds.contains(tablet.getId())) {
                         warmUpTablets.add(tablet);
                     }
                 }
+                LOG.info("Assigning {} tablets to backend {}", 
warmUpTablets.size(), backend.getId());
                 beToWarmUpTablets.computeIfAbsent(backend.getId(),
                         k -> new ArrayList<>()).addAll(warmUpTablets);
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
new file mode 100644
index 00000000000..ff42ea31bcb
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.cache;
+
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.CacheHotspotManager;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.system.Backend;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.commons.lang3.tuple.Triple;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CacheHotspotManagerTest {
+    private CacheHotspotManager cacheHotspotManager;
+    private CloudSystemInfoService cloudSystemInfoService;
+    private Partition partition;
+
+    @Test
+    public void testWarmUpNewClusterByTable() {
+        partition = new Partition(0, null, null, null);
+        new MockUp<Partition>() {
+
+            @Mock
+            public long getDataSize(boolean singleReplica) {
+                return 10000000L;
+            }
+
+            @Mock
+            public List<MaterializedIndex> getMaterializedIndices(IndexExtState extState) {
+                List<MaterializedIndex> list = new ArrayList<>();
+                MaterializedIndex ind = new MaterializedIndex();
+                list.add(ind);
+                return list;
+            }
+        };
+
+        cloudSystemInfoService = new CloudSystemInfoService();
+        cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService);
+        new MockUp<CacheHotspotManager>() {
+
+            @Mock
+            Long getFileCacheCapacity(String clusterName) throws RuntimeException {
+                return 100L;
+            }
+
+            @Mock
+            List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
+                List<Partition> partitions = new ArrayList<>();
+                partition = new Partition(1, "p1", null, null);
+                partitions.add(partition);
+                return partitions;
+            }
+
+            @Mock
+            List<Backend> getBackendsFromCluster(String dstClusterName) {
+                List<Backend> backends = new ArrayList<>();
+                Backend backend = new Backend(11, dstClusterName, 0);
+                backends.add(backend);
+                return backends;
+            }
+
+            @Mock
+            public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
+                List<Tablet> list = new ArrayList<>();
+                Tablet tablet = new Tablet(1001L);
+                list.add(tablet);
+                return list;
+            }
+
+            @Mock
+            Set<Long> getTabletIdsFromBe(long beId) {
+                Set<Long> tabletIds = new HashSet<Long>();
+                tabletIds.add(1001L);
+                return tabletIds;
+            }
+        };
+
+        // Setup mock data
+        long jobId = 1L;
+        String dstClusterName = "test_cluster";
+        List<Triple<String, String, String>> tables = new ArrayList<>();
+        tables.add(Triple.of("test_db", "test_table", ""));
+
+
+        // force = true
+        Map<Long, List<Tablet>> result = cacheHotspotManager.warmUpNewClusterByTable(
+                jobId, dstClusterName, tables, true);
+        Assert.assertEquals(result.size(), 1);
+        Assert.assertEquals(result.get(11L).get(0).getId(), 1001L);
+
+        // force = false
+        RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> {
+            cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false);
+        });
+        Assert.assertEquals("The cluster " + dstClusterName + " cache size is 
not enough", exception.getMessage());
+    }
+}

