This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit e1f833508c089c33aca96cab9f10dfaf5ebd6c98 Merge: 4c259d73f4 0b4288b8be Author: Keith Turner <[email protected]> AuthorDate: Tue Nov 25 22:42:32 2025 +0000 Merge branch '2.1' .../scan/ConfigurableScanServerHostSelector.java | 215 +++++++------ .../spi/scan/ConfigurableScanServerSelector.java | 151 ++++----- .../accumulo/core/spi/scan/RendezvousHasher.java | 336 +++++++++++++++++++++ .../core/spi/scan/ScanServersSnapshot.java | 127 ++++++++ .../ConfigurableScanServerHostSelectorTest.java | 111 +++---- .../scan/ConfigurableScanServerSelectorTest.java | 109 +++++++ .../core/spi/scan/RendezvousHasherTest.java | 201 ++++++++++++ 7 files changed, 1020 insertions(+), 230 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java index f43f21e8c2,fef92c8043..3585f5b2bc --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java @@@ -18,24 -18,21 +18,24 @@@ */ package org.apache.accumulo.core.spi.scan; + import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.util.ArrayList; - import java.util.Collections; - import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; + import java.util.stream.Collectors; ++import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.util.HostAndPort; + - import com.google.common.hash.HashCode; +import com.google.common.net.HostAndPort; /** - * Extension of the {@code ConfigurableScanServerSelector} that can be used when there are multiple + * Extension of the {@link ConfigurableScanServerSelector} that can be used when there are multiple * ScanServers running on the same host and for some 
reason, like using a shared off-heap cache, * sending scans for the same tablet to the same host may provide a better experience. * @@@ -47,111 -79,110 +82,111 @@@ */ public class ConfigurableScanServerHostSelector extends ConfigurableScanServerSelector { - private static final class PriorHostServersComparator implements Comparator<PriorHostServers> { + /** + * @return map of previous failure keyed on host name with a set of servers per host + */ + Map<String,Set<String>> computeFailuresByHost(TabletId tablet, SelectorParameters params) { + var attempts = params.getAttempts(tablet); + if (attempts.isEmpty()) { + return Map.of(); + } - @Override - public int compare(PriorHostServers o1, PriorHostServers o2) { - return Integer.compare(o1.getPriorServers().size(), o2.getPriorServers().size()); + Map<String,Set<String>> previousFailures = new HashMap<>(); + for (var attempt : attempts) { + var hp = HostAndPort.fromString(attempt.getServer()); + previousFailures.computeIfAbsent(hp.getHost(), h -> new HashSet<>()).add(attempt.getServer()); } + return previousFailures; } - private static final class PriorHostServers { - private final String priorHost; - private final List<String> priorServers = new ArrayList<>(); - - public PriorHostServers(String priorHost) { - this.priorHost = priorHost; - List<String> removeFailedHost(String group, Map<String,Set<String>> prevFailures, ++ List<String> removeFailedHost(ResourceGroupId group, Map<String,Set<String>> prevFailures, + List<String> rendezvousHosts, ScanServersSnapshot serversSnapshot) { + if (prevFailures.isEmpty()) { + return rendezvousHosts; } - public String getPriorHost() { - return priorHost; + // filter out hosts where all servers failed + List<String> availableHost = new ArrayList<>(rendezvousHosts.size()); + for (String host : rendezvousHosts) { + var hostsFailures = prevFailures.getOrDefault(host, Set.of()); + if (hostsFailures.isEmpty()) { + availableHost.add(host); + } else { + var hostsServers = 
serversSnapshot.getServersForHost(group, host); + if (!hostsFailures.containsAll(hostsServers)) { + // this host has some servers that did not fail + availableHost.add(host); + } + } } - public List<String> getPriorServers() { - return priorServers; + return availableHost; + } + + private List<String> removeFailedServers(Set<String> failedServers, + List<String> scanServersOnHost) { + if (failedServers.isEmpty()) { + return scanServersOnHost; } + return scanServersOnHost.stream().filter(s -> !failedServers.contains(s)) + .collect(Collectors.toList()); } - @Override - protected int selectServers(SelectorParameters params, Profile profile, - List<String> orderedScanServers, Map<TabletId,String> serversToUse) { - - // orderedScanServers is the set of ScanServers addresses (host:port) - // for the resource group designated for the profile being used for - // this scan. We want to group these scan servers by hostname and - // hash the tablet to the hostname, then randomly pick one of the - // scan servers in that group. - - final Map<String,List<String>> scanServerHosts = new HashMap<>(); - for (final String address : orderedScanServers) { - final HostAndPort hp = HostAndPort.fromString(address); - scanServerHosts.computeIfAbsent(hp.getHost(), (k) -> { - return new ArrayList<String>(); - }).add(address); + /** + * Finds the set of scan servers to use for a given attempt. If all servers have previously failed + * for all hosts, this will return an empty list.
+ * + */ + private List<String> getServersForHostAttempt(int hostAttempt, TabletId tablet, Profile profile, + RendezvousHasher rhasher, Map<String,Set<String>> prevFailures) { + final var snapshot = rhasher.getSnapshot(); + final int numHostToUse = - profile.getNumServers(hostAttempt, snapshot.getHostsForGroup(profile.group).size()); - List<String> rendezvousHosts = - rhasher.rendezvous(HOST, profile.group, tablet, profile.getSalt(hostAttempt), numHostToUse); - rendezvousHosts = removeFailedHost(profile.group, prevFailures, rendezvousHosts, snapshot); ++ profile.getNumServers(hostAttempt, snapshot.getHostsForGroup(profile.getGroupId()).size()); ++ List<String> rendezvousHosts = rhasher.rendezvous(HOST, profile.getGroupId(), tablet, ++ profile.getSalt(hostAttempt), numHostToUse); ++ rendezvousHosts = ++ removeFailedHost(profile.getGroupId(), prevFailures, rendezvousHosts, snapshot); + if (rendezvousHosts.isEmpty()) { + return List.of(); } - final List<String> hostIndex = new ArrayList<>(scanServerHosts.keySet()); - var hostToUse = rendezvousHosts.get(RANDOM.nextInt(rendezvousHosts.size())); - List<String> hostServers = snapshot.getServersForHost(profile.group, hostToUse); ++ var hostToUse = rendezvousHosts.get(RANDOM.get().nextInt(rendezvousHosts.size())); ++ List<String> hostServers = snapshot.getServersForHost(profile.getGroupId(), hostToUse); + return removeFailedServers(prevFailures.getOrDefault(hostToUse, Set.of()), hostServers); + } - final int numberOfPreviousAttempts = params.getTablets().stream() - .mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0); + @Override + int selectServers(SelectorParameters params, Profile profile, RendezvousHasher rhasher, + Map<TabletId,String> serversToUse) { - final int numServersToUseInAttemptPlan = - profile.getNumServers(numberOfPreviousAttempts, orderedScanServers.size()); + int maxHostAttempt = 0; for (TabletId tablet : params.getTablets()) { - - boolean scanServerFound = false; - if 
(numberOfPreviousAttempts > 0) { - // Sort the prior attempts by the number of scan servers tried in the list - // for each host. In theory the server at the top of the list either has - // scan servers remaining on that host, or has tried them all. - final Map<String,PriorHostServers> priorServers = new HashMap<>(numberOfPreviousAttempts); - params.getAttempts(tablet).forEach(ssa -> { - final String priorServerAddress = ssa.getServer(); - final HostAndPort priorHP = HostAndPort.fromString(priorServerAddress); - priorServers.computeIfAbsent(priorHP.getHost(), (k) -> { - return new PriorHostServers(priorHP.getHost()); - }).getPriorServers().add(priorServerAddress); - }); - final List<PriorHostServers> priors = new ArrayList<>(priorServers.values()); - // sort after populating - Collections.sort(priors, new PriorHostServersComparator()); - - for (PriorHostServers phs : priors) { - final Set<String> scanServersOnPriorHost = - new HashSet<>(scanServerHosts.get(phs.getPriorHost())); - scanServersOnPriorHost.removeAll(phs.getPriorServers()); - if (scanServersOnPriorHost.size() > 0) { - serversToUse.put(tablet, scanServersOnPriorHost.iterator().next()); - scanServerFound = true; - break; - } - } - // If we get here, then we were unable to find a host with a ScanServer that - // we did not try. Remove the hosts from the hostIndex. 
- for (PriorHostServers phs : priors) { - hostIndex.remove(phs.getPriorHost()); + Map<String,Set<String>> prevFailures = computeFailuresByHost(tablet, params); + + for (int hostAttempt = 0; hostAttempt < profile.getAttemptPlans().size(); hostAttempt++) { + maxHostAttempt = Math.max(hostAttempt, maxHostAttempt); + List<String> scanServers = + getServersForHostAttempt(hostAttempt, tablet, profile, rhasher, prevFailures); + if (!scanServers.isEmpty()) { - String serverToUse = scanServers.get(RANDOM.nextInt(scanServers.size())); ++ String serverToUse = scanServers.get(RANDOM.get().nextInt(scanServers.size())); + serversToUse.put(tablet, serverToUse); + break; } } - if (!scanServerFound) { - if (hostIndex.size() == 0) { - // We tried all servers - serversToUse.put(tablet, null); - } else { - final HashCode hashCode = hashTablet(tablet, profile.getSalt(numberOfPreviousAttempts)); - final int serverIndex = - (Math.abs(hashCode.asInt()) + RANDOM.get().nextInt(numServersToUseInAttemptPlan)) - % hostIndex.size(); - final String hostToUse = hostIndex.get(serverIndex); - final List<String> scanServersOnHost = scanServerHosts.get(hostToUse); - serversToUse.put(tablet, scanServersOnHost.get(0)); + if (!serversToUse.containsKey(tablet) && !prevFailures.isEmpty()) { + // assuming no servers were found because all servers have previously failed, so in the + // case where all servers have previously failed, ignore previous failures and try any server + List<String> scanServers = getServersForHostAttempt(profile.getAttemptPlans().size() - 1, + tablet, profile, rhasher, Map.of()); + if (!scanServers.isEmpty()) { - String serverToUse = scanServers.get(RANDOM.nextInt(scanServers.size())); ++ String serverToUse = scanServers.get(RANDOM.get().nextInt(scanServers.size())); + serversToUse.put(tablet, serverToUse); } } - } - return numberOfPreviousAttempts; + return maxHostAttempt; } - } diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java index
eecf68b905,b54659e33c..3b4c05840b --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java @@@ -18,11 -18,10 +18,11 @@@ */ package org.apache.accumulo.core.spi.scan; - import static java.nio.charset.StandardCharsets.UTF_8; + import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.SERVER; +import static org.apache.accumulo.core.util.LazySingletons.GSON; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.lang.reflect.Type; -import java.security.SecureRandom; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@@ -34,16 -32,15 +33,16 @@@ import java.util.Optional import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; + import java.util.stream.Collectors; ++import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.data.TabletId; + import org.apache.accumulo.core.util.Timer; import com.google.common.base.Preconditions; - import com.google.common.base.Suppliers; import com.google.common.collect.Sets; - import com.google.common.hash.HashCode; - import com.google.common.hash.Hashing; -import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@@ -249,8 -246,8 +247,8 @@@ public class ConfigurableScanServerSele boolean isDefault = false; int busyTimeoutMultiplier; String maxBusyTimeout; -- String group = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; - String timeToWaitForScanServers = "0s"; ++ String group = Constants.DEFAULT_RESOURCE_GROUP_NAME; + String timeToWaitForScanServers = 100 * 365 + "d"; transient boolean parsed = false; transient long parsedMaxBusyTimeout; @@@ -294,6 -291,10 +292,14 @@@ parse(); return 
parsedTimeToWaitForScanServers; } + ++ ResourceGroupId getGroupId() { ++ return ResourceGroupId.of(group); ++ } ++ + List<AttemptPlan> getAttemptPlans() { + return attemptPlans; + } } private void parseProfiles(Map<String,String> options) { @@@ -368,18 -389,22 +393,23 @@@ Duration scanServerWaitTime = profile.getTimeToWaitForScanServers(); - var finalProfile = profile; - if (orderedScanServers.isEmpty() && !scanServerWaitTime.isZero()) { - if (rhasher.getSnapshot().getServersForGroup(profile.group).isEmpty() ++ if (rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty() + && !scanServerWaitTime.isZero()) { // Wait for scan servers in the configured group to be present. - orderedScanServers = params.waitUntil( - () -> Optional.ofNullable( - orderedScanServersSupplier.get().get(ResourceGroupId.of(finalProfile.group))), - scanServerWaitTime, "scan servers in group : " + profile.group).orElseThrow(); + rhasher = params.waitUntil(() -> { + var r2 = getRendezvous(); - if (r2.getSnapshot().getServersForGroup(profile.group).isEmpty()) { ++ if (r2.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(r2); + } + }, scanServerWaitTime, "scan servers in group : " + profile.group).orElseThrow(); // at this point the list should be non empty unless there is a bug - Preconditions.checkState(!orderedScanServers.isEmpty()); - Preconditions.checkState(!rhasher.getSnapshot().getServersForGroup(profile.group).isEmpty()); ++ Preconditions ++ .checkState(!rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()); } - if (orderedScanServers.isEmpty()) { - if (rhasher.getSnapshot().getServersForGroup(profile.group).isEmpty()) { ++ if (rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()) { // there are no scan servers so fall back to the tablet server return new ScanServerSelections() { @Override @@@ -429,18 -453,26 +458,26 @@@ int attempts = 
params.getTablets().stream() .mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0); - int numServers = profile.getNumServers(attempts, orderedScanServers.size()); + int numServers = profile.getNumServers(attempts, - rhasher.getSnapshot().getServersForGroup(profile.group).size()); ++ rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).size()); for (TabletId tablet : params.getTablets()) { - - String serverToUse = null; - - var hashCode = hashTablet(tablet, profile.getSalt(attempts)); - - int serverIndex = (Math.abs(hashCode.asInt()) + RANDOM.get().nextInt(numServers)) - % orderedScanServers.size(); - - serverToUse = orderedScanServers.get(serverIndex); - - List<String> rendezvousServers = - rhasher.rendezvous(SERVER, profile.group, tablet, profile.getSalt(attempts), numServers); ++ List<String> rendezvousServers = rhasher.rendezvous(SERVER, profile.getGroupId(), tablet, ++ profile.getSalt(attempts), numServers); + + var tabletAttempts = params.getAttempts(tablet); + if (!tabletAttempts.isEmpty()) { + // remove servers that failed in previous attempts + var attemptServers = + tabletAttempts.stream().map(ScanServerAttempt::getServer).collect(Collectors.toSet()); + var copy = rendezvousServers.stream().filter(server -> !attemptServers.contains(server)) + .collect(Collectors.toList()); + if (!copy.isEmpty()) { + // pick from the servers that did not previously fail + rendezvousServers = copy; + } // else all servers have failed, so just try any one of them again + } + // pick a random server from the set of rendezvous servers - String serverToUse = rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size())); ++ String serverToUse = rendezvousServers.get(RANDOM.get().nextInt(rendezvousServers.size())); serversToUse.put(tablet, serverToUse); } return attempts; diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/RendezvousHasher.java index 0000000000,cf4c753a40..b321ba2122 mode 000000,100644..100644 --- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/RendezvousHasher.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/RendezvousHasher.java @@@ -1,0 -1,334 +1,336 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.core.spi.scan; + + import static java.nio.charset.StandardCharsets.UTF_8; + + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Comparator; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Objects; + import java.util.PriorityQueue; + import java.util.Set; + ++import org.apache.accumulo.core.data.ResourceGroupId; + import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.util.HostAndPort; + import org.apache.hadoop.io.Text; + + import com.github.benmanes.caffeine.cache.Cache; + import com.github.benmanes.caffeine.cache.Caffeine; + import com.github.benmanes.caffeine.cache.Weigher; + import com.google.common.base.Preconditions; + import com.google.common.hash.HashCode; + import com.google.common.hash.Hashing; ++import com.google.common.net.HostAndPort; + + /** + * Computes and caches the computations of rendezvous hashes to map tablets to scan servers. 
+ */ + // intentionally package private so that it is not part of SPI, this is just internal utility code + class RendezvousHasher { + private static class ServerHash implements Comparable<ServerHash> { + final HashCode hash; + final String server; + + private ServerHash(HashCode hash, String server) { + this.hash = hash; + this.server = server; + } + + @Override + public boolean equals(Object o) { + if (o instanceof ServerHash) { + return compareTo((ServerHash) o) == 0; + } + return false; + } + + @Override + public int hashCode() { + return hash.asInt(); + } + + @Override + public int compareTo(ServerHash o) { + return Arrays.compare(hash.asBytes(), o.hash.asBytes()); + } + } + + private static class CacheKey { + final Mode mode; - final String group; ++ final ResourceGroupId group; + final TabletId tablet; + final int desiredServers; + final String salt; + - private CacheKey(Mode mode, String group, TabletId tablet, int numServers, String salt) { ++ private CacheKey(Mode mode, ResourceGroupId group, TabletId tablet, int numServers, ++ String salt) { + this.mode = mode; + this.group = group; + this.tablet = tablet; + this.desiredServers = numServers; + this.salt = salt; + } + + @Override + public int hashCode() { + return Objects.hash(mode, group, tablet, desiredServers, salt); + } + + @Override + public boolean equals(Object o) { + if (o instanceof CacheKey) { + var ock = (CacheKey) o; + return mode == ock.mode && group.equals(ock.group) && tablet.equals(ock.tablet) + && desiredServers == ock.desiredServers && Objects.equals(salt, ock.salt); + } + + return false; + } + } + + private final ScanServersSnapshot snapshot; + private final Cache<CacheKey,List<String>> cache; + + RendezvousHasher(ScanServersSnapshot snapshot, int maxCacheSize) { + Weigher<CacheKey,List<String>> weigher = (cacheKey, servers) -> { + + // for the list of servers the length of the strings is not considered because the server + // strings may be pointed to by multiple cache entries, so only 
considering the pointer size + return 64 + cacheKey.tablet.getTable().canonical().length() + + weigh(cacheKey.tablet.getEndRow()) + weigh(cacheKey.tablet.getPrevEndRow()) + + servers.size() * 8; + }; + this.snapshot = snapshot; + cache = Caffeine.newBuilder().weigher(weigher).maximumWeight(maxCacheSize).build(); + } + + enum Mode { + SERVER, HOST + } + + /** + * Finds "desiredServers" number of servers for a given tablet using rendezvous hashing. For a + * given tablet+salt the same subset of servers will always be computed from the same set of all + * servers. The set of servers computed for a tablet+salt is likely to be stable as the set of all + * servers changes, but may change sometimes. This allows many clients to usually choose the same + * set of scan servers for a given tablet even as the set of scan servers changes over time. + */ - List<String> rendezvous(Mode mode, String group, TabletId tablet, String salt, ++ List<String> rendezvous(Mode mode, ResourceGroupId group, TabletId tablet, String salt, + int desiredServers) { + if (mode == Mode.SERVER && desiredServers >= getSnapshot().getServersForGroup(group).size()) { + // no need to compute rendezvous hash because all servers are wanted + return getSnapshot().getServersForGroupAsList(group); + } + + if (mode == Mode.HOST && desiredServers >= getSnapshot().getHostsForGroup(group).size()) { + // no need to compute rendezvous hash because all hosts are wanted + return getSnapshot().getHostsForGroup(group); + } + + var cacheKey = new CacheKey(mode, group, tablet, desiredServers, salt); + + // Because the computations are derived from an immutable snapshot, it is safe to cache the + // computations result as it should always be the same. 
+ return cache.get(cacheKey, this::rendezvousHash); + } + + /** + * @return the snapshot of scan servers used to create this hasher + */ + ScanServersSnapshot getSnapshot() { + return snapshot; + } + + private List<String> rendezvousHash(CacheKey ck) { + switch (ck.mode) { + case HOST: + return rendezvousHashHost(ck); + case SERVER: + return rendezvousHashServers(ck); + default: + throw new IllegalArgumentException(ck.mode.name()); + } + } + + private List<String> rendezvousHashServers(CacheKey ck) { + Preconditions.checkState(ck.desiredServers > 0, "%s", ck.desiredServers); + + if (ck.desiredServers == 1) { + // optimization that does minimal work for finding a single server + ServerHash minServerHash = findMinHash(ck); + + if (minServerHash == null) { + return List.of(); + } else { + return List.of(minServerHash.server); + } + } else { + // Tracks the maximum minimums seen. Used to find the least M hashes w/o sorting all N hashes. + // This algorithm should be O(N*log2(M)) which is a bit better than O(N*log2(N)) when sorting + // everything. + var pq = new PriorityQueue<ServerHash>(ck.desiredServers, Comparator.reverseOrder()); + Iterator<String> iter = getSnapshot().getServersForGroup(ck.group).iterator(); + + // populate the priority queue with the first hashes, these are initial set of minimum hashes + while (iter.hasNext() && pq.size() < ck.desiredServers) { + var server = iter.next(); + var hc = hash(ck.tablet, server, ck.salt); + pq.add(new ServerHash(hc, server)); + } + + // look through the rest of the hashes finding the rest of the minimum hashes + while (iter.hasNext()) { + var server = iter.next(); + var hc = hash(ck.tablet, server, ck.salt); + var serverHash = new ServerHash(hc, server); + if (serverHash.compareTo(pq.peek()) < 0) { + // This hash is less than the maximum minimum seen so replace it. 
+ pq.poll(); // Remove from priority queue before adding to avoid increasing its array size + // and causing an internal reallocation + pq.add(serverHash); + } + } + + List<String> found = new ArrayList<>(pq.size()); + pq.forEach(serverHash -> found.add(serverHash.server)); + // return an immutable list as it will be cached and potentially shared many times + return List.copyOf(found); + } + } + + /** + * Finds a set of hosts using rendezvous hashing compensating for hosts that have different numbers + * of servers. When choosing a host the goal of this code is to select it based on its relative + * number of servers. So if TS is the total number of servers and a host has HS servers then the + * probability of that host being chosen for any tablet should be HS/TS. For example if there are + * 30 hosts and 295 servers running across all the hosts then for a host with 3 servers its + * probability of being chosen should be 3/295=1.02% across all tablets. If the host with 3 + * servers were chosen uniformly from the 30 hosts then it would be chosen 1/30=3.33% of the time + * instead of 1.02% which would cause its scan servers to have higher load than other scan + * servers. + * + * <p> + * The implementation for achieving this goal is to sort all servers (not hosts) by hash and then + * look for N unique hosts in the first part of the sorted list. If a host has fewer servers than + * other hosts then it will show up less frequently in the beginning of the list on average for + * different tablets. This roughly achieves the goal of this code, in limited testing it seems to + * skew slightly higher than the desired probability. This approach could benefit from more + * testing and looking at the statistics, suspect the greedy nature of the algorithm may throw + * things off a bit.
+ */ + private List<String> rendezvousHashHost(CacheKey ck) { + + Set<String> allServers = getSnapshot().getServersForGroup(ck.group); + if (ck.desiredServers == 1) { + // optimization for finding a single host, avoids sorting everything and allocates fewer + // objects + ServerHash minServerHash = findMinHash(ck); + + if (minServerHash == null) { + return List.of(); + } else { + return List.of(HostAndPort.fromString(minServerHash.server).getHost()); + } + } else { + List<ServerHash> allHashes = new ArrayList<>(allServers.size()); + for (String server : allServers) { + var hc = hash(ck.tablet, server, ck.salt); + var serverHash = new ServerHash(hc, server); + allHashes.add(serverHash); + } + + // TODO can this be optimized to avoid sorting all servers? might be a good follow on + allHashes.sort(Comparator.naturalOrder()); + + Set<String> hostsSeen = new HashSet<>(); + Iterator<ServerHash> iter = allHashes.iterator(); + while (hostsSeen.size() < ck.desiredServers && iter.hasNext()) { + String host = HostAndPort.fromString(iter.next().server).getHost(); + hostsSeen.add(host); + } + + // return an immutable list as it will be cached and potentially shared many times + return List.copyOf(hostsSeen); + } + } + + private ServerHash findMinHash(CacheKey ck) { + ServerHash minServerHash = null; + // This code only exists as a performance optimization to avoid sorting all data for the case of + // finding a single server. It could be shorter if using streams, but would probably be much + // slower which would defeat its reason for existence. 
+ Iterator<String> iter = getSnapshot().getServersForGroup(ck.group).iterator(); + + // this initial check avoids doing a null check in the subsequent while loop + if (iter.hasNext()) { + String server = iter.next(); + var hc = hash(ck.tablet, server, ck.salt); + minServerHash = new ServerHash(hc, server); + } + + while (iter.hasNext()) { + String server = iter.next(); + var hc = hash(ck.tablet, server, ck.salt); + var serverHash = new ServerHash(hc, server); + if (serverHash.compareTo(minServerHash) < 0) { + minServerHash = serverHash; + } + } + + return minServerHash; + } + + private static int weigh(Text t) { + if (t == null) { + return 8; + } else { + return 8 + t.getBytes().length; + } + } + + private HashCode hash(TabletId tablet, String server, String salt) { + var hasher = Hashing.murmur3_128().newHasher(); + + if (tablet.getEndRow() != null) { + hasher.putBytes(tablet.getEndRow().getBytes(), 0, tablet.getEndRow().getLength()); + } else { + hasher.putByte((byte) 5); + } + + if (tablet.getPrevEndRow() != null) { + hasher.putBytes(tablet.getPrevEndRow().getBytes(), 0, tablet.getPrevEndRow().getLength()); + } else { + hasher.putByte((byte) 7); + } + + hasher.putString(tablet.getTable().canonical(), UTF_8); + + hasher.putString(server, UTF_8); + + if (salt != null && !salt.isEmpty()) { + hasher.putString(salt, UTF_8); + } + + return hasher.hash(); + } + } diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServersSnapshot.java index 0000000000,309d16aa37..61721f2e04 mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServersSnapshot.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServersSnapshot.java @@@ -1,0 -1,125 +1,127 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.core.spi.scan; + + import static java.util.stream.Collectors.groupingBy; + import static java.util.stream.Collectors.mapping; + import static java.util.stream.Collectors.toUnmodifiableSet; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + -import org.apache.accumulo.core.util.HostAndPort; ++import org.apache.accumulo.core.data.ResourceGroupId; ++ ++import com.google.common.net.HostAndPort; + + /** + * An immutable snapshot of information about scan servers + */ + // This class is intentionally package private + class ScanServersSnapshot { + + private static class PerHostInfo { + final List<String> hosts; + final Map<String,List<String>> serversByHost; + + PerHostInfo(Collection<String> servers) { + this.serversByHost = new HashMap<>(); + + servers.forEach(server -> { + var hp = HostAndPort.fromString(server); + serversByHost.computeIfAbsent(hp.getHost(), h -> new ArrayList<>()).add(server); + }); + + hosts = List.copyOf(serversByHost.keySet()); + } + + List<String> getServersForHost(String host) { + return Collections.unmodifiableList(serversByHost.getOrDefault(host, List.of())); + } + } + + // All code in this class assumes this 
map is an immutable snapshot and computes derivative data + // structures from its data as needed. - private final Map<String,Set<String>> serversByGroup; - private final Map<String,PerHostInfo> perHostInfo; - private final Map<String,List<String>> serversByGroupList; ++ private final Map<ResourceGroupId,Set<String>> serversByGroup; ++ private final Map<ResourceGroupId,PerHostInfo> perHostInfo; ++ private final Map<ResourceGroupId,List<String>> serversByGroupList; + - private ScanServersSnapshot(Map<String,Set<String>> serversByGroup) { ++ private ScanServersSnapshot(Map<ResourceGroupId,Set<String>> serversByGroup) { + this.serversByGroup = serversByGroup; + this.perHostInfo = new ConcurrentHashMap<>(); + this.serversByGroupList = new ConcurrentHashMap<>(); + } + - Set<String> getServersForGroup(String group) { ++ Set<String> getServersForGroup(ResourceGroupId group) { + return serversByGroup.getOrDefault(group, Set.of()); + } + - List<String> getServersForGroupAsList(String group) { ++ List<String> getServersForGroupAsList(ResourceGroupId group) { + // compute the list as needed since it is not always needed + return serversByGroupList.computeIfAbsent(group, g -> List.copyOf(getServersForGroup(g))); + } + - private PerHostInfo getPerHostInfo(String group) { ++ private PerHostInfo getPerHostInfo(ResourceGroupId group) { + return perHostInfo.computeIfAbsent(group, g -> new PerHostInfo(getServersForGroup(g))); + } + - List<String> getHostsForGroup(String group) { ++ List<String> getHostsForGroup(ResourceGroupId group) { + return getPerHostInfo(group).hosts; + } + - List<String> getServersForHost(String group, String host) { ++ List<String> getServersForHost(ResourceGroupId group, String host) { + return getPerHostInfo(group).getServersForHost(host); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ScanServersSnapshot) { + // only need to compare this field as the other fields are derived from the information in + // it.
+ return serversByGroup.equals(((ScanServersSnapshot) o).serversByGroup); + } + return false; + } + + @Override + public int hashCode() { + return serversByGroup.hashCode(); + } + + static ScanServersSnapshot from(Collection<ScanServerInfo> scanServerInfos) { + Objects.requireNonNull(scanServerInfos); + // create an initial immutable snapshot of the information + var initialSnapshot = + Map.copyOf(scanServerInfos.stream().collect(groupingBy(ScanServerInfo::getGroup, + mapping(ScanServerInfo::getAddress, toUnmodifiableSet())))); + return new ScanServersSnapshot(initialSnapshot); + } + - static ScanServersSnapshot from(Map<String,Set<String>> serversByGroup) { - var copy = new HashMap<String,Set<String>>(); ++ static ScanServersSnapshot from(Map<ResourceGroupId,Set<String>> serversByGroup) { ++ var copy = new HashMap<ResourceGroupId,Set<String>>(); + serversByGroup.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + return new ScanServersSnapshot(Map.copyOf(copy)); + } + } diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index b77bd026ba,9cfbffb490..b3dfc24fe5 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@@ -18,9 -18,9 +18,10 @@@ */ package org.apache.accumulo.core.spi.scan; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; + import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@@ -545,4 -530,113 +546,112 @@@ public class ConfigurableScanServerSele assertTrue(Set.of("ss1:1", "ss2:2", "ss3:3").contains(actions.getScanServer(tabletId))); 
assertFalse(scanServers.get().isEmpty()); } + + @Test + public void testServerSetChanges() throws Exception { + String defaultProfile = + "{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s'," + + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}"; + + var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', '"')); + + ConfigurableScanServerSelector selector = new ConfigurableScanServerSelector(); + + var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; + // start off w/ one scan server - AtomicReference<Map<String,String>> scanServers = - new AtomicReference<>(Map.of("localhost:8000", dg)); ++ AtomicReference<Map<String,ResourceGroupId>> scanServers = ++ new AtomicReference<>(Map.of("localhost:8000", ResourceGroupId.of(dg))); + + selector.init(new InitParams(scanServers::get, opts)); + + for (int i = 0; i < 50; i++) { + var tabletId = nti("" + i, "m"); + assertEquals("localhost:8000", + selector.selectServers(new SelectorParams(tabletId)).getScanServer(tabletId)); + assertEquals("localhost:8000", + selector.selectServers(new SelectorParams(tabletId)).getScanServer(tabletId)); + } + + // add some new scan servers, the selector should eventually pick these up and start making + // different decisions - HashMap<String,String> newServers = new HashMap<>(); ++ HashMap<String,ResourceGroupId> newServers = new HashMap<>(); + for (int i = 0; i < 30; i++) { - newServers.put(String.format("localhost:%d", 8000 + i), dg); ++ newServers.put(String.format("localhost:%d", 8000 + i), ResourceGroupId.of(dg)); + } + // add some servers in another RG, these should be ignored + for (int i = 0; i < 30; i++) { - newServers.put(String.format("localhost:%d", 9000 + i), "other"); ++ newServers.put(String.format("localhost:%d", 9000 + i), ResourceGroupId.of("other")); + } + scanServers.set(newServers); + + // wait until the new scan servers are noticed + var tabletId = nti("1", "m"); + while ("localhost:8000" + 
.equals(selector.selectServers(new SelectorParams(tabletId)).getScanServer(tabletId))) { + Thread.sleep(100); + } + + // now should see tablet spread across the new scan servers servers + HashSet<String> allServersSeen = new HashSet<>(); + for (int i = 0; i < 100; i++) { + tabletId = nti("" + i, "m"); + HashSet<String> serversSeen = new HashSet<>(); + for (int j = 0; j < 30; j++) { + var server = selector.selectServers(new SelectorParams(tabletId)).getScanServer(tabletId); + serversSeen.add(server); + allServersSeen.add(server); + } + // each tablet should spread across three servers + assertEquals(3, serversSeen.size()); + } + // all tablets should spread across all scan servers + assertEquals(30, allServersSeen.size()); + } + + /** + * Test that previous failures are not used again unless all servers have failed + */ + @Test + public void testPreviousFailures() { - var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; - HashMap<String,String> servers = new HashMap<>(); ++ HashMap<String,ResourceGroupId> servers = new HashMap<>(); + for (int i = 0; i < 30; i++) { - servers.put(String.format("localhost:%d", 8000 + i), dg); ++ servers.put(String.format("localhost:%d", 8000 + i), ResourceGroupId.DEFAULT); + } + + String defaultProfile = + "{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s'," + + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}"; + var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', '"')); + ConfigurableScanServerSelector selector = new ConfigurableScanServerSelector(); + selector.init(new InitParams(() -> servers, opts)); + + var tabletId = nti("1", "m"); + var selected = selector.selectServers(new SelectorParams(tabletId)).getScanServer(tabletId); + assertTrue(servers.containsKey(selected)); + + // try selecting again, should pick a different server + var attempts = new HashSet<ScanServerAttempt>(); + attempts.add(new TestScanServerAttempt(selected, 
ScanServerAttempt.Result.BUSY)); + var selected2 = + selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, attempts), Map.of())) + .getScanServer(tabletId); + assertTrue(servers.containsKey(selected2)); + assertNotEquals(selected, selected2); + + // try selecting again, should pick a different server + attempts.add(new TestScanServerAttempt(selected2, ScanServerAttempt.Result.BUSY)); + var selected3 = + selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, attempts), Map.of())) + .getScanServer(tabletId); + assertTrue(servers.containsKey(selected3)); + assertNotEquals(selected, selected3); + assertNotEquals(selected2, selected3); + + // try selecting again, at this point all servers failed so should try any one of them + attempts.add(new TestScanServerAttempt(selected3, ScanServerAttempt.Result.BUSY)); + var selected4 = + selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, attempts), Map.of())) + .getScanServer(tabletId); + assertTrue(Set.of(selected, selected2, selected3).contains(selected4)); + } } diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/RendezvousHasherTest.java index 0000000000,708cc26c7a..2ba334a698 mode 000000,100644..100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/RendezvousHasherTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/RendezvousHasherTest.java @@@ -1,0 -1,196 +1,201 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.core.spi.scan; + + import static java.util.stream.Collectors.toSet; + import static org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelectorTest.nti; + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + ++import org.apache.accumulo.core.data.ResourceGroupId; + import org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode; -import org.apache.accumulo.core.util.HostAndPort; + import org.junit.jupiter.api.Test; + ++import com.google.common.net.HostAndPort; ++ + public class RendezvousHasherTest { ++ private static final ResourceGroupId G1 = ResourceGroupId.of("g1"); ++ private static final ResourceGroupId G2 = ResourceGroupId.of("g2"); ++ + @Test + public void testSameServerCountPerHost() { - Map<String,Set<String>> serversByGroup = new HashMap<>(); ++ Map<ResourceGroupId,Set<String>> serversByGroup = new HashMap<>(); + + Set<String> g1Servers = new HashSet<>(); + for (int h = 0; h < 10; h++) { + String host = "g1h" + h; + for (int p = 1000; p < 1005; p++) { + g1Servers.add(host + ":" + p); + } + } - serversByGroup.put("g1", g1Servers); ++ serversByGroup.put(G1, g1Servers); + + Set<String> g2Servers = Set.of("g2h1:5000", "g2h1:5001", "g2h1:5002"); - serversByGroup.put("g2", g2Servers); ++ serversByGroup.put(G2, g2Servers); + + ScanServersSnapshot snapshot = ScanServersSnapshot.from(serversByGroup); + 
RendezvousHasher hasher = new RendezvousHasher(snapshot, 1_000_000); + + for (int i = 1; i < 5; i++) { - var g2rh = hasher.rendezvous(Mode.SERVER, "g2", nti("1", "e"), "s1", i); ++ var g2rh = hasher.rendezvous(Mode.SERVER, G2, nti("1", "e"), "s1", i); + assertEquals(Math.min(i, 3), g2rh.size()); + assertTrue(g2Servers.containsAll(g2rh)); + - g2rh = hasher.rendezvous(Mode.HOST, "g2", nti("1", "e"), "s1", i); ++ g2rh = hasher.rendezvous(Mode.HOST, G2, nti("1", "e"), "s1", i); + assertEquals(List.of("g2h1"), g2rh); + } + + int diffCount = 0; + + Map<String,Integer> useCounts = new HashMap<>(); + + for (int t = 0; t < 200; t++) { + var tablet = nti("1", "" + t); - var servers_s1_1 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s1", 1)); - var servers_s1_2 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s1", 2)); - var servers_s1_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s1", 3)); ++ var servers_s1_1 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s1", 1)); ++ var servers_s1_2 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s1", 2)); ++ var servers_s1_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s1", 3)); + // the same tablet and salt should hash to the same servers - assertEquals(servers_s1_1, Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s1", 1))); - assertEquals(servers_s1_2, Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s1", 2))); - assertEquals(servers_s1_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s1", 3))); ++ assertEquals(servers_s1_1, Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s1", 1))); ++ assertEquals(servers_s1_2, Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s1", 2))); ++ assertEquals(servers_s1_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s1", 3))); + assertEquals(1, servers_s1_1.size()); + assertEquals(2, servers_s1_2.size()); + assertEquals(3, servers_s1_3.size()); + // should contain the smaller set of servers 
+ assertTrue(servers_s1_2.containsAll(servers_s1_1)); + assertTrue(servers_s1_3.containsAll(servers_s1_2)); + + assertTrue(g1Servers.containsAll(servers_s1_1)); + assertTrue(g1Servers.containsAll(servers_s1_2)); + assertTrue(g1Servers.containsAll(servers_s1_3)); + + for (var server : servers_s1_3) { + useCounts.merge(server, 1, Integer::sum); + } + - var hosts_s1_1 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 1); - var hosts_s1_2 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 2); - var hosts_s1_3 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 3); ++ var hosts_s1_1 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 1); ++ var hosts_s1_2 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 2); ++ var hosts_s1_3 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 3); + // the same tablet and salt should hash to the same hosts - assertEquals(hosts_s1_1, hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 1)); - assertEquals(hosts_s1_2, hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 2)); - assertEquals(hosts_s1_3, hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 3)); ++ assertEquals(hosts_s1_1, hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 1)); ++ assertEquals(hosts_s1_2, hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 2)); ++ assertEquals(hosts_s1_3, hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 3)); + assertEquals(1, hosts_s1_1.size()); + assertEquals(2, hosts_s1_2.size()); + assertEquals(3, hosts_s1_3.size()); + assertTrue(hosts_s1_2.containsAll(hosts_s1_1)); + assertTrue(hosts_s1_3.containsAll(hosts_s1_2)); + assertTrue(hosts_s1_3.stream().noneMatch(h -> h.contains(":"))); + + // the way these two are computed the hosts within the servers should be a subset of the + // rendezvous host + assertEquals(hosts_s1_1.iterator().next(), + HostAndPort.fromString(servers_s1_1.iterator().next()).getHost()); + assertTrue(Set.copyOf(hosts_s1_2).containsAll( + servers_s1_2.stream().map(s -> HostAndPort.fromString(s).getHost()).collect(toSet()))); + 
assertTrue(Set.copyOf(hosts_s1_3).containsAll( + servers_s1_3.stream().map(s -> HostAndPort.fromString(s).getHost()).collect(toSet()))); + + // try a different salt, should usually result in different servers - var servers_s2_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s2", 3)); - assertEquals(servers_s2_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", tablet, "s2", 3))); ++ var servers_s2_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s2", 3)); ++ assertEquals(servers_s2_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, tablet, "s2", 3))); + assertEquals(3, servers_s2_3.size()); + assertTrue(g1Servers.containsAll(servers_s2_3)); + if (!servers_s1_3.equals(servers_s2_3)) { + diffCount++; + } + } + + var stats = useCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertEquals(50, stats.getCount()); + assertEquals(12, stats.getAverage()); + assertEquals(7, stats.getMin()); + assertEquals(18, stats.getMax()); + + assertEquals(200, diffCount); + } + + @Test + public void testDifferentServerCountPerHost() { - Map<String,Set<String>> serversByGroup = new HashMap<>(); ++ Map<ResourceGroupId,Set<String>> serversByGroup = new HashMap<>(); + + Set<String> g1Servers = new HashSet<>(); + // 10 host with 10 servers + for (int h = 0; h < 10; h++) { + String host = "g1h" + h; + for (int p = 1000; p < 1010; p++) { + g1Servers.add(host + ":" + p); + } + } + // four host w/ two servers + g1Servers.add("gh1a:1000"); + g1Servers.add("gh1a:1001"); + g1Servers.add("gh1b:1000"); + g1Servers.add("gh1b:1001"); + g1Servers.add("gh1c:1000"); + g1Servers.add("gh1c:1001"); + g1Servers.add("gh1d:1000"); + g1Servers.add("gh1d:1001"); + - serversByGroup.put("g1", g1Servers); ++ serversByGroup.put(G1, g1Servers); + + ScanServersSnapshot snapshot = ScanServersSnapshot.from(serversByGroup); + RendezvousHasher hasher = new RendezvousHasher(snapshot, 1_000_000); + + Map<String,Integer> useCounts = new HashMap<>(); + + for (int t = 0; t < 10_000; t++) { + 
var tablet = nti("1", "" + t); - var hosts_s1_3 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 3); ++ var hosts_s1_3 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 3); + for (var host : hosts_s1_3) { + useCounts.merge(host, 1, Integer::sum); + } + } + + // Verify the hosts with only 2 servers gets picked a lot less than the host with 10 servers. + // There are 108 total servers, ideally the 4 hosts with only 2 servers would get + // 8.0/108/.0=7.4% of the total usage and not 4/14=29% based being 4 of 14 host. + var stats = useCounts.entrySet().stream().filter(e -> !e.getKey().matches("gh1[abcd]")) + .mapToInt(Map.Entry::getValue).summaryStatistics(); + + // the 10 host w/ 10 servers should get around 30,000 * 100.0 / 108.0 / 10 = 2777 usage each + assertTrue(stats.getAverage() > 2777 - 100 && stats.getAverage() < 2777 + 100); + assertTrue(stats.getMin() > 2777 - 200); + assertTrue(stats.getMax() < 2777 + 200); + + // the 4 host w/ 2 servers should get around 30,000 * 8/108/4 = 555 usage each. Experimenting w/ + // different scenarios, anecdotally this function seems to skew higher than the desired average. + // Suspect this is an artifact of the greedy algorithm used to implement host base rendezvous + // hashing. + var stats2 = useCounts.entrySet().stream().filter(e -> e.getKey().matches("gh1[abcd]")) + .mapToInt(Map.Entry::getValue).summaryStatistics(); + assertTrue(stats2.getAverage() > 555 - 100 && stats2.getAverage() < 555 + 100); + assertTrue(stats2.getMin() > 555 - 200); + assertTrue(stats2.getMax() < 555 + 200); + + // the two groups of hosts should get all usage + assertEquals(30_000, stats2.getSum() + stats.getSum()); + } + }
