This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit e1f833508c089c33aca96cab9f10dfaf5ebd6c98
Merge: 4c259d73f4 0b4288b8be
Author: Keith Turner <[email protected]>
AuthorDate: Tue Nov 25 22:42:32 2025 +0000

    Merge branch '2.1'

 .../scan/ConfigurableScanServerHostSelector.java   | 215 +++++++------
 .../spi/scan/ConfigurableScanServerSelector.java   | 151 ++++-----
 .../accumulo/core/spi/scan/RendezvousHasher.java   | 336 +++++++++++++++++++++
 .../core/spi/scan/ScanServersSnapshot.java         | 127 ++++++++
 .../ConfigurableScanServerHostSelectorTest.java    | 111 +++----
 .../scan/ConfigurableScanServerSelectorTest.java   | 109 +++++++
 .../core/spi/scan/RendezvousHasherTest.java        | 201 ++++++++++++
 7 files changed, 1020 insertions(+), 230 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
index f43f21e8c2,fef92c8043..3585f5b2bc
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
@@@ -18,24 -18,21 +18,24 @@@
   */
  package org.apache.accumulo.core.spi.scan;
  
+ import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  
  import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
+ import java.util.stream.Collectors;
  
++import org.apache.accumulo.core.data.ResourceGroupId;
  import org.apache.accumulo.core.data.TabletId;
 -import org.apache.accumulo.core.util.HostAndPort;
 +
- import com.google.common.hash.HashCode;
 +import com.google.common.net.HostAndPort;
  
  /**
-  * Extension of the {@code ConfigurableScanServerSelector} that can be used 
when there are multiple
+  * Extension of the {@link ConfigurableScanServerSelector} that can be used 
when there are multiple
   * ScanServers running on the same host and for some reason, like using a 
shared off-heap cache,
   * sending scans for the same tablet to the same host may provide a better 
experience.
   *
@@@ -47,111 -79,110 +82,111 @@@
   */
  public class ConfigurableScanServerHostSelector extends 
ConfigurableScanServerSelector {
  
-   private static final class PriorHostServersComparator implements 
Comparator<PriorHostServers> {
+   /**
+    * @return map of previous failures keyed on host name with a set of servers 
per host
+    */
+   Map<String,Set<String>> computeFailuresByHost(TabletId tablet, 
SelectorParameters params) {
+     var attempts = params.getAttempts(tablet);
+     if (attempts.isEmpty()) {
+       return Map.of();
+     }
  
-     @Override
-     public int compare(PriorHostServers o1, PriorHostServers o2) {
-       return Integer.compare(o1.getPriorServers().size(), 
o2.getPriorServers().size());
+     Map<String,Set<String>> previousFailures = new HashMap<>();
+     for (var attempt : attempts) {
+       var hp = HostAndPort.fromString(attempt.getServer());
+       previousFailures.computeIfAbsent(hp.getHost(), h -> new 
HashSet<>()).add(attempt.getServer());
      }
  
+     return previousFailures;
    }
  
-   private static final class PriorHostServers {
-     private final String priorHost;
-     private final List<String> priorServers = new ArrayList<>();
- 
-     public PriorHostServers(String priorHost) {
-       this.priorHost = priorHost;
 -  List<String> removeFailedHost(String group, Map<String,Set<String>> 
prevFailures,
++  List<String> removeFailedHost(ResourceGroupId group, 
Map<String,Set<String>> prevFailures,
+       List<String> rendezvousHosts, ScanServersSnapshot serversSnapshot) {
+     if (prevFailures.isEmpty()) {
+       return rendezvousHosts;
      }
  
-     public String getPriorHost() {
-       return priorHost;
+     // filter out hosts where all servers failed
+     List<String> availableHost = new ArrayList<>(rendezvousHosts.size());
+     for (String host : rendezvousHosts) {
+       var hostsFailures = prevFailures.getOrDefault(host, Set.of());
+       if (hostsFailures.isEmpty()) {
+         availableHost.add(host);
+       } else {
+         var hostsServers = serversSnapshot.getServersForHost(group, host);
+         if (!hostsFailures.containsAll(hostsServers)) {
+           // this host has some servers that did not fail
+           availableHost.add(host);
+         }
+       }
      }
  
-     public List<String> getPriorServers() {
-       return priorServers;
+     return availableHost;
+   }
+ 
+   private List<String> removeFailedServers(Set<String> failedServers,
+       List<String> scanServersOnHost) {
+     if (failedServers.isEmpty()) {
+       return scanServersOnHost;
      }
+     return scanServersOnHost.stream().filter(s -> !failedServers.contains(s))
+         .collect(Collectors.toList());
    }
  
-   @Override
-   protected int selectServers(SelectorParameters params, Profile profile,
-       List<String> orderedScanServers, Map<TabletId,String> serversToUse) {
- 
-     // orderedScanServers is the set of ScanServers addresses (host:port)
-     // for the resource group designated for the profile being used for
-     // this scan. We want to group these scan servers by hostname and
-     // hash the tablet to the hostname, then randomly pick one of the
-     // scan servers in that group.
- 
-     final Map<String,List<String>> scanServerHosts = new HashMap<>();
-     for (final String address : orderedScanServers) {
-       final HostAndPort hp = HostAndPort.fromString(address);
-       scanServerHosts.computeIfAbsent(hp.getHost(), (k) -> {
-         return new ArrayList<String>();
-       }).add(address);
+   /**
+    * Finds the set of scan servers to use for a given attempt. If all servers 
have previously failed
+    * for all hosts this will return an empty list.
+    *
+    */
+   private List<String> getServersForHostAttempt(int hostAttempt, TabletId 
tablet, Profile profile,
+       RendezvousHasher rhasher, Map<String,Set<String>> prevFailures) {
+     final var snapshot = rhasher.getSnapshot();
+     final int numHostToUse =
 -        profile.getNumServers(hostAttempt, 
snapshot.getHostsForGroup(profile.group).size());
 -    List<String> rendezvousHosts =
 -        rhasher.rendezvous(HOST, profile.group, tablet, 
profile.getSalt(hostAttempt), numHostToUse);
 -    rendezvousHosts = removeFailedHost(profile.group, prevFailures, 
rendezvousHosts, snapshot);
++        profile.getNumServers(hostAttempt, 
snapshot.getHostsForGroup(profile.getGroupId()).size());
++    List<String> rendezvousHosts = rhasher.rendezvous(HOST, 
profile.getGroupId(), tablet,
++        profile.getSalt(hostAttempt), numHostToUse);
++    rendezvousHosts =
++        removeFailedHost(profile.getGroupId(), prevFailures, rendezvousHosts, 
snapshot);
+     if (rendezvousHosts.isEmpty()) {
+       return List.of();
      }
-     final List<String> hostIndex = new ArrayList<>(scanServerHosts.keySet());
 -    var hostToUse = 
rendezvousHosts.get(RANDOM.nextInt(rendezvousHosts.size()));
 -    List<String> hostServers = snapshot.getServersForHost(profile.group, 
hostToUse);
++    var hostToUse = 
rendezvousHosts.get(RANDOM.get().nextInt(rendezvousHosts.size()));
++    List<String> hostServers = 
snapshot.getServersForHost(profile.getGroupId(), hostToUse);
+     return removeFailedServers(prevFailures.getOrDefault(hostToUse, 
Set.of()), hostServers);
+   }
  
-     final int numberOfPreviousAttempts = params.getTablets().stream()
-         .mapToInt(tablet -> 
params.getAttempts(tablet).size()).max().orElse(0);
+   @Override
+   int selectServers(SelectorParameters params, Profile profile, 
RendezvousHasher rhasher,
+       Map<TabletId,String> serversToUse) {
  
-     final int numServersToUseInAttemptPlan =
-         profile.getNumServers(numberOfPreviousAttempts, 
orderedScanServers.size());
+     int maxHostAttempt = 0;
  
      for (TabletId tablet : params.getTablets()) {
- 
-       boolean scanServerFound = false;
-       if (numberOfPreviousAttempts > 0) {
-         // Sort the prior attempts by the number of scan servers tried in the 
list
-         // for each host. In theory the server at the top of the list either 
has
-         // scan servers remaining on that host, or has tried them all.
-         final Map<String,PriorHostServers> priorServers = new 
HashMap<>(numberOfPreviousAttempts);
-         params.getAttempts(tablet).forEach(ssa -> {
-           final String priorServerAddress = ssa.getServer();
-           final HostAndPort priorHP = 
HostAndPort.fromString(priorServerAddress);
-           priorServers.computeIfAbsent(priorHP.getHost(), (k) -> {
-             return new PriorHostServers(priorHP.getHost());
-           }).getPriorServers().add(priorServerAddress);
-         });
-         final List<PriorHostServers> priors = new 
ArrayList<>(priorServers.values());
-         // sort after populating
-         Collections.sort(priors, new PriorHostServersComparator());
- 
-         for (PriorHostServers phs : priors) {
-           final Set<String> scanServersOnPriorHost =
-               new HashSet<>(scanServerHosts.get(phs.getPriorHost()));
-           scanServersOnPriorHost.removeAll(phs.getPriorServers());
-           if (scanServersOnPriorHost.size() > 0) {
-             serversToUse.put(tablet, 
scanServersOnPriorHost.iterator().next());
-             scanServerFound = true;
-             break;
-           }
-         }
-         // If we get here, then we were unable to find a host with a 
ScanServer that
-         // we did not try. Remove the hosts from the hostIndex.
-         for (PriorHostServers phs : priors) {
-           hostIndex.remove(phs.getPriorHost());
+       Map<String,Set<String>> prevFailures = computeFailuresByHost(tablet, 
params);
+ 
+       for (int hostAttempt = 0; hostAttempt < 
profile.getAttemptPlans().size(); hostAttempt++) {
+         maxHostAttempt = Math.max(hostAttempt, maxHostAttempt);
+         List<String> scanServers =
+             getServersForHostAttempt(hostAttempt, tablet, profile, rhasher, 
prevFailures);
+         if (!scanServers.isEmpty()) {
 -          String serverToUse = 
scanServers.get(RANDOM.nextInt(scanServers.size()));
++          String serverToUse = 
scanServers.get(RANDOM.get().nextInt(scanServers.size()));
+           serversToUse.put(tablet, serverToUse);
+           break;
          }
        }
  
-       if (!scanServerFound) {
-         if (hostIndex.size() == 0) {
-           // We tried all servers
-           serversToUse.put(tablet, null);
-         } else {
-           final HashCode hashCode = hashTablet(tablet, 
profile.getSalt(numberOfPreviousAttempts));
-           final int serverIndex =
-               (Math.abs(hashCode.asInt()) + 
RANDOM.get().nextInt(numServersToUseInAttemptPlan))
-                   % hostIndex.size();
-           final String hostToUse = hostIndex.get(serverIndex);
-           final List<String> scanServersOnHost = 
scanServerHosts.get(hostToUse);
-           serversToUse.put(tablet, scanServersOnHost.get(0));
+       if (!serversToUse.containsKey(tablet) && !prevFailures.isEmpty()) {
+         // assuming no servers were found because all servers have previously 
failed, so in the
+         // case where all servers have previously failed, ignore previous 
failures and try any server
+         List<String> scanServers = 
getServersForHostAttempt(profile.getAttemptPlans().size() - 1,
+             tablet, profile, rhasher, Map.of());
+         if (!scanServers.isEmpty()) {
 -          String serverToUse = 
scanServers.get(RANDOM.nextInt(scanServers.size()));
++          String serverToUse = 
scanServers.get(RANDOM.get().nextInt(scanServers.size()));
+           serversToUse.put(tablet, serverToUse);
          }
        }
- 
      }
-     return numberOfPreviousAttempts;
  
+     return maxHostAttempt;
    }
- 
  }
diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index eecf68b905,b54659e33c..3b4c05840b
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@@ -18,11 -18,10 +18,11 @@@
   */
  package org.apache.accumulo.core.spi.scan;
  
- import static java.nio.charset.StandardCharsets.UTF_8;
+ import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.SERVER;
 +import static org.apache.accumulo.core.util.LazySingletons.GSON;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  
  import java.lang.reflect.Type;
 -import java.security.SecureRandom;
  import java.time.Duration;
  import java.util.ArrayList;
  import java.util.Collection;
@@@ -34,16 -32,15 +33,16 @@@ import java.util.Optional
  import java.util.Set;
  import java.util.concurrent.TimeUnit;
  import java.util.function.Supplier;
+ import java.util.stream.Collectors;
  
++import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.data.ResourceGroupId;
  import org.apache.accumulo.core.data.TabletId;
+ import org.apache.accumulo.core.util.Timer;
  
  import com.google.common.base.Preconditions;
- import com.google.common.base.Suppliers;
  import com.google.common.collect.Sets;
- import com.google.common.hash.HashCode;
- import com.google.common.hash.Hashing;
 -import com.google.gson.Gson;
  import com.google.gson.reflect.TypeToken;
  
  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@@ -249,8 -246,8 +247,8 @@@ public class ConfigurableScanServerSele
      boolean isDefault = false;
      int busyTimeoutMultiplier;
      String maxBusyTimeout;
--    String group = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
 -    String timeToWaitForScanServers = "0s";
++    String group = Constants.DEFAULT_RESOURCE_GROUP_NAME;
 +    String timeToWaitForScanServers = 100 * 365 + "d";
  
      transient boolean parsed = false;
      transient long parsedMaxBusyTimeout;
@@@ -294,6 -291,10 +292,14 @@@
        parse();
        return parsedTimeToWaitForScanServers;
      }
+ 
++    ResourceGroupId getGroupId() {
++      return ResourceGroupId.of(group);
++    }
++
+     List<AttemptPlan> getAttemptPlans() {
+       return attemptPlans;
+     }
    }
  
    private void parseProfiles(Map<String,String> options) {
@@@ -368,18 -389,22 +393,23 @@@
  
      Duration scanServerWaitTime = profile.getTimeToWaitForScanServers();
  
-     var finalProfile = profile;
-     if (orderedScanServers.isEmpty() && !scanServerWaitTime.isZero()) {
 -    if (rhasher.getSnapshot().getServersForGroup(profile.group).isEmpty()
++    if 
(rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()
+         && !scanServerWaitTime.isZero()) {
        // Wait for scan servers in the configured group to be present.
-       orderedScanServers = params.waitUntil(
-           () -> Optional.ofNullable(
-               
orderedScanServersSupplier.get().get(ResourceGroupId.of(finalProfile.group))),
-           scanServerWaitTime, "scan servers in group : " + 
profile.group).orElseThrow();
+       rhasher = params.waitUntil(() -> {
+         var r2 = getRendezvous();
 -        if (r2.getSnapshot().getServersForGroup(profile.group).isEmpty()) {
++        if 
(r2.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()) {
+           return Optional.empty();
+         } else {
+           return Optional.of(r2);
+         }
+       }, scanServerWaitTime, "scan servers in group : " + 
profile.group).orElseThrow();
        // at this point the list should be non empty unless there is a bug
-       Preconditions.checkState(!orderedScanServers.isEmpty());
 -      
Preconditions.checkState(!rhasher.getSnapshot().getServersForGroup(profile.group).isEmpty());
++      Preconditions
++          
.checkState(!rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty());
      }
  
-     if (orderedScanServers.isEmpty()) {
 -    if (rhasher.getSnapshot().getServersForGroup(profile.group).isEmpty()) {
++    if 
(rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()) {
        // there are no scan servers so fall back to the tablet server
        return new ScanServerSelections() {
          @Override
@@@ -429,18 -453,26 +458,26 @@@
      int attempts = params.getTablets().stream()
          .mapToInt(tablet -> 
params.getAttempts(tablet).size()).max().orElse(0);
  
-     int numServers = profile.getNumServers(attempts, 
orderedScanServers.size());
+     int numServers = profile.getNumServers(attempts,
 -        rhasher.getSnapshot().getServersForGroup(profile.group).size());
++        
rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).size());
      for (TabletId tablet : params.getTablets()) {
- 
-       String serverToUse = null;
- 
-       var hashCode = hashTablet(tablet, profile.getSalt(attempts));
- 
-       int serverIndex = (Math.abs(hashCode.asInt()) + 
RANDOM.get().nextInt(numServers))
-           % orderedScanServers.size();
- 
-       serverToUse = orderedScanServers.get(serverIndex);
- 
 -      List<String> rendezvousServers =
 -          rhasher.rendezvous(SERVER, profile.group, tablet, 
profile.getSalt(attempts), numServers);
++      List<String> rendezvousServers = rhasher.rendezvous(SERVER, 
profile.getGroupId(), tablet,
++          profile.getSalt(attempts), numServers);
+ 
+       var tabletAttempts = params.getAttempts(tablet);
+       if (!tabletAttempts.isEmpty()) {
+         // remove servers that failed in previous attempts
+         var attemptServers =
+             
tabletAttempts.stream().map(ScanServerAttempt::getServer).collect(Collectors.toSet());
+         var copy = rendezvousServers.stream().filter(server -> 
!attemptServers.contains(server))
+             .collect(Collectors.toList());
+         if (!copy.isEmpty()) {
+           // pick from the servers that did not previously fail
+           rendezvousServers = copy;
+         } // else all servers have failed, so just try any one of them again
+       }
+       // pick a random server from the set of rendezvous servers
 -      String serverToUse = 
rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size()));
++      String serverToUse = 
rendezvousServers.get(RANDOM.get().nextInt(rendezvousServers.size()));
        serversToUse.put(tablet, serverToUse);
      }
      return attempts;
diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/scan/RendezvousHasher.java
index 0000000000,cf4c753a40..b321ba2122
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/RendezvousHasher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/RendezvousHasher.java
@@@ -1,0 -1,334 +1,336 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.spi.scan;
+ 
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ 
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Comparator;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Objects;
+ import java.util.PriorityQueue;
+ import java.util.Set;
+ 
++import org.apache.accumulo.core.data.ResourceGroupId;
+ import org.apache.accumulo.core.data.TabletId;
 -import org.apache.accumulo.core.util.HostAndPort;
+ import org.apache.hadoop.io.Text;
+ 
+ import com.github.benmanes.caffeine.cache.Cache;
+ import com.github.benmanes.caffeine.cache.Caffeine;
+ import com.github.benmanes.caffeine.cache.Weigher;
+ import com.google.common.base.Preconditions;
+ import com.google.common.hash.HashCode;
+ import com.google.common.hash.Hashing;
++import com.google.common.net.HostAndPort;
+ 
+ /**
+  * Computes and caches the computations of rendezvous hashes to map tablets 
to scan servers.
+  */
+ // intentionally package private so that it is not part of SPI, this is just 
internal utility code
+ class RendezvousHasher {
+   private static class ServerHash implements Comparable<ServerHash> {
+     final HashCode hash;
+     final String server;
+ 
+     private ServerHash(HashCode hash, String server) {
+       this.hash = hash;
+       this.server = server;
+     }
+ 
+     @Override
+     public boolean equals(Object o) {
+       if (o instanceof ServerHash) {
+         return compareTo((ServerHash) o) == 0;
+       }
+       return false;
+     }
+ 
+     @Override
+     public int hashCode() {
+       return hash.asInt();
+     }
+ 
+     @Override
+     public int compareTo(ServerHash o) {
+       return Arrays.compare(hash.asBytes(), o.hash.asBytes());
+     }
+   }
+ 
+   private static class CacheKey {
+     final Mode mode;
 -    final String group;
++    final ResourceGroupId group;
+     final TabletId tablet;
+     final int desiredServers;
+     final String salt;
+ 
 -    private CacheKey(Mode mode, String group, TabletId tablet, int 
numServers, String salt) {
++    private CacheKey(Mode mode, ResourceGroupId group, TabletId tablet, int 
numServers,
++        String salt) {
+       this.mode = mode;
+       this.group = group;
+       this.tablet = tablet;
+       this.desiredServers = numServers;
+       this.salt = salt;
+     }
+ 
+     @Override
+     public int hashCode() {
+       return Objects.hash(mode, group, tablet, desiredServers, salt);
+     }
+ 
+     @Override
+     public boolean equals(Object o) {
+       if (o instanceof CacheKey) {
+         var ock = (CacheKey) o;
+         return mode == ock.mode && group.equals(ock.group) && 
tablet.equals(ock.tablet)
+             && desiredServers == ock.desiredServers && Objects.equals(salt, 
ock.salt);
+       }
+ 
+       return false;
+     }
+   }
+ 
+   private final ScanServersSnapshot snapshot;
+   private final Cache<CacheKey,List<String>> cache;
+ 
+   RendezvousHasher(ScanServersSnapshot snapshot, int maxCacheSize) {
+     Weigher<CacheKey,List<String>> weigher = (cacheKey, servers) -> {
+ 
+       // for the list of servers the length of the strings is not considered 
because the server
+       // strings may be pointed to by multiple cache entries, so only 
considering the pointer size
+       return 64 + cacheKey.tablet.getTable().canonical().length()
+           + weigh(cacheKey.tablet.getEndRow()) + 
weigh(cacheKey.tablet.getPrevEndRow())
+           + servers.size() * 8;
+     };
+     this.snapshot = snapshot;
+     cache = 
Caffeine.newBuilder().weigher(weigher).maximumWeight(maxCacheSize).build();
+   }
+ 
+   enum Mode {
+     SERVER, HOST
+   }
+ 
+   /**
+    * Finds "desiredServers" number of servers for a given tablet using 
rendezvous hashing. For a
+    * given tablet+salt the same subset of servers will always be computed 
from the same set of all
+    * servers. The set of servers computed for a tablet+salt is likely to be 
stable as the set of all
+    * servers changes, but may change sometimes. This allows many clients to 
usually choose the same
+    * set of scan servers for a given tablet even as the set of scan servers 
changes over time.
+    */
 -  List<String> rendezvous(Mode mode, String group, TabletId tablet, String 
salt,
++  List<String> rendezvous(Mode mode, ResourceGroupId group, TabletId tablet, 
String salt,
+       int desiredServers) {
+     if (mode == Mode.SERVER && desiredServers >= 
getSnapshot().getServersForGroup(group).size()) {
+       // no need to compute rendezvous hash because all servers are wanted
+       return getSnapshot().getServersForGroupAsList(group);
+     }
+ 
+     if (mode == Mode.HOST && desiredServers >= 
getSnapshot().getHostsForGroup(group).size()) {
+       // no need to compute rendezvous hash because all hosts are wanted
+       return getSnapshot().getHostsForGroup(group);
+     }
+ 
+     var cacheKey = new CacheKey(mode, group, tablet, desiredServers, salt);
+ 
+     // Because the computations are derived from an immutable snapshot, it is 
safe to cache the
+     // computations result as it should always be the same.
+     return cache.get(cacheKey, this::rendezvousHash);
+   }
+ 
+   /**
+    * @return the snapshot of scan servers used to create this hasher
+    */
+   ScanServersSnapshot getSnapshot() {
+     return snapshot;
+   }
+ 
+   private List<String> rendezvousHash(CacheKey ck) {
+     switch (ck.mode) {
+       case HOST:
+         return rendezvousHashHost(ck);
+       case SERVER:
+         return rendezvousHashServers(ck);
+       default:
+         throw new IllegalArgumentException(ck.mode.name());
+     }
+   }
+ 
+   private List<String> rendezvousHashServers(CacheKey ck) {
+     Preconditions.checkState(ck.desiredServers > 0, "%s", ck.desiredServers);
+ 
+     if (ck.desiredServers == 1) {
+       // optimization that does minimal work for finding a single server
+       ServerHash minServerHash = findMinHash(ck);
+ 
+       if (minServerHash == null) {
+         return List.of();
+       } else {
+         return List.of(minServerHash.server);
+       }
+     } else {
+       // Tracks the maximum minimums seen. Used to find the least M hashes 
w/o sorting all N hashes.
+       // This algorithm should be O(N*log2(M)) which is a bit better than 
O(N*log2(N)) when sorting
+       // everything.
+       var pq = new PriorityQueue<ServerHash>(ck.desiredServers, 
Comparator.reverseOrder());
+       Iterator<String> iter = 
getSnapshot().getServersForGroup(ck.group).iterator();
+ 
+       // populate the priority queue with the first hashes, these are the initial 
set of minimum hashes
+       while (iter.hasNext() && pq.size() < ck.desiredServers) {
+         var server = iter.next();
+         var hc = hash(ck.tablet, server, ck.salt);
+         pq.add(new ServerHash(hc, server));
+       }
+ 
+       // look through the rest of the hashes finding the rest of the minimum 
hashes
+       while (iter.hasNext()) {
+         var server = iter.next();
+         var hc = hash(ck.tablet, server, ck.salt);
+         var serverHash = new ServerHash(hc, server);
+         if (serverHash.compareTo(pq.peek()) < 0) {
+           // This hash is less than the maximum minimum seen so replace it.
+           pq.poll(); // Remove from priority queue before adding to avoid 
increasing its array size
+                      // and causing an internal reallocation
+           pq.add(serverHash);
+         }
+       }
+ 
+       List<String> found = new ArrayList<>(pq.size());
+       pq.forEach(serverHash -> found.add(serverHash.server));
+       // return an immutable list as it will be cached and potentially shared 
many times
+       return List.copyOf(found);
+     }
+   }
+ 
+   /**
+    * Finds a set of hosts using rendezvous hashing compensating for hosts that 
have different numbers
+    * of servers. When choosing a host the goal of this code is to select it 
based on its relative
+    * number of servers. So if TS is the total number of servers and a host 
has HS servers then the
+    * probability of that host being chosen for any tablet should be HS/TS. 
For example if there are
+    * 30 hosts and 295 servers running across all the hosts then for a host 
with 3 servers its
+    * probability of being chosen should be 3/295=1.02% across all tablets. If 
the host with 3
+    * servers were chosen uniformly from the 30 hosts then it would be chosen 
1/30=3.33% of the time
+    * instead of 1.02% which would cause its scan servers to have higher load 
than other scan
+    * servers.
+    *
+    * <p>
+    * The implementation for achieving this goal is to sort all servers (not 
hosts) by hash and then
+    * look for N unique hosts in the first part of the sorted list. If a host 
has fewer servers than
+    * other hosts then it will show up less frequently in the beginning of the 
list on average for
+    * different tablets. This roughly achieves the goal of this code, in 
limited testing it seems to
+    * skew slightly higher than the desired probability. This approach could 
benefit from more
+    * testing and looking at the statistics, suspect the greedy nature of the 
algorithm may throw
+    * things off a bit.
+    */
+   private List<String> rendezvousHashHost(CacheKey ck) {
+ 
+     Set<String> allServers = getSnapshot().getServersForGroup(ck.group);
+     if (ck.desiredServers == 1) {
+       // optimization for finding a single host, avoids sorting everything 
and allocates fewer
+       // objects
+       ServerHash minServerHash = findMinHash(ck);
+ 
+       if (minServerHash == null) {
+         return List.of();
+       } else {
+         return 
List.of(HostAndPort.fromString(minServerHash.server).getHost());
+       }
+     } else {
+       List<ServerHash> allHashes = new ArrayList<>(allServers.size());
+       for (String server : allServers) {
+         var hc = hash(ck.tablet, server, ck.salt);
+         var serverHash = new ServerHash(hc, server);
+         allHashes.add(serverHash);
+       }
+ 
+       // TODO can this be optimized to avoid sorting all servers? might be a 
good follow on
+       allHashes.sort(Comparator.naturalOrder());
+ 
+       Set<String> hostsSeen = new HashSet<>();
+       Iterator<ServerHash> iter = allHashes.iterator();
+       while (hostsSeen.size() < ck.desiredServers && iter.hasNext()) {
+         String host = HostAndPort.fromString(iter.next().server).getHost();
+         hostsSeen.add(host);
+       }
+ 
+       // return an immutable list as it will be cached and potentially shared 
many times
+       return List.copyOf(hostsSeen);
+     }
+   }
+ 
+   private ServerHash findMinHash(CacheKey ck) {
+     ServerHash minServerHash = null;
+     // This code only exists as a performance optimization to avoid sorting 
all data for the case of
+     // finding a single server. It could be shorter if using streams, but 
would probably be much
+     // slower which would defeat its reason for existence.
+     Iterator<String> iter = 
getSnapshot().getServersForGroup(ck.group).iterator();
+ 
+     // this initial check avoids doing a null check in the subsequent while 
loop
+     if (iter.hasNext()) {
+       String server = iter.next();
+       var hc = hash(ck.tablet, server, ck.salt);
+       minServerHash = new ServerHash(hc, server);
+     }
+ 
+     while (iter.hasNext()) {
+       String server = iter.next();
+       var hc = hash(ck.tablet, server, ck.salt);
+       var serverHash = new ServerHash(hc, server);
+       if (serverHash.compareTo(minServerHash) < 0) {
+         minServerHash = serverHash;
+       }
+     }
+ 
+     return minServerHash;
+   }
+ 
+   private static int weigh(Text t) {
+     if (t == null) {
+       return 8;
+     } else {
+       return 8 + t.getBytes().length;
+     }
+   }
+ 
+   private HashCode hash(TabletId tablet, String server, String salt) {
+     var hasher = Hashing.murmur3_128().newHasher();
+ 
+     if (tablet.getEndRow() != null) {
+       hasher.putBytes(tablet.getEndRow().getBytes(), 0, 
tablet.getEndRow().getLength());
+     } else {
+       hasher.putByte((byte) 5);
+     }
+ 
+     if (tablet.getPrevEndRow() != null) {
+       hasher.putBytes(tablet.getPrevEndRow().getBytes(), 0, 
tablet.getPrevEndRow().getLength());
+     } else {
+       hasher.putByte((byte) 7);
+     }
+ 
+     hasher.putString(tablet.getTable().canonical(), UTF_8);
+ 
+     hasher.putString(server, UTF_8);
+ 
+     if (salt != null && !salt.isEmpty()) {
+       hasher.putString(salt, UTF_8);
+     }
+ 
+     return hasher.hash();
+   }
+ }
diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServersSnapshot.java
index 0000000000,309d16aa37..61721f2e04
mode 000000,100644..100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServersSnapshot.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServersSnapshot.java
@@@ -1,0 -1,125 +1,127 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.spi.scan;
+ 
+ import static java.util.stream.Collectors.groupingBy;
+ import static java.util.stream.Collectors.mapping;
+ import static java.util.stream.Collectors.toUnmodifiableSet;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ 
 -import org.apache.accumulo.core.util.HostAndPort;
++import org.apache.accumulo.core.data.ResourceGroupId;
++
++import com.google.common.net.HostAndPort;
+ 
+ /**
+  * An immutable snapshot of information about scan servers
+  */
+ // This class is intentionally package private
+ class ScanServersSnapshot {
+ 
+   private static class PerHostInfo {
+     final List<String> hosts;
+     final Map<String,List<String>> serversByHost;
+ 
+     PerHostInfo(Collection<String> servers) {
+       this.serversByHost = new HashMap<>();
+ 
+       servers.forEach(server -> {
+         var hp = HostAndPort.fromString(server);
+         serversByHost.computeIfAbsent(hp.getHost(), h -> new 
ArrayList<>()).add(server);
+       });
+ 
+       hosts = List.copyOf(serversByHost.keySet());
+     }
+ 
+     List<String> getServersForHost(String host) {
+       return Collections.unmodifiableList(serversByHost.getOrDefault(host, 
List.of()));
+     }
+   }
+ 
+   // All code in this class assumes this map is an immutable snapshot and 
computes derivative data
+   // structures from its data as needed.
 -  private final Map<String,Set<String>> serversByGroup;
 -  private final Map<String,PerHostInfo> perHostInfo;
 -  private final Map<String,List<String>> serversByGroupList;
++  private final Map<ResourceGroupId,Set<String>> serversByGroup;
++  private final Map<ResourceGroupId,PerHostInfo> perHostInfo;
++  private final Map<ResourceGroupId,List<String>> serversByGroupList;
+ 
 -  private ScanServersSnapshot(Map<String,Set<String>> serversByGroup) {
++  private ScanServersSnapshot(Map<ResourceGroupId,Set<String>> 
serversByGroup) {
+     this.serversByGroup = serversByGroup;
+     this.perHostInfo = new ConcurrentHashMap<>();
+     this.serversByGroupList = new ConcurrentHashMap<>();
+   }
+ 
 -  Set<String> getServersForGroup(String group) {
++  Set<String> getServersForGroup(ResourceGroupId group) {
+     return serversByGroup.getOrDefault(group, Set.of());
+   }
+ 
 -  List<String> getServersForGroupAsList(String group) {
++  List<String> getServersForGroupAsList(ResourceGroupId group) {
+     // compute the list as needed since it is not always needed
+     return serversByGroupList.computeIfAbsent(group, g -> 
List.copyOf(getServersForGroup(g)));
+   }
+ 
 -  private PerHostInfo getPerHostInfo(String group) {
++  private PerHostInfo getPerHostInfo(ResourceGroupId group) {
+     return perHostInfo.computeIfAbsent(group, g -> new 
PerHostInfo(getServersForGroup(g)));
+   }
+ 
 -  List<String> getHostsForGroup(String group) {
++  List<String> getHostsForGroup(ResourceGroupId group) {
+     return getPerHostInfo(group).hosts;
+   }
+ 
 -  List<String> getServersForHost(String group, String host) {
++  List<String> getServersForHost(ResourceGroupId group, String host) {
+     return getPerHostInfo(group).getServersForHost(host);
+   }
+ 
+   @Override
+   public boolean equals(Object o) {
+     if (o instanceof ScanServersSnapshot) {
+       // only need to compare this field as the other fields are derived 
from the information in
+       // it.
+       return serversByGroup.equals(((ScanServersSnapshot) o).serversByGroup);
+     }
+     return false;
+   }
+ 
+   @Override
+   public int hashCode() {
+     return serversByGroup.hashCode();
+   }
+ 
+   static ScanServersSnapshot from(Collection<ScanServerInfo> scanServerInfos) 
{
+     Objects.requireNonNull(scanServerInfos);
+     // create an initial immutable snapshot of the information
+     var initialSnapshot =
+         
Map.copyOf(scanServerInfos.stream().collect(groupingBy(ScanServerInfo::getGroup,
+             mapping(ScanServerInfo::getAddress, toUnmodifiableSet()))));
+     return new ScanServersSnapshot(initialSnapshot);
+   }
+ 
 -  static ScanServersSnapshot from(Map<String,Set<String>> serversByGroup) {
 -    var copy = new HashMap<String,Set<String>>();
++  static ScanServersSnapshot from(Map<ResourceGroupId,Set<String>> 
serversByGroup) {
++    var copy = new HashMap<ResourceGroupId,Set<String>>();
+     serversByGroup.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
+     return new ScanServersSnapshot(Map.copyOf(copy));
+   }
+ }
diff --cc 
core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index b77bd026ba,9cfbffb490..b3dfc24fe5
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@@ -18,9 -18,9 +18,10 @@@
   */
  package org.apache.accumulo.core.spi.scan;
  
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
+ import static org.junit.jupiter.api.Assertions.assertNotEquals;
  import static org.junit.jupiter.api.Assertions.assertNull;
  import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
@@@ -545,4 -530,113 +546,112 @@@ public class ConfigurableScanServerSele
      assertTrue(Set.of("ss1:1", "ss2:2", 
"ss3:3").contains(actions.getScanServer(tabletId)));
      assertFalse(scanServers.get().isEmpty());
    }
+ 
+   @Test
+   public void testServerSetChanges() throws Exception {
+     String defaultProfile =
+         
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+             + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}";
+ 
+     var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', 
'"'));
+ 
+     ConfigurableScanServerSelector selector = new 
ConfigurableScanServerSelector();
+ 
+     var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+     // start off w/ one scan server
 -    AtomicReference<Map<String,String>> scanServers =
 -        new AtomicReference<>(Map.of("localhost:8000", dg));
++    AtomicReference<Map<String,ResourceGroupId>> scanServers =
++        new AtomicReference<>(Map.of("localhost:8000", 
ResourceGroupId.of(dg)));
+ 
+     selector.init(new InitParams(scanServers::get, opts));
+ 
+     for (int i = 0; i < 50; i++) {
+       var tabletId = nti("" + i, "m");
+       assertEquals("localhost:8000",
+           selector.selectServers(new 
SelectorParams(tabletId)).getScanServer(tabletId));
+       assertEquals("localhost:8000",
+           selector.selectServers(new 
SelectorParams(tabletId)).getScanServer(tabletId));
+     }
+ 
+     // add some new scan servers, the selector should eventually pick these 
up and start making
+     // different decisions
 -    HashMap<String,String> newServers = new HashMap<>();
++    HashMap<String,ResourceGroupId> newServers = new HashMap<>();
+     for (int i = 0; i < 30; i++) {
 -      newServers.put(String.format("localhost:%d", 8000 + i), dg);
++      newServers.put(String.format("localhost:%d", 8000 + i), 
ResourceGroupId.of(dg));
+     }
+     // add some servers in another RG, these should be ignored
+     for (int i = 0; i < 30; i++) {
 -      newServers.put(String.format("localhost:%d", 9000 + i), "other");
++      newServers.put(String.format("localhost:%d", 9000 + i), 
ResourceGroupId.of("other"));
+     }
+     scanServers.set(newServers);
+ 
+     // wait until the new scan servers are noticed
+     var tabletId = nti("1", "m");
+     while ("localhost:8000"
+         .equals(selector.selectServers(new 
SelectorParams(tabletId)).getScanServer(tabletId))) {
+       Thread.sleep(100);
+     }
+ 
+     // now should see tablets spread across the new scan servers
+     HashSet<String> allServersSeen = new HashSet<>();
+     for (int i = 0; i < 100; i++) {
+       tabletId = nti("" + i, "m");
+       HashSet<String> serversSeen = new HashSet<>();
+       for (int j = 0; j < 30; j++) {
+         var server = selector.selectServers(new 
SelectorParams(tabletId)).getScanServer(tabletId);
+         serversSeen.add(server);
+         allServersSeen.add(server);
+       }
+       // each tablet should spread across three servers
+       assertEquals(3, serversSeen.size());
+     }
+     // all tablets should spread across all scan servers
+     assertEquals(30, allServersSeen.size());
+   }
+ 
+   /**
+    * Test that previous failures are not used again unless all servers have 
failed
+    */
+   @Test
+   public void testPreviousFailures() {
 -    var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
 -    HashMap<String,String> servers = new HashMap<>();
++    HashMap<String,ResourceGroupId> servers = new HashMap<>();
+     for (int i = 0; i < 30; i++) {
 -      servers.put(String.format("localhost:%d", 8000 + i), dg);
++      servers.put(String.format("localhost:%d", 8000 + i), 
ResourceGroupId.DEFAULT);
+     }
+ 
+     String defaultProfile =
+         
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+             + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}";
+     var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', 
'"'));
+     ConfigurableScanServerSelector selector = new 
ConfigurableScanServerSelector();
+     selector.init(new InitParams(() -> servers, opts));
+ 
+     var tabletId = nti("1", "m");
+     var selected = selector.selectServers(new 
SelectorParams(tabletId)).getScanServer(tabletId);
+     assertTrue(servers.containsKey(selected));
+ 
+     // try selecting again, should pick a different server
+     var attempts = new HashSet<ScanServerAttempt>();
+     attempts.add(new TestScanServerAttempt(selected, 
ScanServerAttempt.Result.BUSY));
+     var selected2 =
+         selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, 
attempts), Map.of()))
+             .getScanServer(tabletId);
+     assertTrue(servers.containsKey(selected2));
+     assertNotEquals(selected, selected2);
+ 
+     // try selecting again, should pick a different server
+     attempts.add(new TestScanServerAttempt(selected2, 
ScanServerAttempt.Result.BUSY));
+     var selected3 =
+         selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, 
attempts), Map.of()))
+             .getScanServer(tabletId);
+     assertTrue(servers.containsKey(selected3));
+     assertNotEquals(selected, selected3);
+     assertNotEquals(selected2, selected3);
+ 
+     // try selecting again, at this point all servers failed so should try 
any one of them
+     attempts.add(new TestScanServerAttempt(selected3, 
ScanServerAttempt.Result.BUSY));
+     var selected4 =
+         selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, 
attempts), Map.of()))
+             .getScanServer(tabletId);
+     assertTrue(Set.of(selected, selected2, selected3).contains(selected4));
+   }
  }
diff --cc 
core/src/test/java/org/apache/accumulo/core/spi/scan/RendezvousHasherTest.java
index 0000000000,708cc26c7a..2ba334a698
mode 000000,100644..100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/RendezvousHasherTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/RendezvousHasherTest.java
@@@ -1,0 -1,196 +1,201 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.spi.scan;
+ 
+ import static java.util.stream.Collectors.toSet;
+ import static 
org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelectorTest.nti;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ 
++import org.apache.accumulo.core.data.ResourceGroupId;
+ import org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode;
 -import org.apache.accumulo.core.util.HostAndPort;
+ import org.junit.jupiter.api.Test;
+ 
++import com.google.common.net.HostAndPort;
++
+ public class RendezvousHasherTest {
++  private static final ResourceGroupId G1 = ResourceGroupId.of("g1");
++  private static final ResourceGroupId G2 = ResourceGroupId.of("g2");
++
+   @Test
+   public void testSameServerCountPerHost() {
 -    Map<String,Set<String>> serversByGroup = new HashMap<>();
++    Map<ResourceGroupId,Set<String>> serversByGroup = new HashMap<>();
+ 
+     Set<String> g1Servers = new HashSet<>();
+     for (int h = 0; h < 10; h++) {
+       String host = "g1h" + h;
+       for (int p = 1000; p < 1005; p++) {
+         g1Servers.add(host + ":" + p);
+       }
+     }
 -    serversByGroup.put("g1", g1Servers);
++    serversByGroup.put(G1, g1Servers);
+ 
+     Set<String> g2Servers = Set.of("g2h1:5000", "g2h1:5001", "g2h1:5002");
 -    serversByGroup.put("g2", g2Servers);
++    serversByGroup.put(G2, g2Servers);
+ 
+     ScanServersSnapshot snapshot = ScanServersSnapshot.from(serversByGroup);
+     RendezvousHasher hasher = new RendezvousHasher(snapshot, 1_000_000);
+ 
+     for (int i = 1; i < 5; i++) {
 -      var g2rh = hasher.rendezvous(Mode.SERVER, "g2", nti("1", "e"), "s1", i);
++      var g2rh = hasher.rendezvous(Mode.SERVER, G2, nti("1", "e"), "s1", i);
+       assertEquals(Math.min(i, 3), g2rh.size());
+       assertTrue(g2Servers.containsAll(g2rh));
+ 
 -      g2rh = hasher.rendezvous(Mode.HOST, "g2", nti("1", "e"), "s1", i);
++      g2rh = hasher.rendezvous(Mode.HOST, G2, nti("1", "e"), "s1", i);
+       assertEquals(List.of("g2h1"), g2rh);
+     }
+ 
+     int diffCount = 0;
+ 
+     Map<String,Integer> useCounts = new HashMap<>();
+ 
+     for (int t = 0; t < 200; t++) {
+       var tablet = nti("1", "" + t);
 -      var servers_s1_1 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", 
tablet, "s1", 1));
 -      var servers_s1_2 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", 
tablet, "s1", 2));
 -      var servers_s1_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", 
tablet, "s1", 3));
++      var servers_s1_1 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, 
tablet, "s1", 1));
++      var servers_s1_2 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, 
tablet, "s1", 2));
++      var servers_s1_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, 
tablet, "s1", 3));
+       // the same tablet and salt should hash to the same servers
 -      assertEquals(servers_s1_1, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
"g1", tablet, "s1", 1)));
 -      assertEquals(servers_s1_2, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
"g1", tablet, "s1", 2)));
 -      assertEquals(servers_s1_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
"g1", tablet, "s1", 3)));
++      assertEquals(servers_s1_1, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
G1, tablet, "s1", 1)));
++      assertEquals(servers_s1_2, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
G1, tablet, "s1", 2)));
++      assertEquals(servers_s1_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
G1, tablet, "s1", 3)));
+       assertEquals(1, servers_s1_1.size());
+       assertEquals(2, servers_s1_2.size());
+       assertEquals(3, servers_s1_3.size());
+       // should contain the smaller set of servers
+       assertTrue(servers_s1_2.containsAll(servers_s1_1));
+       assertTrue(servers_s1_3.containsAll(servers_s1_2));
+ 
+       assertTrue(g1Servers.containsAll(servers_s1_1));
+       assertTrue(g1Servers.containsAll(servers_s1_2));
+       assertTrue(g1Servers.containsAll(servers_s1_3));
+ 
+       for (var server : servers_s1_3) {
+         useCounts.merge(server, 1, Integer::sum);
+       }
+ 
 -      var hosts_s1_1 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 1);
 -      var hosts_s1_2 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 2);
 -      var hosts_s1_3 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 3);
++      var hosts_s1_1 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 1);
++      var hosts_s1_2 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 2);
++      var hosts_s1_3 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 3);
+       // the same tablet and salt should hash to the same hosts
 -      assertEquals(hosts_s1_1, hasher.rendezvous(Mode.HOST, "g1", tablet, 
"s1", 1));
 -      assertEquals(hosts_s1_2, hasher.rendezvous(Mode.HOST, "g1", tablet, 
"s1", 2));
 -      assertEquals(hosts_s1_3, hasher.rendezvous(Mode.HOST, "g1", tablet, 
"s1", 3));
++      assertEquals(hosts_s1_1, hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 
1));
++      assertEquals(hosts_s1_2, hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 
2));
++      assertEquals(hosts_s1_3, hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 
3));
+       assertEquals(1, hosts_s1_1.size());
+       assertEquals(2, hosts_s1_2.size());
+       assertEquals(3, hosts_s1_3.size());
+       assertTrue(hosts_s1_2.containsAll(hosts_s1_1));
+       assertTrue(hosts_s1_3.containsAll(hosts_s1_2));
+       assertTrue(hosts_s1_3.stream().noneMatch(h -> h.contains(":")));
+ 
+       // the way these two are computed the hosts within the servers should 
be a subset of the
+       // rendezvous hosts
+       assertEquals(hosts_s1_1.iterator().next(),
+           HostAndPort.fromString(servers_s1_1.iterator().next()).getHost());
+       assertTrue(Set.copyOf(hosts_s1_2).containsAll(
+           servers_s1_2.stream().map(s -> 
HostAndPort.fromString(s).getHost()).collect(toSet())));
+       assertTrue(Set.copyOf(hosts_s1_3).containsAll(
+           servers_s1_3.stream().map(s -> 
HostAndPort.fromString(s).getHost()).collect(toSet())));
+ 
+       // try a different salt, should usually result in different servers
 -      var servers_s2_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, "g1", 
tablet, "s2", 3));
 -      assertEquals(servers_s2_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
"g1", tablet, "s2", 3)));
++      var servers_s2_3 = Set.copyOf(hasher.rendezvous(Mode.SERVER, G1, 
tablet, "s2", 3));
++      assertEquals(servers_s2_3, Set.copyOf(hasher.rendezvous(Mode.SERVER, 
G1, tablet, "s2", 3)));
+       assertEquals(3, servers_s2_3.size());
+       assertTrue(g1Servers.containsAll(servers_s2_3));
+       if (!servers_s1_3.equals(servers_s2_3)) {
+         diffCount++;
+       }
+     }
+ 
+     var stats = useCounts.values().stream().mapToInt(i -> 
i).summaryStatistics();
+     assertEquals(50, stats.getCount());
+     assertEquals(12, stats.getAverage());
+     assertEquals(7, stats.getMin());
+     assertEquals(18, stats.getMax());
+ 
+     assertEquals(200, diffCount);
+   }
+ 
+   @Test
+   public void testDifferentServerCountPerHost() {
 -    Map<String,Set<String>> serversByGroup = new HashMap<>();
++    Map<ResourceGroupId,Set<String>> serversByGroup = new HashMap<>();
+ 
+     Set<String> g1Servers = new HashSet<>();
+     // 10 host with 10 servers
+     for (int h = 0; h < 10; h++) {
+       String host = "g1h" + h;
+       for (int p = 1000; p < 1010; p++) {
+         g1Servers.add(host + ":" + p);
+       }
+     }
+     // four host w/ two servers
+     g1Servers.add("gh1a:1000");
+     g1Servers.add("gh1a:1001");
+     g1Servers.add("gh1b:1000");
+     g1Servers.add("gh1b:1001");
+     g1Servers.add("gh1c:1000");
+     g1Servers.add("gh1c:1001");
+     g1Servers.add("gh1d:1000");
+     g1Servers.add("gh1d:1001");
+ 
 -    serversByGroup.put("g1", g1Servers);
++    serversByGroup.put(G1, g1Servers);
+ 
+     ScanServersSnapshot snapshot = ScanServersSnapshot.from(serversByGroup);
+     RendezvousHasher hasher = new RendezvousHasher(snapshot, 1_000_000);
+ 
+     Map<String,Integer> useCounts = new HashMap<>();
+ 
+     for (int t = 0; t < 10_000; t++) {
+       var tablet = nti("1", "" + t);
 -      var hosts_s1_3 = hasher.rendezvous(Mode.HOST, "g1", tablet, "s1", 3);
++      var hosts_s1_3 = hasher.rendezvous(Mode.HOST, G1, tablet, "s1", 3);
+       for (var host : hosts_s1_3) {
+         useCounts.merge(host, 1, Integer::sum);
+       }
+     }
+ 
+     // Verify the hosts with only 2 servers get picked a lot less than the 
hosts with 10 servers.
+     // There are 108 total servers, ideally the 4 hosts with only 2 servers 
would get
+     // 8.0/108.0=7.4% of the total usage and not 4/14=29% based on being 4 of 
14 hosts.
+     var stats = useCounts.entrySet().stream().filter(e -> 
!e.getKey().matches("gh1[abcd]"))
+         .mapToInt(Map.Entry::getValue).summaryStatistics();
+ 
+     // the 10 host w/ 10 servers should get around 30,000 * 100.0 / 108.0 / 
10 = 2777 usage each
+     assertTrue(stats.getAverage() > 2777 - 100 && stats.getAverage() < 2777 + 
100);
+     assertTrue(stats.getMin() > 2777 - 200);
+     assertTrue(stats.getMax() < 2777 + 200);
+ 
+     // the 4 hosts w/ 2 servers should get around 30,000 * 8/108/4 = 555 usage 
each. Experimenting w/
+     // different scenarios, anecdotally this function seems to skew higher 
than the desired average.
+     // Suspect this is an artifact of the greedy algorithm used to implement 
host based rendezvous
+     // hashing.
+     var stats2 = useCounts.entrySet().stream().filter(e -> 
e.getKey().matches("gh1[abcd]"))
+         .mapToInt(Map.Entry::getValue).summaryStatistics();
+     assertTrue(stats2.getAverage() > 555 - 100 && stats2.getAverage() < 555 + 
100);
+     assertTrue(stats2.getMin() > 555 - 200);
+     assertTrue(stats2.getMax() < 555 + 200);
+ 
+     // the two groups of hosts should get all usage
+     assertEquals(30_000, stats2.getSum() + stats.getSum());
+   }
+ }

Reply via email to