This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 7a159622af0 [fix](storage medium) Fix partition show storage medium not right when use default medium (#34123) 7a159622af0 is described below commit 7a159622af04be8eeb42bb8bdda24136ba4024d8 Author: deardeng <565620...@qq.com> AuthorDate: Sat Apr 27 14:13:37 2024 +0800 [fix](storage medium) Fix partition show storage medium not right when use default medium (#34123) Co-authored-by: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> --- .../main/java/org/apache/doris/common/Config.java | 8 +- .../java/org/apache/doris/backup/RestoreJob.java | 6 +- .../org/apache/doris/catalog/DataProperty.java | 4 + .../java/org/apache/doris/catalog/OlapTable.java | 6 +- .../doris/common/util/DynamicPartitionUtil.java | 13 +- .../apache/doris/common/util/PropertyAnalyzer.java | 3 +- .../apache/doris/datasource/InternalCatalog.java | 77 ++++---- .../org/apache/doris/system/BeSelectionPolicy.java | 15 ++ .../org/apache/doris/system/SystemInfoService.java | 215 +++++++++------------ .../org/apache/doris/backup/RestoreJobTest.java | 10 +- .../apache/doris/catalog/ModifyBackendTest.java | 2 +- .../doris/catalog/ReplicaAllocationTest.java | 8 +- .../doris/load/sync/canal/CanalSyncDataTest.java | 2 +- .../apache/doris/system/SystemInfoServiceTest.java | 6 +- 14 files changed, 192 insertions(+), 183 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 6e87e20960a..dccfa777668 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2035,7 +2035,13 @@ public class Config extends ConfigBase { public static boolean skip_localhost_auth_check = true; @ConfField(mutable = true) - public static boolean enable_round_robin_create_tablet = false; + public static boolean enable_round_robin_create_tablet = true; + + @ConfField(mutable = true, masterOnly = true, description = { + "创建分区时,总是从第一个 BE 开始创建。注意:这种方式可能造成BE不均衡", + "When creating tablet of a partition, always start from the first BE. " + + "Note: This method may cause BE imbalance"}) + public static boolean create_tablet_round_robin_from_start = false; /** * To prevent different types (V1, V2, V3) of behavioral inconsistencies, diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 123600f4705..8a79050e8a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1102,6 +1102,7 @@ public class RestoreJob extends AbstractJob { long visibleVersion = remotePart.getVisibleVersion(); // tablets + Map<Tag, Integer> nextIndexs = Maps.newHashMap(); for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices(IndexExtState.VISIBLE)) { int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId()); int remotetabletSize = remoteIdx.getTablets().size(); @@ -1115,8 +1116,9 @@ public class RestoreJob extends AbstractJob { // replicas try { - Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, null, false, false); + Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium = Env.getCurrentSystemInfo() + .selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, false, false); + Map<Tag, List<Long>> beIds = beIdsAndMedium.first; for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) { for (Long beId : entry.getValue()) { long newReplicaId = env.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java index 731776384d0..2974d337167 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java @@ -118,6 +118,10 @@ public class DataProperty implements Writable, GsonPostProcessable { storageMediumSpecified = isSpecified; } + public void setStorageMedium(TStorageMedium medium) { + this.storageMedium = medium; + } + public static DataProperty read(DataInput in) throws IOException { if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_108) { String json = Text.readString(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d5ecc01febb..384e47dc69b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -594,6 +594,7 @@ public class OlapTable extends Table { reserveReplica ? null : restoreReplicaAlloc, isSinglePartition); // for each partition, reset rollup index map + Map<Tag, Integer> nextIndexs = Maps.newHashMap(); for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) { Partition partition = entry.getValue(); // entry.getKey() is the new partition id, use it to get the restore specified @@ -630,9 +631,10 @@ public class OlapTable extends Table { // replicas try { - Map<Tag, List<Long>> tag2beIds = + Pair<Map<Tag, List<Long>>, TStorageMedium> tag2beIdsAndMedium = Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation( - replicaAlloc, null, false, false); + replicaAlloc, nextIndexs, null, false, false); + Map<Tag, List<Long>> tag2beIds = tag2beIdsAndMedium.first; for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) { for (Long beId : entry3.getValue()) { long newReplicaId = env.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java index 6587da5aef6..ca3086c6ede 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -40,10 +40,12 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.policy.StoragePolicy; +import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import com.google.common.collect.Range; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -228,7 +230,8 @@ public class DynamicPartitionUtil { ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT, val); } ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(val)); - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, Maps.newHashMap(), + null, false, true); } private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int hotPartitionNum, @@ -237,14 +240,16 @@ public class DynamicPartitionUtil { ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO); } - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true); + Map<Tag, Integer> nextIndexs = Maps.newHashMap(); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, + false, true); if (hotPartitionNum <= 0) { return; } try { - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false, - true); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, TStorageMedium.SSD, + false, true); } catch (DdlException e) { throw new DdlException("Failed to find enough backend for ssd storage medium. When setting " + DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store " diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 901f554b094..686495ef73d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1093,6 +1093,7 @@ public class PropertyAnalyzer { allocationVal = allocationVal.replaceAll(" ", ""); String[] locations = allocationVal.split(","); int totalReplicaNum = 0; + Map<Tag, Integer> nextIndexs = Maps.newHashMap(); for (String location : locations) { String[] parts = location.split(":"); if (parts.length != 2) { @@ -1116,7 +1117,7 @@ public class PropertyAnalyzer { try { SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); systemInfoService.selectBackendIdsForReplicaCreation( - replicaAlloc, null, false, true); + replicaAlloc, nextIndexs, null, false, true); } catch (DdlException ddlException) { throw new AnalysisException(ddlException.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 64cfc9b745a..506da48d7a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1573,7 +1573,7 @@ public class InternalCatalog implements CatalogIf<Database> { long partitionId = idGeneratorBuffer.getNextId(); Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(), olapTable.getName(), olapTable.getBaseIndexId(), partitionId, partitionName, indexIdToMeta, - distributionInfo, dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(), + distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(), singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet, olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(), singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(), @@ -1828,7 +1828,7 @@ public class InternalCatalog implements CatalogIf<Database> { private Partition createPartitionWithIndices(String clusterName, long dbId, long tableId, String tableName, long baseIndexId, long partitionId, String partitionName, Map<Long, MaterializedIndexMeta> indexIdToMeta, - DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc, + DistributionInfo distributionInfo, DataProperty dataProperty, ReplicaAllocation replicaAlloc, Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes, boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType, DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, @@ -1868,6 +1868,7 @@ public class InternalCatalog implements CatalogIf<Database> { long version = partition.getVisibleVersion(); short totalReplicaNum = replicaAlloc.getTotalReplicaNum(); + TStorageMedium realStorageMedium = null; for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) { long indexId = entry.getKey(); MaterializedIndex index = entry.getValue(); @@ -1875,9 +1876,16 @@ public class InternalCatalog implements CatalogIf<Database> { // create tablets int schemaHash = indexMeta.getSchemaHash(); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium); - createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta, - tabletIdSet, idGeneratorBuffer, isStorageMediumSpecified); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, + schemaHash, dataProperty.getStorageMedium()); + realStorageMedium = createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, + replicaAlloc, tabletMeta, tabletIdSet, idGeneratorBuffer, dataProperty.isStorageMediumSpecified()); + if (realStorageMedium != null && !realStorageMedium.equals(dataProperty.getStorageMedium())) { + dataProperty.setStorageMedium(realStorageMedium); + LOG.info("real medium not eq default " + + "tableName={} tableId={} partitionName={} partitionId={} readMedium {}", + tableName, tableId, partitionName, partitionId, realStorageMedium); + } boolean ok = false; String errMsg = null; @@ -1898,8 +1906,8 @@ public class InternalCatalog implements CatalogIf<Database> { countDownLatch.addMark(backendId, tabletId); CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType, - storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType, - dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy, + realStorageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, + tabletType, dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy, disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad, compactionPolicy, timeSeriesCompactionGoalSizeMbytes, timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds, @@ -2486,7 +2494,7 @@ public class InternalCatalog implements CatalogIf<Database> { Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(), olapTable.getName(), olapTable.getBaseIndexId(), partitionId, partitionName, olapTable.getIndexIdToMeta(), partitionDistributionInfo, - partitionInfo.getDataProperty(partitionId).getStorageMedium(), + partitionInfo.getDataProperty(partitionId), partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, @@ -2561,7 +2569,7 @@ public class InternalCatalog implements CatalogIf<Database> { Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(), olapTable.getName(), olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(), partitionDistributionInfo, - dataProperty.getStorageMedium(), partitionInfo.getReplicaAllocation(entry.getValue()), + dataProperty, partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, partitionInfo.getTabletType(entry.getValue()), compressionType, olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), @@ -2760,11 +2768,12 @@ public class InternalCatalog implements CatalogIf<Database> { } @VisibleForTesting - public void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, + public TStorageMedium createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); Map<Tag, List<List<Long>>> backendsPerBucketSeq = null; GroupId groupId = null; if (colocateIndex.isColocateTable(tabletMeta.getTableId())) { @@ -2784,19 +2793,23 @@ public class InternalCatalog implements CatalogIf<Database> { backendsPerBucketSeq = Maps.newHashMap(); } + TStorageMedium storageMedium = Config.disable_storage_medium_check ? null : tabletMeta.getStorageMedium(); + Map<Tag, Integer> nextIndexs = new HashMap<>(); if (Config.enable_round_robin_create_tablet) { - for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) { - int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), - tabletMeta.getStorageMedium()); - if (startPos == -1) { - throw new DdlException("The number of BEs that match the policy is insufficient"); + for (Tag tag : replicaAlloc.getAllocMap().keySet()) { + int startPos = -1; + if (Config.create_tablet_round_robin_from_start) { + startPos = 0; + } else { + startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium, + isStorageMediumSpecified); } - nextIndexs.put(entry.getKey(), startPos); + nextIndexs.put(tag, startPos); } } - + TStorageMedium realStorageMedium = Config.disable_storage_medium_check ? null : tabletMeta.getStorageMedium(); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends Tablet tablet = new Tablet(idGeneratorBuffer.getNextId()); @@ -2807,30 +2820,15 @@ public class InternalCatalog implements CatalogIf<Database> { // get BackendIds Map<Tag, List<Long>> chosenBackendIds; + if (chooseBackendsArbitrary) { // This is the first colocate table in the group, or just a normal table, // choose backends - if (Config.enable_round_robin_create_tablet) { - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .getBeIdRoundRobinForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(), - nextIndexs); - } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .getBeIdRoundRobinForReplicaCreation(replicaAlloc, null, - nextIndexs); - } - } else { - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(), - isStorageMediumSpecified, false); - } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, null, - isStorageMediumSpecified, false); - } - } + Pair<Map<Tag, List<Long>>, TStorageMedium> chosenBackendIdsAndMedium + = systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, + storageMedium, isStorageMediumSpecified, false); + chosenBackendIds = chosenBackendIdsAndMedium.first; + storageMedium = chosenBackendIdsAndMedium.second; for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) { backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList()); @@ -2863,6 +2861,7 @@ public class InternalCatalog implements CatalogIf<Database> { ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq); Env.getCurrentEnv().getEditLog().logColocateBackendsPerBucketSeq(info); } + return realStorageMedium; } /* @@ -2992,7 +2991,7 @@ public class InternalCatalog implements CatalogIf<Database> { Partition newPartition = createPartitionWithIndices(db.getClusterName(), db.getId(), copiedTbl.getId(), copiedTbl.getName(), copiedTbl.getBaseIndexId(), newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(), partitionsDistributionInfo.get(oldPartitionId), - copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(), + copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId), copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId), null /* version info */, copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(), tabletIdSet, copiedTbl.getCopiedIndexes(), copiedTbl.isInMemory(), copiedTbl.getStorageFormat(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 3a711307bd5..ace2ab3e1e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -51,6 +51,11 @@ public class BeSelectionPolicy { public boolean preferComputeNode = false; public int expectBeNum = 0; + public boolean enableRoundRobin = false; + // if enable round robin, choose next be from nextRoundRobinIndex + // call SystemInfoService::selectBackendIdsByPolicy will update nextRoundRobinIndex + public int nextRoundRobinIndex = -1; + public List<String> preferredLocations = new ArrayList<>(); private BeSelectionPolicy() { @@ -114,6 +119,16 @@ public class BeSelectionPolicy { return this; } + public Builder setEnableRoundRobin(boolean enableRoundRobin) { + policy.enableRoundRobin = enableRoundRobin; + return this; + } + + public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) { + policy.nextRoundRobinIndex = nextRoundRobinIndex; + return this; + } + public BeSelectionPolicy build() { return policy; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 050e1cf94f7..380a976cbef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -54,6 +54,7 @@ import org.jetbrains.annotations.NotNull; import java.io.DataInputStream; import java.io.IOException; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -406,71 +407,21 @@ public class SystemInfoService { return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList()); } - class BeComparator implements Comparator<Backend> { + class BeIdComparator implements Comparator<Backend> { public int compare(Backend a, Backend b) { return (int) (a.getId() - b.getId()); } } - public List<Long> selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number, - int nextIndex) { - Preconditions.checkArgument(number >= -1); - List<Backend> candidates = getCandidates(policy); - if (number != -1 && candidates.size() < number) { - LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); - return Lists.newArrayList(); - } - - int realIndex = nextIndex % candidates.size(); - List<Long> partialOrderList = new ArrayList<Long>(); - partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) - .stream().map(b -> b.getId()).collect(Collectors.toList())); - partialOrderList.addAll(candidates.subList(0, realIndex) - .stream().map(b -> b.getId()).collect(Collectors.toList())); - - if (number == -1) { - return partialOrderList; - } else { - return partialOrderList.subList(0, number); - } - } - - public List<Backend> getCandidates(BeSelectionPolicy policy) { - List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values()); - if (candidates.isEmpty()) { - LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); - return Lists.newArrayList(); - } - - if (!policy.allowOnSameHost) { - Map<String, List<Backend>> backendMaps = Maps.newHashMap(); - for (Backend backend : candidates) { - if (backendMaps.containsKey(backend.getHost())) { - backendMaps.get(backend.getHost()).add(backend); - } else { - List<Backend> list = Lists.newArrayList(); - list.add(backend); - backendMaps.put(backend.getHost(), list); - } - } - candidates.clear(); - for (List<Backend> list : backendMaps.values()) { - candidates.add(list.get(0)); - } - } - - if (candidates.isEmpty()) { - LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); - return Lists.newArrayList(); + class BeHostComparator implements Comparator<Backend> { + public int compare(Backend a, Backend b) { + return a.getHost().compareTo(b.getHost()); } - - Collections.sort(candidates, new BeComparator()); - return candidates; } // Select the smallest number of tablets as the starting position of // round robin in the BE that match the policy - public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium) { + public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium, boolean isStorageMediumSpecified) { BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag)) .setStorageMedium(storageMedium); @@ -479,13 +430,17 @@ public class SystemInfoService { } BeSelectionPolicy policy = builder.build(); - List<Backend> candidates = getCandidates(policy); + List<Long> beIds = selectBackendIdsByPolicy(policy, -1); + if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified) { + storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD; + policy = builder.setStorageMedium(storageMedium).build(); + beIds = selectBackendIdsByPolicy(policy, -1); + } long minBeTabletsNum = Long.MAX_VALUE; int minIndex = -1; - for (int i = 0; i < candidates.size(); ++i) { - long tabletsNum = Env.getCurrentInvertedIndex() - .getTabletIdsByBackendId(candidates.get(i).getId()).size(); + for (int i = 0; i < beIds.size(); ++i) { + long tabletsNum = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beIds.get(i)).size(); if (tabletsNum < minBeTabletsNum) { minBeTabletsNum = tabletsNum; minIndex = i; @@ -494,48 +449,21 @@ public class SystemInfoService { return minIndex; } - public Map<Tag, List<Long>> getBeIdRoundRobinForReplicaCreation( - ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, - Map<Tag, Integer> nextIndexs) throws DdlException { - Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap(); - Map<Tag, Short> allocMap = replicaAlloc.getAllocMap(); - short totalReplicaNum = 0; - for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) { - BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() - .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) - .setStorageMedium(storageMedium); - if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { - builder.allowOnSameHost(); - } - - BeSelectionPolicy policy = builder.build(); - int nextIndex = nextIndexs.get(entry.getKey()); - List<Long> beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex); - nextIndexs.put(entry.getKey(), nextIndex + beIds.size()); - - if (beIds.isEmpty()) { - throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy); - } - chosenBackendIds.put(entry.getKey(), beIds); - totalReplicaNum += beIds.size(); - } - Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum()); - return chosenBackendIds; - } - /** * Select a set of backends for replica creation. * The following parameters need to be considered when selecting backends. * * @param replicaAlloc + * @param nextIndexs create tablet round robin next be index, when enable_round_robin_create_tablet * @param storageMedium * @param isStorageMediumSpecified * @param isOnlyForCheck set true if only used for check available backend * @return return the selected backend ids group by tag. * @throws DdlException */ - public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation( - ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, boolean isStorageMediumSpecified, + public Pair<Map<Tag, List<Long>>, TStorageMedium> selectBackendIdsForReplicaCreation( + ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs, TStorageMedium storageMedium, + boolean isStorageMediumSpecified, boolean isOnlyForCheck) throws DdlException { Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); @@ -552,6 +480,7 @@ public class SystemInfoService { List<String> failedEntries = Lists.newArrayList(); for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) { + Tag tag = entry.getKey(); BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) .setStorageMedium(storageMedium); @@ -559,21 +488,34 @@ public class SystemInfoService { builder.allowOnSameHost(); } + if (Config.enable_round_robin_create_tablet) { + builder.setEnableRoundRobin(true); + builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1)); + } + BeSelectionPolicy policy = builder.build(); List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue()); // first time empty, retry with different storage medium // if only for check, no need to retry different storage medium to get backend + TStorageMedium originalStorageMedium = storageMedium; if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified && !isOnlyForCheck) { storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD; - policy = builder.setStorageMedium(storageMedium).build(); + builder.setStorageMedium(storageMedium); + if (Config.enable_round_robin_create_tablet) { + builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1)); + } + policy = builder.build(); beIds = selectBackendIdsByPolicy(policy, entry.getValue()); } + if (Config.enable_round_robin_create_tablet) { + nextIndexs.put(tag, policy.nextRoundRobinIndex); + } // after retry different storage medium, it's still empty if (beIds.isEmpty()) { - LOG.error("failed backend(s) for policy:" + policy); + LOG.error("failed backend(s) for policy: {} real medium {}", policy, originalStorageMedium); String errorReplication = "replication tag: " + entry.getKey() + ", replication num: " + entry.getValue() - + ", storage medium: " + storageMedium; + + ", storage medium: " + originalStorageMedium; failedEntries.add(errorReplication); } else { chosenBackendIds.put(entry.getKey(), beIds); @@ -589,13 +531,13 @@ public class SystemInfoService { } Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum()); - return chosenBackendIds; + return Pair.of(chosenBackendIds, storageMedium); } /** * Select a set of backends by the given policy. * - * @param policy + * @param policy if policy is enableRoundRobin, will update its nextRoundRobinIndex * @param number number of backends which need to be selected. -1 means return as many as possible. * @return return #number of backend ids, * or empty set if no backends match the policy, or the number of matched backends is less than "number"; @@ -603,50 +545,75 @@ public class SystemInfoService { public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { Preconditions.checkArgument(number >= -1); List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values()); - if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) { + if (candidates.size() < number || candidates.isEmpty()) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } // If only need one Backend, just return a random one. - if (number == 1) { + if (number == 1 && !policy.enableRoundRobin) { Collections.shuffle(candidates); return Lists.newArrayList(candidates.get(0).getId()); } - if (policy.allowOnSameHost) { - Collections.shuffle(candidates); - if (number == -1) { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); - } else { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + boolean hasSameHost = false; + if (!policy.allowOnSameHost) { + // for each host, random select one backend. + Map<String, List<Backend>> backendMaps = Maps.newHashMap(); + for (Backend backend : candidates) { + if (backendMaps.containsKey(backend.getHost())) { + backendMaps.get(backend.getHost()).add(backend); + } else { + List<Backend> list = Lists.newArrayList(); + list.add(backend); + backendMaps.put(backend.getHost(), list); + } } - } - - // for each host, random select one backend. - Map<String, List<Backend>> backendMaps = Maps.newHashMap(); - for (Backend backend : candidates) { - if (backendMaps.containsKey(backend.getHost())) { - backendMaps.get(backend.getHost()).add(backend); - } else { - List<Backend> list = Lists.newArrayList(); - list.add(backend); - backendMaps.put(backend.getHost(), list); + candidates.clear(); + for (List<Backend> list : backendMaps.values()) { + if (list.size() > 1) { + Collections.shuffle(list); + hasSameHost = true; + } + candidates.add(list.get(0)); } } - candidates.clear(); - for (List<Backend> list : backendMaps.values()) { - Collections.shuffle(list); - candidates.add(list.get(0)); - } - if (number != -1 && candidates.size() < number) { + + if (candidates.size() < number) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } - Collections.shuffle(candidates); - if (number != -1) { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + + if (policy.enableRoundRobin) { + if (!policy.allowOnSameHost && hasSameHost) { + // not allow same host and has same host, + // then we compare them with their host + Collections.sort(candidates, new BeHostComparator()); + } else { + Collections.sort(candidates, new BeIdComparator()); + } + + if (policy.nextRoundRobinIndex < 0) { + policy.nextRoundRobinIndex = new SecureRandom().nextInt(candidates.size()); + } + + int realIndex = policy.nextRoundRobinIndex % candidates.size(); + List<Long> partialOrderList = new ArrayList<Long>(); + partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) + .stream().map(Backend::getId).collect(Collectors.toList())); + partialOrderList.addAll(candidates.subList(0, realIndex) + .stream().map(Backend::getId).collect(Collectors.toList())); + + List<Long> result = number == -1 ? partialOrderList : partialOrderList.subList(0, number); + policy.nextRoundRobinIndex = realIndex + result.size(); + + return result; } else { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + Collections.shuffle(candidates); + if (number != -1) { + return candidates.subList(0, number).stream().map(Backend::getId).collect(Collectors.toList()); + } else { + return candidates.stream().map(Backend::getId).collect(Collectors.toList()); + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 64e92b35c83..d361777fdd5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -39,6 +39,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.persist.EditLog; +import org.apache.doris.resource.Tag; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -54,6 +55,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; @@ -153,12 +155,14 @@ public class RestoreJobTest { new Expectations() { { - systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, - false, true); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, + Maps.newHashMap(), (TStorageMedium) any, false, true); minTimes = 0; result = new Delegate() { public synchronized List<Long> selectBackendIdsForReplicaCreation( - ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) { + ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs, + TStorageMedium medium, boolean isStorageMediumSpecified, + boolean isOnlyForCheck) { List<Long> beIds = Lists.newArrayList(); beIds.add(CatalogMocker.BACKEND1_ID); beIds.add(CatalogMocker.BACKEND2_ID); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java index 2cfa4e9b90f..3bd00d2b73a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java @@ -83,7 +83,7 @@ public class ModifyBackendTest { CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext); ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to find enough backend, please check the replication num,replication tag and storage medium.\n" + "Create failed replications:\n" - + "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: SSD", + + "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: HDD", () -> DdlExecutor.execute(Env.getCurrentEnv(), createStmt)); createStr = "create table test.tbl1(\n" + "k1 int\n" + ") distributed by hash(k1)\n" + "buckets 3 properties(\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java index 14367ea731e..971abe9b803 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.meta.MetaContext; import org.apache.doris.resource.Tag; @@ -52,11 +53,12 @@ public class ReplicaAllocationTest { public void setUp() throws DdlException { new Expectations() { { - systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, false, true); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, Maps.newHashMap(), + (TStorageMedium) any, false, true); minTimes = 0; result = new Delegate() { - Map<Tag, List<Long>> selectBackendIdsForReplicaCreation() { - return Maps.newHashMap(); + Pair<Map<Tag, List<Long>>, TStorageMedium> selectBackendIdsForReplicaCreation() { + return Pair.of(Maps.newHashMap(), TStorageMedium.HDD); } }; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index bf57f21f02f..61228c821a8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -150,7 +150,7 @@ public class CanalSyncDataTest { result = execPlanFragmentParams; systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, - (TStorageMedium) any, false, true); + Maps.newHashMap(), (TStorageMedium) any, false, true); minTimes = 0; result = backendIds; diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index d207e0ce2a4..9578ed1c7ff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.Pair; import org.apache.doris.meta.MetaContext; import org.apache.doris.resource.Tag; import org.apache.doris.system.SystemInfoService.HostInfo; @@ -403,8 +404,9 @@ public class SystemInfoServiceTest { // also check if the random selection logic can evenly distribute the replica. Map<Long, Integer> beCounterMap = Maps.newHashMap(); for (int i = 0; i < 10000; ++i) { - Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc, - TStorageMedium.HDD, false, false); + Pair<Map<Tag, List<Long>>, TStorageMedium> ret = infoService.selectBackendIdsForReplicaCreation(replicaAlloc, + Maps.newHashMap(), TStorageMedium.HDD, false, false); + Map<Tag, List<Long>> res = ret.first; Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size()); for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) { beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org