This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 71eecdfc6e1 [fix](cloud) fix corner case when warm up data larger than cache capacity (#49050) 71eecdfc6e1 is described below commit 71eecdfc6e1a6528582e9457391fc861aa8b988d Author: zhengyu <zhangzhen...@selectdb.com> AuthorDate: Mon Mar 31 10:42:23 2025 +0800 [fix](cloud) fix corner case when warm up data larger than cache capacity (#49050) When the first partition of the warm-up target table is larger than the cache capacity of the cluster/compute group, the removed code breaks out of the loop and leaves warmUpTotalFileCache at zero, resulting in no error or warning but a batch size of 0. This commit warns the user about this situation and allows the user to run a FORCE warm-up to load partial data into the file cache. --- .../apache/doris/cloud/CacheHotspotManager.java | 83 +++++++++----- .../doris/cloud/cache/CacheHotspotManagerTest.java | 124 +++++++++++++++++++++ 2 files changed, 182 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index f4c7392eb75..a05518a6ee2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -330,7 +330,7 @@ public class CacheHotspotManager extends MasterDaemon { return responseList; } - private Long getFileCacheCapacity(String clusterName) throws RuntimeException { + Long getFileCacheCapacity(String clusterName) throws RuntimeException { List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(clusterName); Long totalFileCache = 0L; @@ -516,56 +516,89 @@ public class CacheHotspotManager extends MasterDaemon { } } - private Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName, + public List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) { + String dbName = 
tableTriple.getLeft(); + String tableName = tableTriple.getMiddle(); + String partitionName = tableTriple.getRight(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName); + OlapTable table = (OlapTable) db.getTableNullable(tableName); + List<Partition> partitions = new ArrayList<>(); + if (partitionName.length() != 0) { + partitions.add(table.getPartition(partitionName)); + } else { + partitions.addAll(table.getPartitions()); + } + return partitions; + } + + public List<Backend> getBackendsFromCluster(String dstClusterName) { + return ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterName(dstClusterName); + } + + public Set<Long> getTabletIdsFromBe(long beId) { + return ((CloudEnv) Env.getCurrentEnv()) + .getCloudTabletRebalancer() + .getSnapshotTabletsInPrimaryByBeId(beId); + } + + public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) { + List<Tablet> tablets = new ArrayList<>(); + for (MaterializedIndex index : indexes) { + tablets.addAll(index.getTablets()); + } + return tablets; + } + + public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName, List<Triple<String, String, String>> tables, boolean isForce) throws RuntimeException { Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>(); Long totalFileCache = getFileCacheCapacity(dstClusterName); Long warmUpTotalFileCache = 0L; + LOG.info("Start warm up job {}, cluster {}, total cache size: {}", + jobId, dstClusterName, totalFileCache); for (Triple<String, String, String> tableTriple : tables) { if (warmUpTotalFileCache > totalFileCache) { + LOG.info("Warm up size {} exceeds total cache size {}, breaking loop", + warmUpTotalFileCache, totalFileCache); break; } - String dbName = tableTriple.getLeft(); - String tableName = tableTriple.getMiddle(); - String partitionName = tableTriple.getRight(); - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName); - OlapTable table = (OlapTable) 
db.getTableNullable(tableName); - List<Partition> partitions = new ArrayList<>(); - if (partitionName.length() != 0) { - partitions.add(table.getPartition(partitionName)); - } else { - partitions.addAll(table.getPartitions()); - } - List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) - .getBackendsByClusterName(dstClusterName); + + List<Partition> partitions = getPartitionsFromTriple(tableTriple); + LOG.info("Got {} partitions for table {}.{}.{}", partitions.size(), + tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight()); + List<Backend> backends = getBackendsFromCluster(dstClusterName); + LOG.info("Got {} backends for cluster {}", backends.size(), dstClusterName); List<Partition> warmUpPartitions = new ArrayList<>(); for (Partition partition : partitions) { Long partitionSize = partition.getDataSize(true); - if ((warmUpTotalFileCache + partitionSize) > totalFileCache) { - break; - } warmUpTotalFileCache += partitionSize; warmUpPartitions.add(partition); + if (warmUpTotalFileCache > totalFileCache) { + LOG.info("Warm up size {} exceeds total cache size {}, current partition size {}", + warmUpTotalFileCache, totalFileCache, partitionSize); + break; + } } List<MaterializedIndex> indexes = new ArrayList<>(); for (Partition partition : warmUpPartitions) { indexes.addAll(partition.getMaterializedIndices(IndexExtState.VISIBLE)); } - List<Tablet> tablets = new ArrayList<>(); - for (MaterializedIndex index : indexes) { - tablets.addAll(index.getTablets()); - } + LOG.info("Got {} materialized indexes for table {}.{}.{}", indexes.size(), + tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight()); + List<Tablet> tablets = getTabletsFromIndexs(indexes); + LOG.info("Got {} tablets for table {}.{}.{}", tablets.size(), + tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight()); for (Backend backend : backends) { - Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv()) - .getCloudTabletRebalancer() - 
.getSnapshotTabletsInPrimaryByBeId(backend.getId()); + Set<Long> beTabletIds = getTabletIdsFromBe(backend.getId()); List<Tablet> warmUpTablets = new ArrayList<>(); for (Tablet tablet : tablets) { if (beTabletIds.contains(tablet.getId())) { warmUpTablets.add(tablet); } } + LOG.info("Assigning {} tablets to backend {}", warmUpTablets.size(), backend.getId()); beToWarmUpTablets.computeIfAbsent(backend.getId(), k -> new ArrayList<>()).addAll(warmUpTablets); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java new file mode 100644 index 00000000000..ff42ea31bcb --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.cache; + +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.cloud.CacheHotspotManager; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.system.Backend; + +import mockit.Mock; +import mockit.MockUp; +import org.apache.commons.lang3.tuple.Triple; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CacheHotspotManagerTest { + private CacheHotspotManager cacheHotspotManager; + private CloudSystemInfoService cloudSystemInfoService; + private Partition partition; + + @Test + public void testWarmUpNewClusterByTable() { + partition = new Partition(0, null, null, null); + new MockUp<Partition>() { + + @Mock + public long getDataSize(boolean singleReplica) { + return 10000000L; + } + + @Mock + public List<MaterializedIndex> getMaterializedIndices(IndexExtState extState) { + List<MaterializedIndex> list = new ArrayList<>(); + MaterializedIndex ind = new MaterializedIndex(); + list.add(ind); + return list; + } + }; + + cloudSystemInfoService = new CloudSystemInfoService(); + cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService); + new MockUp<CacheHotspotManager>() { + + @Mock + Long getFileCacheCapacity(String clusterName) throws RuntimeException { + return 100L; + } + + @Mock + List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) { + List<Partition> partitions = new ArrayList<>(); + partition = new Partition(1, "p1", null, null); + partitions.add(partition); + return partitions; + } + + @Mock + List<Backend> getBackendsFromCluster(String dstClusterName) { + List<Backend> backends = new ArrayList<>(); + Backend backend = new Backend(11, 
dstClusterName, 0); + backends.add(backend); + return backends; + } + + @Mock + public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) { + List<Tablet> list = new ArrayList<>(); + Tablet tablet = new Tablet(1001L); + list.add(tablet); + return list; + } + + @Mock + Set<Long> getTabletIdsFromBe(long beId) { + Set<Long> tabletIds = new HashSet<Long>(); + tabletIds.add(1001L); + return tabletIds; + } + }; + + // Setup mock data + long jobId = 1L; + String dstClusterName = "test_cluster"; + List<Triple<String, String, String>> tables = new ArrayList<>(); + tables.add(Triple.of("test_db", "test_table", "")); + + + // force = true + Map<Long, List<Tablet>> result = cacheHotspotManager.warmUpNewClusterByTable( + jobId, dstClusterName, tables, true); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(11L).get(0).getId(), 1001L); + + // force = false + RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> { + cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false); + }); + Assert.assertEquals("The cluster " + dstClusterName + " cache size is not enough", exception.getMessage()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org