This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 61166162843c1937debfa5479176a6c80646fc76 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Wed May 11 09:40:57 2022 +0800 [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478) There are many places in FE where a group of BE nodes needs to be selected according to certain requirements. for example: 1. When creating replicas for a tablet. 2. When selecting a BE to execute Insert. 3. When Stream Load forwards http requests to BE nodes. These operations all have the same logic. So this CL mainly changes: 1. Create a new `BeSelectionPolicy` class to describe the set of conditions for selecting BE. 2. The logic of selecting BE nodes in `SystemInfoService` has been refactored, and the following two methods are used uniformly: 1. `selectBackendIdsByPolicy`: Select the required number of BE nodes according to the `BeSelectionPolicy`. 2. `selectBackendIdsForReplicaCreation`: Select the BE node for the replica creation operation. Note that there are some changes here: For the replica creation operation, the round-robin method was used to select BE nodes before, but now it is changed to `random` selection for the following reasons: 1. Although the previous logic is round-robin, it is actually random. 2. The final diff of the random algorithm will not be greater than 5%, so it can be considered that the random algorithm can distribute the data evenly. --- .../java/org/apache/doris/backup/RestoreJob.java | 9 +- .../java/org/apache/doris/catalog/Catalog.java | 9 +- .../java/org/apache/doris/catalog/OlapTable.java | 3 +- .../org/apache/doris/httpv2/rest/LoadAction.java | 53 ++-- .../apache/doris/httpv2/util/LoadSubmitter.java | 19 +- .../apache/doris/qe/InsertStreamTxnExecutor.java | 14 +- .../java/org/apache/doris/qe/MultiLoadMgr.java | 14 +- .../org/apache/doris/system/BeSelectionPolicy.java | 131 ++++++++++ .../org/apache/doris/system/SystemInfoService.java | 251 +++++-------------- .../org/apache/doris/backup/RestoreJobTest.java | 132 +--------- .../org/apache/doris/catalog/CreateTableTest.java | 6 +- .../apache/doris/catalog/ModifyBackendTest.java | 6 +- .../doris/load/sync/canal/CanalSyncDataTest.java | 7 +- .../java/org/apache/doris/qe/MultiLoadMgrTest.java | 13 +- .../apache/doris/system/SystemInfoServiceTest.java | 268 +++++++++++++++++++++ 15 files changed, 525 insertions(+), 410 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 30878a29ca..f29ab14be1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -74,9 +74,6 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTaskType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -87,6 +84,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Table.Cell; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -1018,7 +1018,8 @@ public class RestoreJob extends AbstractJob { // replicas try { - Map<Tag, List<Long>> beIds = 
Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null); + Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo() + .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null); for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) { for (Long beId : entry.getValue()) { long newReplicaId = catalog.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 7621d0008c..3bb59eac55 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -270,6 +270,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; +import javax.annotation.Nullable; import java.io.BufferedReader; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -296,8 +297,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import javax.annotation.Nullable; - public class Catalog { private static final Logger LOG = LogManager.getLogger(Catalog.class); // 0 ~ 9999 used for qe @@ -4524,10 +4523,12 @@ public class Catalog { // This is the first colocate table in the group, or just a normal table, // randomly choose backends if (!Config.disable_storage_medium_check) { - chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, + chosenBackendIds = + getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, tabletMeta.getStorageMedium()); } else { - chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null); + chosenBackendIds = + getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null); } for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 4b015c685d..253d53521e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -500,7 +500,8 @@ public class OlapTable extends Table { // replicas try { - Map<Tag, List<Long>> tag2beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters( + Map<Tag, List<Long>> tag2beIds = + Catalog.getCurrentSystemInfo().selectBackendIdsForReplicaCreation( replicaAlloc, db.getClusterName(), null); for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) { for (Long beId : entry3.getValue()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index b9c85dbdfb..ebdda12926 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -21,12 +21,14 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import 
org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; @@ -41,12 +43,10 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.view.RedirectView; -import java.util.List; - import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import io.netty.handler.codec.http.HttpHeaderNames; +import java.util.List; @RestController public class LoadAction extends RestBaseController { @@ -145,21 +145,7 @@ public class LoadAction extends RestBaseController { return new RestBaseResult(e.getMessage()); } } else { - // Choose a backend sequentially. - SystemInfoService.BeAvailablePredicate beAvailablePredicate = - new SystemInfoService.BeAvailablePredicate(false, false, true); - List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, beAvailablePredicate, false, clusterName, null, null); - if (backendIds == null) { - return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); - } - - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); - if (backend == null) { - return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); - } - - redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + redirectAddr = selectRedirectBackend(clusterName); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -194,22 +180,7 @@ public class LoadAction extends RestBaseController { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - // Choose a backend sequentially. 
- SystemInfoService.BeAvailablePredicate beAvailablePredicate = - new SystemInfoService.BeAvailablePredicate(false, false, true); - List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, beAvailablePredicate, false, clusterName, null, null); - if (backendIds == null) { - return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); - } - - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); - if (backend == null) { - return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); - } - - TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); - + TNetworkAddress redirectAddr = selectRedirectBackend(clusterName); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -220,4 +191,18 @@ public class LoadAction extends RestBaseController { return new RestBaseResult(e.getMessage()); } } + + private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException { + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(clusterName).needLoadAvailable().build(); + List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (backendIds.isEmpty()) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } + + Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); + if (backend == null) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java index 1a715cf0e3..31aeba813d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java @@ -19,10 +19,11 @@ package org.apache.doris.httpv2.util; import org.apache.doris.catalog.Catalog; import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.httpv2.rest.UploadAction; import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; import com.google.common.base.Strings; @@ -136,19 +137,15 @@ public class LoadSubmitter { return file; } - private Backend selectOneBackend() throws DdlException { - SystemInfoService.BeAvailablePredicate beAvailablePredicate = - new SystemInfoService.BeAvailablePredicate(false, false, true); - List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, beAvailablePredicate, false, - SystemInfoService.DEFAULT_CLUSTER, null, null); - if (backendIds == null) { - throw new DdlException("No alive backend"); + private Backend selectOneBackend() throws LoadException { + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (backendIds.isEmpty()) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } - Backend 
backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); if (backend == null) { - throw new DdlException("No alive backend"); + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } return backend; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 2cb8fe4ea6..ea0cb6cfda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -26,7 +26,7 @@ import org.apache.doris.proto.Types; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; -import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -63,13 +63,11 @@ public class InsertStreamTxnExecutor { StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); - SystemInfoService.BeAvailablePredicate beAvailablePredicate = - new SystemInfoService.BeAvailablePredicate(false, true, true); - List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, beAvailablePredicate, false, - txnEntry.getDb().getClusterName(), null, null); - if (beIds == null || beIds.isEmpty()) { - throw new UserException("there is no backend load available or scanNode backend available."); + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName()) + .needLoadAvailable().needQueryAvailable().build(); + List<Long> beIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (beIds.isEmpty()) { + throw new UserException("No available backend to match the policy: " + policy); } tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index fd8cf80cac..606d68e6c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; @@ -91,14 +92,13 @@ public class MultiLoadMgr { if (infoMap.containsKey(multiLabel)) { throw new LabelAlreadyUsedException(label); } - MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties); - SystemInfoService.BeAvailablePredicate beAvailablePredicate = - new SystemInfoService.BeAvailablePredicate(false, false, true); - List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1, - beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null); - if (backendIds == null) { - throw new 
DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(ConnectContext.get().getClusterName()) + .needLoadAvailable().build(); + List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (backendIds.isEmpty()) { + throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + " policy: " + policy); } + MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties); multiLoadDesc.setBackendId(backendIds.get(0)); infoMap.put(multiLabel, multiLoadDesc); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java new file mode 100644 index 0000000000..e995c0aff5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.system; + +import org.apache.doris.resource.Tag; +import org.apache.doris.thrift.TStorageMedium; + +import com.google.common.collect.Sets; + +import java.util.Set; + +/** + * Selection policy for building BE nodes + */ +public class BeSelectionPolicy { + public String cluster = SystemInfoService.DEFAULT_CLUSTER; + public boolean needScheduleAvailable = false; + public boolean needQueryAvailable = false; + public boolean needLoadAvailable = false; + // Resource tag. Empty means no need to consider resource tag. + public Set<Tag> resourceTags = Sets.newHashSet(); + // storage medium. null means no need to consider storage medium. + public TStorageMedium storageMedium = null; + // Check if disk usage reaches limit. false means no need to check. + public boolean checkDiskUsage = false; + // If set to false, do not select backends on same host. 
+ public boolean allowOnSameHost = false; + + private BeSelectionPolicy() { + + } + + public static class Builder { + private BeSelectionPolicy policy; + public Builder() { + policy = new BeSelectionPolicy(); + } + + public Builder setCluster(String cluster) { + policy.cluster = cluster; + return this; + } + + public Builder needScheduleAvailable() { + policy.needScheduleAvailable = true; + return this; + } + + public Builder needQueryAvailable() { + policy.needQueryAvailable = true; + return this; + } + + public Builder needLoadAvailable() { + policy.needLoadAvailable = true; + return this; + } + + public Builder addTags(Set<Tag> tags) { + policy.resourceTags.addAll(tags); + return this; + } + + public Builder setStorageMedium(TStorageMedium medium) { + policy.storageMedium = medium; + return this; + } + + public Builder needCheckDiskUsage() { + policy.checkDiskUsage = true; + return this; + } + + public Builder allowOnSameHost() { + policy.allowOnSameHost = true; + return this; + } + + public BeSelectionPolicy build() { + return policy; + } + } + + public boolean isMatch(Backend backend) { + if (needScheduleAvailable && !backend.isScheduleAvailable() + || needQueryAvailable && !backend.isQueryAvailable() + || needLoadAvailable && !backend.isLoadAvailable() + || !resourceTags.isEmpty() && !resourceTags.contains(backend.getTag()) + || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) { + return false; + } + + if (checkDiskUsage) { + if (storageMedium == null && backend.diskExceedLimit()) { + return false; + } + if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) { + return false; + } + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("cluster|query|load|schedule|tags|medium: "); + sb.append(cluster).append("|"); + sb.append(needQueryAvailable).append("|"); + sb.append(needLoadAvailable).append("|"); + sb.append(needScheduleAvailable).append("|"); + sb.append(resourceTags).append("|"); + sb.append(storageMedium); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 39e577f55a..82571cd3d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -40,7 +40,6 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -61,10 +60,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.stream.Stream; public class SystemInfoService { private static final Logger LOG = LogManager.getLogger(SystemInfoService.class); @@ -75,42 +72,16 @@ public class SystemInfoService { public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available."; - public static class BeAvailablePredicate { - private boolean scheduleAvailable; + private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of(); + private volatile 
ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of(); - private boolean queryAvailable; - - private boolean loadAvailable; - - public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) { - this.scheduleAvailable = scheduleAvailable; - this.queryAvailable = queryAvailable; - this.loadAvailable = loadAvailable; - } - - public boolean isMatch(Backend backend) { - if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() || - loadAvailable && !backend.isLoadAvailable()) { - return false; - } - return true; - } - } - - private volatile ImmutableMap<Long, Backend> idToBackendRef; - private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef; - - // last backend id used by round robin for sequential choosing backends for - // tablet creation - private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap; - // last backend id used by round robin for sequential choosing backends in - // other jobs - private ConcurrentHashMap<String, Long> lastBackendIdForOtherMap; + // last backend id used by round robin for sequential selecting backends for replica creation + private Map<Tag, Long> lastBackendIdForReplicaCreation = Maps.newConcurrentMap(); private long lastBackendIdForCreation = -1; private long lastBackendIdForOther = -1; - private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef; + private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef = ImmutableMap.of(); // sort host backends list by num of backends, descending private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() { @@ -124,15 +95,6 @@ public class SystemInfoService { } }; - public SystemInfoService() { - idToBackendRef = ImmutableMap.<Long, Backend>of(); - idToReportVersionRef = ImmutableMap.<Long, AtomicLong>of(); - - lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>(); - lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>(); - pathHashToDishInfoRef = ImmutableMap.<Long, DiskInfo>of(); - } - // for deploy manager public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException { addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG); @@ -432,9 +394,6 @@ public class SystemInfoService { LOG.warn("not enough available backends. require :" + instanceNum + " get:" + chosenBackendIds.size()); return null; } - - lastBackendIdForCreationMap.put(clusterName, (long) -1); - lastBackendIdForOtherMap.put(clusterName, (long) -1); return chosenBackendIds; } @@ -462,9 +421,6 @@ public class SystemInfoService { } } } - - lastBackendIdForCreationMap.remove(clusterName); - lastBackendIdForOtherMap.remove(clusterName); } /** @@ -779,21 +735,35 @@ public class SystemInfoService { } - // Find enough backend to allocate replica of a tablet. - // filters include: tag, cluster, storage medium - public Map<Tag, List<Long>> chooseBackendIdByFilters(ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium) + /** + * Select a set of backends for replica creation. + * The following parameters need to be considered when selecting backends. + * + * @param replicaAlloc + * @param clusterName + * @param storageMedium + * @return return the selected backend ids group by tag. 
+ * @throws DdlException + */ + public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation( + ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium) throws DdlException { Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap(); Map<Tag, Short> allocMap = replicaAlloc.getAllocMap(); short totalReplicaNum = 0; - BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false); + for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) { - List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(), - beAvailablePredicate, true, clusterName, storageMedium, entry.getKey()); - if (beIds == null) { - throw new DdlException("Failed to find enough host with storage medium and tag(" - + (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey() - + ") in all backends. need: " + entry.getValue()); + BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName) + .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) + .setStorageMedium(storageMedium); + if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { + builder.allowOnSameHost(); + } + + BeSelectionPolicy policy = builder.build(); + List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue()); + if (beIds.isEmpty()) { + throw new DdlException("Failed to find " + entry.getValue() + " backends for policy: " + policy); } chosenBackendIds.put(entry.getKey(), beIds); totalReplicaNum += beIds.size(); @@ -802,61 +772,34 @@ public class SystemInfoService { return chosenBackendIds; } - public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate, - boolean isCreate, String clusterName, - TStorageMedium storageMedium, Tag tag) { - Stream<Backend> beStream = getClusterBackends(clusterName).stream(); - if (storageMedium == null) { - beStream = beStream.filter(v -> !v.diskExceedLimit()); - } else { - beStream = beStream.filter(v -> !v.diskExceedLimitByStorageMedium(storageMedium)); + /** + * Select a set of backends by the given policy. + * + * @param policy + * @param number number of backends which need to be selected. + * @return return #number of backend ids, + * or empty set if no backends match the policy, or the number of matched backends is less than "number"; + */ + public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { + List<Backend> candidates = + idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList()); + if (candidates.size() < number) { + return Lists.newArrayList(); } - if (tag != null) { - beStream = beStream.filter(v -> v.getTag().equals(tag)); + // If only need one Backend, just return a random one. 
+ if (number == 1) { + Collections.shuffle(candidates); + return Lists.newArrayList(candidates.get(0).getId()); } - final List<Backend> backends = beStream.collect(Collectors.toList()); - return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends); - } - // choose backends by round robin - // return null if not enough backend - // use synchronized to run serially - public synchronized List<Long> seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate, - boolean isCreate, String clusterName, - final List<Backend> srcBackends) { - long lastBackendId; - - if (clusterName.equals(DEFAULT_CLUSTER)) { - if (isCreate) { - lastBackendId = lastBackendIdForCreation; - } else { - lastBackendId = lastBackendIdForOther; - } - } else { - if (isCreate) { - if (lastBackendIdForCreationMap.containsKey(clusterName)) { - lastBackendId = lastBackendIdForCreationMap.get(clusterName); - } else { - lastBackendId = -1; - lastBackendIdForCreationMap.put(clusterName, lastBackendId); - } - } else { - if (lastBackendIdForOtherMap.containsKey(clusterName)) { - lastBackendId = lastBackendIdForOtherMap.get(clusterName); - } else { - lastBackendId = -1; - lastBackendIdForOtherMap.put(clusterName, lastBackendId); - } - } + if (policy.allowOnSameHost) { + Collections.shuffle(candidates); + return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); } - // host -> BE list - List<Backend> sourceBackend = srcBackends; - if (sourceBackend == null) { - sourceBackend = getClusterBackends(clusterName); - } + // for each host, random select one backend. Map<String, List<Backend>> backendMaps = Maps.newHashMap(); - for (Backend backend : sourceBackend) { + for (Backend backend : candidates) { if (backendMaps.containsKey(backend.getHost())) { backendMaps.get(backend.getHost()).add(backend); } else { @@ -865,94 +808,16 @@ public class SystemInfoService { backendMaps.put(backend.getHost(), list); } } - - // if more than one backend exists in same host, select a backend at random - List<Backend> backends = Lists.newArrayList(); + candidates.clear(); for (List<Backend> list : backendMaps.values()) { - if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { - backends.addAll(list); - } else { - list = list.stream().filter(beAvailablePredicate::isMatch).collect(Collectors.toList()); - if (list.isEmpty()) { - continue; - } - Collections.shuffle(list); - backends.add(list.get(0)); - } + Collections.shuffle(list); + candidates.add(list.get(0)); } - - Collections.shuffle(backends); - - List<Long> backendIds = Lists.newArrayList(); - // get last backend index - int lastBackendIndex = -1; - int index = -1; - for (Backend backend : backends) { - index++; - if (backend.getId() == lastBackendId) { - lastBackendIndex = index; - break; - } + if (candidates.size() < number) { + return Lists.newArrayList(); } - Iterator<Backend> iterator = Iterators.cycle(backends); - index = -1; - boolean failed = false; - // 2 cycle at most - int maxIndex = 2 * backends.size(); - while (iterator.hasNext() && backendIds.size() < backendNum) { - Backend backend = iterator.next(); - index++; - if (index <= lastBackendIndex) { - continue; - } - - if (index > maxIndex) { - failed = true; - break; - } - - if (!beAvailablePredicate.isMatch(backend)) { - continue; - } - - long backendId = backend.getId(); - if (!backendIds.contains(backendId)) { - backendIds.add(backendId); - lastBackendId = backendId; - } else { - failed = true; - break; - } - } - - if 
(clusterName.equals(DEFAULT_CLUSTER)) { - if (isCreate) { - lastBackendIdForCreation = lastBackendId; - } else { - lastBackendIdForOther = lastBackendId; - } - } else { - // update last backendId - if (isCreate) { - lastBackendIdForCreationMap.put(clusterName, lastBackendId); - } else { - lastBackendIdForOtherMap.put(clusterName, lastBackendId); - } - } - if (backendIds.size() != backendNum) { - failed = true; - } - - if (!failed) { - return backendIds; - } - - // debug - for (Backend backend : backends) { - LOG.debug("random select: {}", backend.toString()); - } - - return null; + Collections.shuffle(candidates); + return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); } public ImmutableMap<Long, Backend> getIdToBackend() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 90fc55a891..aaab4b3155 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -22,7 +22,6 @@ import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo; import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo; import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo; import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo; -import org.apache.doris.backup.RestoreJob.RestoreJobState; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -38,35 +37,19 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.persist.EditLog; -import org.apache.doris.resource.Tag; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.task.AgentTask; -import org.apache.doris.task.AgentTaskQueue; -import org.apache.doris.task.DirMoveTask; -import org.apache.doris.task.DownloadTask; -import org.apache.doris.task.SnapshotTask; -import org.apache.doris.thrift.TBackend; -import org.apache.doris.thrift.TFinishTaskRequest; -import org.apache.doris.thrift.TStatus; -import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; -import org.apache.doris.thrift.TTaskType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; - import mockit.Delegate; import mockit.Expectations; import mockit.Injectable; @@ -161,12 +144,12 @@ public class RestoreJobTest { new Expectations() { { - systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, - anyBoolean, anyString, (TStorageMedium) any, (Tag) any); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, + anyString, (TStorageMedium) any); minTimes = 0; result = new Delegate() { - public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, - boolean isCreate, String clusterName) { + public synchronized List<Long> selectBackendIdsForReplicaCreation( + ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) { List<Long> beIds = Lists.newArrayList(); 
beIds.add(CatalogMocker.BACKEND1_ID); beIds.add(CatalogMocker.BACKEND2_ID); @@ -259,113 +242,6 @@ public class RestoreJobTest { backupMeta = new BackupMeta(tbls, resources); } - @Ignore - @Test - public void testRun() { - // pending - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState()); - Assert.assertEquals(12, job.getFileMapping().getMapping().size()); - - // 2. snapshoting - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState()); - Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum()); - - // 3. snapshot finished - List<AgentTask> agentTasks = Lists.newArrayList(); - Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap(); - agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks)); - agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks)); - agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks)); - Assert.assertEquals(12 * 2, agentTasks.size()); - - for (AgentTask agentTask : agentTasks) { - if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) { - continue; - } - - SnapshotTask task = (SnapshotTask) agentTask; - String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis(); - TStatus taskStatus = new TStatus(TStatusCode.OK); - TBackend tBackend = new TBackend("", 0, 1); - TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT, - task.getSignature(), taskStatus); - request.setSnapshotPath(snapshotPath); - Assert.assertTrue(job.finishTabletSnapshotTask(task, request)); - } - - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState()); - - // download - AgentTaskQueue.clearAllTasks(); - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState()); - Assert.assertEquals(9, AgentTaskQueue.getTaskNum()); - - // downloading - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState()); - - List<AgentTask> downloadTasks = Lists.newArrayList(); - runningTasks = Maps.newHashMap(); - downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks)); - downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks)); - downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks)); - Assert.assertEquals(9, downloadTasks.size()); - - List<Long> downloadedTabletIds = Lists.newArrayList(); - for (AgentTask agentTask : downloadTasks) { - TStatus taskStatus = new TStatus(TStatusCode.OK); - TBackend tBackend = new TBackend("", 0, 1); - TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT, - agentTask.getSignature(), taskStatus); - request.setDownloadedTabletIds(downloadedTabletIds); - Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request)); - } - - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.COMMIT, job.getState()); - - // commit - AgentTaskQueue.clearAllTasks(); - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.COMMITTING, job.getState()); - Assert.assertEquals(12, AgentTaskQueue.getTaskNum()); - - // committing - job.run(); - Assert.assertEquals(Status.OK, 
job.getStatus()); - Assert.assertEquals(RestoreJobState.COMMITTING, job.getState()); - - List<AgentTask> dirMoveTasks = Lists.newArrayList(); - runningTasks = Maps.newHashMap(); - dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks)); - dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks)); - dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks)); - Assert.assertEquals(12, dirMoveTasks.size()); - - for (AgentTask agentTask : dirMoveTasks) { - TStatus taskStatus = new TStatus(TStatusCode.OK); - TBackend tBackend = new TBackend("", 0, 1); - TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT, - agentTask.getSignature(), taskStatus); - job.finishDirMoveTask((DirMoveTask) agentTask, request); - } - - job.run(); - Assert.assertEquals(Status.OK, job.getStatus()); - Assert.assertEquals(RestoreJobState.FINISHED, job.getState()); - } - @Test public void testSignature() throws AnalysisException { Adler32 sig1 = new Adler32(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index 24b3c89bbc..e4f97607d9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -241,8 +241,7 @@ public class CreateTableTest { + "properties('replication_num' = '1', 'short_key' = '4');")); ExceptionChecker - .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " + - "tag(NaN/{\"location\" : \"default\"}) in all backends. need: 3", + .expectThrowsWithMsg(DdlException.class, "Failed to find 3 backends for policy", () -> createTable("create table test.atbl5\n" + "(k1 int, k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n" + "properties('replication_num' = '3');")); @@ -259,8 +258,7 @@ public class CreateTableTest { ConfigBase.setMutableConfig("disable_storage_medium_check", "false"); ExceptionChecker - .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " + - "tag(SSD/{\"location\" : \"default\"}) in all backends. need: 1", + .expectThrowsWithMsg(DdlException.class, " Failed to find 1 backends for policy:", () -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n" + "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java index d755a6a65e..1ded070267 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java @@ -86,7 +86,7 @@ public class ModifyBackendTest { ");"; CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext); ExceptionChecker.expectThrowsWithMsg(DdlException.class, - "Failed to find enough host with storage medium and tag(HDD/{\"location\" : \"default\"}) in all backends. 
need: 1", + "Failed to find 1 backends for policy:", () -> DdlExecutor.execute(Catalog.getCurrentCatalog(), createStmt)); createStr = "create table test.tbl1(\n" + @@ -119,7 +119,7 @@ public class ModifyBackendTest { Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:test"); Table tbl3 = db.getTableNullable("tbl3"); String err = Catalog.getCurrentCatalog().getDynamicPartitionScheduler().getRuntimeInfo(tbl3.getId(), DynamicPartitionScheduler.CREATE_PARTITION_MSG); - Assert.assertTrue(err.contains("Failed to find enough host with storage medium and tag")); + Assert.assertTrue(err.contains("Failed to find 1 backends for policy:")); createStr = "create table test.tbl4(\n" + "k1 date, k2 int\n" + @@ -171,7 +171,7 @@ public class ModifyBackendTest { + " set ('replication_allocation' = 'tag.location.zonex:1')"; AlterTableStmt alterStmt2 = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStr, connectContext); ExceptionChecker.expectThrowsWithMsg(DdlException.class, - "Failed to find enough host with tag({\"location\" : \"zonex\"}) in all backends. need: 1", + "Failed to find enough host with tag", () -> DdlExecutor.execute(Catalog.getCurrentCatalog(), alterStmt2)); tblProperties = tableProperty.getProperties(); Assert.assertTrue(tblProperties.containsKey("default.replication_allocation")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index a3051c65dd..3b1b4eddfc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -20,12 +20,12 @@ package org.apache.doris.load.sync.canal; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; -import org.apache.doris.resource.Tag; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -59,7 +59,6 @@ import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; - import mockit.Expectations; import mockit.Mock; import mockit.MockUp; @@ -150,8 +149,8 @@ public class CanalSyncDataTest { minTimes = 0; result = execPlanFragmentParams; - systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString, - (TStorageMedium) any, (Tag) any); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, + anyString, (TStorageMedium) any); minTimes = 0; result = backendIds; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java index 8d4d09ae13..bd099083b1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java @@ -20,9 +20,8 @@ package org.apache.doris.qe; import org.apache.doris.backup.CatalogMocker; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.DdlException; -import 
org.apache.doris.resource.Tag; +import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; @@ -31,7 +30,6 @@ import org.junit.Before; import org.junit.Test; import java.util.List; - import mockit.Delegate; import mockit.Expectations; import mockit.Mocked; @@ -45,7 +43,7 @@ public class MultiLoadMgrTest { @Mocked private SystemInfoService systemInfoService; @Before - public void setUp() { + public void setUp() throws Exception { new Expectations() { { ConnectContext.get(); @@ -62,13 +60,10 @@ public class MultiLoadMgrTest { }; new Expectations() { { - systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, - anyBoolean, anyString, (TStorageMedium) any, (Tag) any); + systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt); minTimes = 0; result = new Delegate() { - public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag( - int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate, - boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) { + public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { List<Long> beIds = Lists.newArrayList(); beIds.add(CatalogMocker.BACKEND1_ID); beIds.add(CatalogMocker.BACKEND2_ID); diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java new file mode 100644 index 0000000000..b2570095a0 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.system; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.persist.EditLog; +import org.apache.doris.resource.Tag; +import org.apache.doris.thrift.TStorageMedium; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import mockit.Expectations; +import mockit.Mocked; + +public class SystemInfoServiceTest { + + @Mocked + private Catalog catalog; + @Mocked + private EditLog editLog; + + private SystemInfoService infoService; + + @Before + public void setUp() { + new Expectations() { + { + catalog.getEditLog(); + minTimes = 0; + result = editLog; + + editLog.logAddBackend((Backend) any); + minTimes = 0; + + Catalog.getCurrentCatalog(); + minTimes = 0; + result = catalog; + } + }; + + infoService = new SystemInfoService(); + } + + private void addBackend(long beId, String host, int hbPort) { + Backend backend = new Backend(beId, host, hbPort); + infoService.addBackend(backend); + } + + @Test + public void testSelectBackendIdsByPolicy() throws Exception { + // 1. no backend + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size()); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 4).size()); + + // 2. add one backend but not alive + addBackend(10001, "192.168.1.1", 9050); + Backend be1 = infoService.getBackend(10001); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size()); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 0).size()); + // policy with no condition + BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 1).size()); + + // 3. add more backends + addBackend(10002, "192.168.1.2", 9050); + Backend be2 = infoService.getBackend(10002); + be2.setAlive(true); + addBackend(10003, "192.168.1.3", 9050); + Backend be3 = infoService.getBackend(10003); + be3.setAlive(true); + addBackend(10004, "192.168.1.4", 9050); + Backend be4 = infoService.getBackend(10004); + be4.setAlive(true); + addBackend(10005, "192.168.1.5", 9050); + Backend be5 = infoService.getBackend(10005); + + // b1 and be5 is dead, be2,3,4 is alive + BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().needScheduleAvailable().build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size()); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10001L)); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10005L)); + Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy3, 2).size()); + Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy3, 3).size()); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10002L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10003L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10004L)); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy3, 4).size()); + + // 4. 
set be status + be2.setLoadDisabled(true); + be3.setQueryDisabled(true); + be4.setDecommissioned(true); + // now, only b3,b4 is loadable, only be2,b4 is queryable, only be2,3 is schedulable + BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().needScheduleAvailable().build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size()); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10001L)); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10004L)); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10005L)); + Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy4, 2).size()); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10002L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10003L)); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size()); + + BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size()); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L)); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L)); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L)); + Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size()); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L)); + + // 5. set tags + // reset all be + be1.setAlive(true); + be2.setLoadDisabled(false); + be3.setQueryDisabled(false); + be5.setAlive(true); + be3.setAlive(true); + be4.setAlive(true); + be4.setDecommissioned(false); + be5.setAlive(true); + BeSelectionPolicy policy6 = new BeSelectionPolicy.Builder().needQueryAvailable().build(); + Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy6, 5).size()); + + Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga"); + Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb"); + be1.setTag(taga); + be2.setTag(taga); + be3.setTag(tagb); + be4.setTag(tagb); + be5.setTag(tagb); + + BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga)).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size()); + Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size()); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L)); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size()); + + BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(tagb)).build(); + Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size()); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10005L)); + + BeSelectionPolicy policy9 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga, tagb)).build(); + Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy9, 5).size()); + + // 6. 
check storage medium + addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 1 * 1024 * 1024L); + addDisk(be2, "path2", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + addDisk(be3, "path3", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + addDisk(be4, "path4", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + addDisk(be5, "path5", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + + BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb)) + .setStorageMedium(TStorageMedium.SSD).build(); + Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size()); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size()); + + BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)) + .setStorageMedium(TStorageMedium.HDD).build(); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size()); + + // 7. check disk usage + BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) + .setStorageMedium(TStorageMedium.HDD).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size()); + BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) + .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build(); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy13, 1).size()); + + // 8. check same host + addBackend(10006, "192.168.1.1", 9051); + Backend be6 = infoService.getBackend(10006); + be6.setTag(taga); + be6.setAlive(true); + addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L); + addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L); + BeSelectionPolicy policy14 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) + .setStorageMedium(TStorageMedium.HDD).build(); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy14, 2).size()); + BeSelectionPolicy policy15 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) + .setStorageMedium(TStorageMedium.HDD).allowOnSameHost().build(); + Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy15, 2).size()); + } + + @Test + public void testSelectBackendIdsForReplicaCreation() throws Exception { + addBackend(10001, "192.168.1.1", 9050); + Backend be1 = infoService.getBackend(10001); + addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + be1.setAlive(true); + addBackend(10002, "192.168.1.2", 9050); + Backend be2 = infoService.getBackend(10002); + addDisk(be2, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + be2.setAlive(true); + addBackend(10003, "192.168.1.3", 9050); + Backend be3 = infoService.getBackend(10003); + addDisk(be3, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + be3.setAlive(true); + addBackend(10004, "192.168.1.4", 9050); + Backend be4 = infoService.getBackend(10004); + addDisk(be4, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + be4.setAlive(true); + addBackend(10005, "192.168.1.5", 9050); + Backend be5 = infoService.getBackend(10005); + addDisk(be5, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L); + be5.setAlive(true); + + ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; + // also check if the random selection logic can evenly distribute the replica. 
+ Map<Long, Integer> beCounterMap = Maps.newHashMap(); + for (int i = 0; i < 10000; ++i) { + Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc, + SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD); + Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size()); + for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) { + beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1); + } + } + System.out.println(beCounterMap); + List<Integer> list = Lists.newArrayList(beCounterMap.values()); + Collections.sort(list); + int diff = list.get(list.size() - 1) - list.get(0); + // The max replica num and min replica num's diff is less than 5%. + Assert.assertTrue((diff * 1.0 / list.get(0)) < 0.05); + } + + private void addDisk(Backend be, String path, TStorageMedium medium, long totalB, long availB) { + DiskInfo diskInfo1 = new DiskInfo(path); + diskInfo1.setTotalCapacityB(totalB); + diskInfo1.setAvailableCapacityB(availB); + diskInfo1.setStorageMedium(medium); + Map<String, DiskInfo> map = Maps.newHashMap(); + map.put(diskInfo1.getRootPath(), diskInfo1); + be.setDisks(ImmutableMap.copyOf(map)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
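For readers skimming the diff above, here is a minimal, self-contained sketch of how the policy-based selection introduced by this commit is meant to be called. It mirrors the selectRedirectBackend() helper added to LoadAction.java; the class name LoadRedirectExample and the standalone framing are illustrative only, while BeSelectionPolicy, selectBackendIdsByPolicy, and the other identifiers come directly from the diff.

import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;

import java.util.List;

public class LoadRedirectExample {
    // Select one load-available BE in the given cluster and build the HTTP redirect address,
    // following the logic of LoadAction.selectRedirectBackend() after this refactoring.
    public static TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
        // Describe the selection conditions with the new BeSelectionPolicy builder.
        BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
                .setCluster(clusterName)
                .needLoadAvailable()
                .build();
        // Ask SystemInfoService for exactly one backend matching the policy.
        // An empty result means no backend (or not enough backends) matched.
        List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
        if (backendIds.isEmpty()) {
            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
        }
        Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
        if (backend == null) {
            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
        }
        return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
    }
}

Replica creation follows the same pattern but goes through selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, storageMedium), which internally builds one policy per resource tag (needScheduleAvailable, needCheckDiskUsage, the tag, and the storage medium) and returns the randomly chosen backend ids grouped by tag, as exercised in SystemInfoServiceTest above.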