This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 586b210e84 Sleeps after eventual scan RPC failures (#6315)
586b210e84 is described below

commit 586b210e8437a842e8150d124c557c55163e4444
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 15 09:04:29 2026 -0700

    Sleeps after eventual scan RPC failures (#6315)
    
    Fixes two problems. First, the client-side scan server selector plugins
    were not computing a sleep time based on observed errors. This could
    cause aggressive retries for scans. Modified the provided plugins to
    compute this delay.
    
    Second, the batch scanner code was not properly collecting the
    information the client-side scan server plugin needs to know whether
    errors happened. It was not collecting information for all tablets,
    only for a somewhat random subset of them (the first tablet binned to
    each scan server). Corrected it to collect for all tablets. Also made
    the batch scanner code properly report failed tablets to the
    client-side scan server plugin.
    
    Both changes together fix #6313. Manually tested the batch scanner code
    changes by adding logs and running a test to ensure the correct sleeps
    were happening. Also manually tested the scanner to ensure it was
    working correctly and sleeping as expected after errors.
---
 core/pom.xml                                       |  5 ++
 .../core/clientImpl/ScanServerAttemptImpl.java     |  4 +
 .../core/clientImpl/ScanServerAttemptsImpl.java    | 22 ++++-
 .../TabletServerBatchReaderIterator.java           | 29 ++++---
 .../scan/ConfigurableScanServerHostSelector.java   | 43 ++++++++--
 .../spi/scan/ConfigurableScanServerSelector.java   | 75 +++++++++++------
 .../scan/ConfigurableScanServerSelectorTest.java   | 97 ++++++++++++++++++++--
 7 files changed, 224 insertions(+), 51 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 02ecf146ba..bf5672360f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -174,6 +174,11 @@
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.lz4</groupId>
       <artifactId>lz4-java</artifactId>
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
index bda2f26880..0d14d86a11 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
@@ -42,4 +42,8 @@ class ScanServerAttemptImpl implements ScanServerAttempt {
     return result;
   }
 
+  @Override
+  public String toString() {
+    return "server:" + server + " result:" + result;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
index 48ff72b532..4409ebc00f 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
@@ -26,8 +26,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +50,7 @@ public class ScanServerAttemptsImpl {
 
   ScanServerAttemptReporter createReporter(String server, TabletId tablet) {
     return result -> {
-      LOG.trace("Received result: {}", result);
+      LOG.trace("Received result: {} {} {}", result, tablet, server);
       synchronized (attempts) {
         attempts.computeIfAbsent(tablet, k -> new ArrayList<>())
             .add(new ScanServerAttemptImpl(result, server));
@@ -55,6 +58,23 @@ public class ScanServerAttemptsImpl {
     };
   }
 
+  public interface BatchAttemptReporter {
+    void report(Set<KeyExtent> extents, ScanServerAttempt.Result result);
+  }
+
+  BatchAttemptReporter createReporter(String server) {
+    return (tablets, result) -> {
+      LOG.trace("Received result: {} {} {}", result, tablets, server);
+      synchronized (attempts) {
+        var attempt = new ScanServerAttemptImpl(result, server);
+        tablets.forEach(extent -> {
+          var tablet = new TabletIdImpl(extent);
+          attempts.computeIfAbsent(tablet, k -> new 
ArrayList<>()).add(attempt);
+        });
+      }
+    };
+  }
+
   /**
    * Creates and returns a snapshot of {@link ScanServerAttempt} objects that 
were added before this
    * call
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index d05c483335..9c042c961a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -52,6 +52,7 @@ import 
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TimedOutException;
+import 
org.apache.accumulo.core.clientImpl.ScanServerAttemptsImpl.BatchAttemptReporter;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
@@ -365,12 +366,12 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
     private final List<Column> columns;
     private int semaphoreSize;
     private final long busyTimeout;
-    private final ScanServerAttemptReporter reporter;
+    private final BatchAttemptReporter reporter;
     private final Duration scanServerSelectorDelay;
 
     QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
         Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, 
List<Column> columns,
-        long busyTimeout, ScanServerAttemptReporter reporter, Duration 
scanServerSelectorDelay) {
+        long busyTimeout, BatchAttemptReporter reporter, Duration 
scanServerSelectorDelay) {
       this.tsLocation = tsLocation;
       this.tabletsRanges = tabletsRanges;
       this.receiver = receiver;
@@ -401,6 +402,10 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
             options, authorizations, timeoutTracker, busyTimeout);
 
         if (!tsFailures.isEmpty()) {
+          // On scan servers routine failures that occur on tservers, like not 
serving tablet or a
+          // tablet closing, are not expected. So for scan server record any 
failures seen as an
+          // error.
+          reporter.report(tsFailures.keySet(), ScanServerAttempt.Result.ERROR);
           locator.invalidateCache(tsFailures.keySet());
           synchronized (failures) {
             failures.putAll(tsFailures);
@@ -422,7 +427,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
         if (e.getCause() instanceof ScanServerBusyException) {
           result = ScanServerAttempt.Result.BUSY;
         }
-        reporter.report(result);
+        reporter.report(tabletsRanges.keySet(), result);
       } catch (AccumuloSecurityException e) {
         e.setTableInfo(getTableInfo());
         log.debug("AccumuloSecurityException thrown", e);
@@ -496,7 +501,6 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
         }
       }
     }
-
   }
 
   private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
@@ -506,7 +510,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
 
     long busyTimeout = 0;
     Duration scanServerSelectorDelay = null;
-    Map<String,ScanServerAttemptReporter> reporters = Map.of();
+    Map<String,BatchAttemptReporter> reporters = Map.of();
 
     if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
       var scanServerData = rebinToScanServers(binnedRanges, startTime);
@@ -564,7 +568,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
       final Map<KeyExtent,List<Range>> tabletsRanges = 
binnedRanges.get(tsLocation);
       if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 
1) {
         QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, 
failures, receiver, columns,
-            busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), 
scanServerSelectorDelay);
+            busyTimeout, reporters.getOrDefault(tsLocation, (t, r) -> {}), 
scanServerSelectorDelay);
         queryTasks.add(queryTask);
       } else {
         HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
@@ -573,15 +577,16 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
           if (tabletSubset.size() >= maxTabletsPerRequest) {
             QueryTask queryTask =
                 new QueryTask(tsLocation, tabletSubset, failures, receiver, 
columns, busyTimeout,
-                    reporters.getOrDefault(tsLocation, r -> {}), 
scanServerSelectorDelay);
+                    reporters.getOrDefault(tsLocation, (t, r) -> {}), 
scanServerSelectorDelay);
             queryTasks.add(queryTask);
             tabletSubset = new HashMap<>();
           }
         }
 
         if (!tabletSubset.isEmpty()) {
-          QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, 
failures, receiver, columns,
-              busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), 
scanServerSelectorDelay);
+          QueryTask queryTask =
+              new QueryTask(tsLocation, tabletSubset, failures, receiver, 
columns, busyTimeout,
+                  reporters.getOrDefault(tsLocation, (t, r) -> {}), 
scanServerSelectorDelay);
           queryTasks.add(queryTask);
         }
       }
@@ -599,7 +604,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
   private static class ScanServerData {
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
     ScanServerSelections actions;
-    Map<String,ScanServerAttemptReporter> reporters;
+    Map<String,BatchAttemptReporter> reporters;
   }
 
   private ScanServerData 
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
@@ -652,7 +657,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
 
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
 
-    Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
+    Map<String,BatchAttemptReporter> reporters = new HashMap<>();
 
     for (TabletIdImpl tabletId : tabletIds) {
       KeyExtent extent = tabletId.toKeyExtent();
@@ -672,7 +677,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
       rangeMap.put(extent, ranges);
 
       var server = serverToUse;
-      reporters.computeIfAbsent(serverToUse, k -> 
scanAttempts.createReporter(server, tabletId));
+      reporters.computeIfAbsent(serverToUse, k -> 
scanAttempts.createReporter(server));
     }
 
     ScanServerData ssd = new ScanServerData();
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
index fef92c8043..a492d80af5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
@@ -20,7 +20,9 @@ package org.apache.accumulo.core.spi.scan;
 
 import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST;
 
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -82,8 +84,7 @@ public class ConfigurableScanServerHostSelector extends 
ConfigurableScanServerSe
   /**
    * @return map of previous failure keyed on host name with a set of servers 
per host
    */
-  Map<String,Set<String>> computeFailuresByHost(TabletId tablet, 
SelectorParameters params) {
-    var attempts = params.getAttempts(tablet);
+  Map<String,Set<String>> computeFailuresByHost(Collection<? extends 
ScanServerAttempt> attempts) {
     if (attempts.isEmpty()) {
       return Map.of();
     }
@@ -152,13 +153,25 @@ public class ConfigurableScanServerHostSelector extends 
ConfigurableScanServerSe
   }
 
   @Override
-  int selectServers(SelectorParameters params, Profile profile, 
RendezvousHasher rhasher,
-      Map<TabletId,String> serversToUse) {
+  ScanServerSelections selectServers(ScanServerSelector.SelectorParameters 
params, Profile profile,
+      RendezvousHasher rhasher) {
 
     int maxHostAttempt = 0;
+    int maxTabletErrors = 0;
+
+    HashMap<TabletId,String> serversToUse = new HashMap<>();
 
     for (TabletId tablet : params.getTablets()) {
-      Map<String,Set<String>> prevFailures = computeFailuresByHost(tablet, 
params);
+      var attempts = params.getAttempts(tablet);
+      Map<String,Set<String>> prevFailures = computeFailuresByHost(attempts);
+
+      int tabletErrors = 0;
+      for (var attempt : attempts) {
+        if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
+          tabletErrors++;
+        }
+      }
+      maxTabletErrors = Math.max(tabletErrors, maxTabletErrors);
 
       for (int hostAttempt = 0; hostAttempt < 
profile.getAttemptPlans().size(); hostAttempt++) {
         maxHostAttempt = Math.max(hostAttempt, maxHostAttempt);
@@ -183,6 +196,24 @@ public class ConfigurableScanServerHostSelector extends 
ConfigurableScanServerSe
       }
     }
 
-    return maxHostAttempt;
+    Duration busyTO = 
Duration.ofMillis(profile.getBusyTimeout(maxHostAttempt));
+    Duration delay = computeDelay(maxTabletErrors);
+
+    return new ScanServerSelections() {
+      @Override
+      public String getScanServer(TabletId tabletId) {
+        return serversToUse.get(tabletId);
+      }
+
+      @Override
+      public Duration getDelay() {
+        return delay;
+      }
+
+      @Override
+      public Duration getBusyTimeout() {
+        return busyTO;
+      }
+    };
   }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index b54659e33c..64123308c5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@ -26,6 +26,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -424,34 +425,27 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
       };
     }
 
-    Map<TabletId,String> serversToUse = new HashMap<>();
+    return selectServers(params, profile, rhasher);
+  }
 
-    int maxAttempts = selectServers(params, profile, rhasher, serversToUse);
+  protected Duration computeDelay(int errorAttempts) {
+    if (errorAttempts == 0) {
+      return Duration.ZERO;
+    } else {
+      return Duration.ofMillis((long) Math.min(30_000, 100 * Math.pow(2, 
(errorAttempts - 1))));
+    }
+  }
 
-    Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts));
+  ScanServerSelections selectServers(ScanServerSelector.SelectorParameters 
params, Profile profile,
+      RendezvousHasher rhasher) {
+    int attempts = 0;
+    int errorAttempts = 0;
 
-    return new ScanServerSelections() {
-      @Override
-      public String getScanServer(TabletId tabletId) {
-        return serversToUse.get(tabletId);
-      }
+    HashMap<TabletId,String> serversToUse = new HashMap<>();
 
-      @Override
-      public Duration getDelay() {
-        return Duration.ZERO;
-      }
-
-      @Override
-      public Duration getBusyTimeout() {
-        return busyTO;
-      }
-    };
-  }
-
-  int selectServers(ScanServerSelector.SelectorParameters params, Profile 
profile,
-      RendezvousHasher rhasher, Map<TabletId,String> serversToUse) {
-    int attempts = params.getTablets().stream()
-        .mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0);
+    for (TabletId tablet : params.getTablets()) {
+      attempts = Math.max(attempts, params.getAttempts(tablet).size());
+    }
 
     int numServers = profile.getNumServers(attempts,
         rhasher.getSnapshot().getServersForGroup(profile.group).size());
@@ -461,9 +455,17 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
 
       var tabletAttempts = params.getAttempts(tablet);
       if (!tabletAttempts.isEmpty()) {
+        HashSet<String> attemptServers = new HashSet<>();
+        int errorCount = 0;
+        for (var attempt : tabletAttempts) {
+          attemptServers.add(attempt.getServer());
+          if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
+            errorCount++;
+          }
+        }
+        errorAttempts = Math.max(errorCount, errorAttempts);
         // remove servers that failed in previous attempts
-        var attemptServers =
-            
tabletAttempts.stream().map(ScanServerAttempt::getServer).collect(Collectors.toSet());
+
         var copy = rendezvousServers.stream().filter(server -> 
!attemptServers.contains(server))
             .collect(Collectors.toList());
         if (!copy.isEmpty()) {
@@ -475,6 +477,25 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
       String serverToUse = 
rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size()));
       serversToUse.put(tablet, serverToUse);
     }
-    return attempts;
+
+    Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts));
+    Duration delay = computeDelay(errorAttempts);
+
+    return new ScanServerSelections() {
+      @Override
+      public String getScanServer(TabletId tabletId) {
+        return serversToUse.get(tabletId);
+      }
+
+      @Override
+      public Duration getDelay() {
+        return delay;
+      }
+
+      @Override
+      public Duration getBusyTimeout() {
+        return busyTO;
+      }
+    };
   }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index 9cfbffb490..d815421070 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@ -27,9 +27,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.security.SecureRandom;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -46,6 +48,8 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 public class ConfigurableScanServerSelectorTest {
 
@@ -123,6 +127,13 @@ public class ConfigurableScanServerSelectorTest {
       this.hints = hints;
     }
 
+    SelectorParams(Set<TabletId> tablets,
+        Map<TabletId,Collection<? extends ScanServerAttempt>> attempts, 
Map<String,String> hints) {
+      this.tablets = Set.copyOf(tablets);
+      this.attempts = attempts;
+      this.hints = hints;
+    }
+
     @Override
     public Collection<TabletId> getTablets() {
       return tablets;
@@ -595,8 +606,9 @@ public class ConfigurableScanServerSelectorTest {
   /**
    * Test that previous failures are not used again unless all servers have 
failed
    */
-  @Test
-  public void testPreviousFailures() {
+  @ParameterizedTest
+  @EnumSource
+  public void testPreviousFailures(ScanServerAttempt.Result result) {
     var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
     HashMap<String,String> servers = new HashMap<>();
     for (int i = 0; i < 30; i++) {
@@ -616,7 +628,7 @@ public class ConfigurableScanServerSelectorTest {
 
     // try selecting again, should pick a different server
     var attempts = new HashSet<ScanServerAttempt>();
-    attempts.add(new TestScanServerAttempt(selected, 
ScanServerAttempt.Result.BUSY));
+    attempts.add(new TestScanServerAttempt(selected, result));
     var selected2 =
         selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, 
attempts), Map.of()))
             .getScanServer(tabletId);
@@ -624,7 +636,7 @@ public class ConfigurableScanServerSelectorTest {
     assertNotEquals(selected, selected2);
 
     // try selecting again, should pick a different server
-    attempts.add(new TestScanServerAttempt(selected2, 
ScanServerAttempt.Result.BUSY));
+    attempts.add(new TestScanServerAttempt(selected2, result));
     var selected3 =
         selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, 
attempts), Map.of()))
             .getScanServer(tabletId);
@@ -633,10 +645,85 @@ public class ConfigurableScanServerSelectorTest {
     assertNotEquals(selected2, selected3);
 
     // try selecting again, at this point all servers failed so should try any 
one of them
-    attempts.add(new TestScanServerAttempt(selected3, 
ScanServerAttempt.Result.BUSY));
+    attempts.add(new TestScanServerAttempt(selected3, result));
     var selected4 =
         selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId, 
attempts), Map.of()))
             .getScanServer(tabletId);
     assertTrue(Set.of(selected, selected2, selected3).contains(selected4));
   }
+
+  @Test
+  public void testErrors() {
+    var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+    HashMap<String,String> servers = new HashMap<>();
+    for (int i = 0; i < 30; i++) {
+      servers.put(String.format("localhost:%d", 8000 + i), dg);
+    }
+
+    String defaultProfile =
+        
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+            + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}";
+    var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', 
'"'));
+    ConfigurableScanServerSelector selector = new 
ConfigurableScanServerSelector();
+    selector.init(new InitParams(() -> servers, opts));
+
+    var tablet1 = nti("1", "m");
+    var tablet2 = nti("1", "x");
+    var selections =
+        selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
Map.of(), Map.of()));
+    // no errors so there should be no delay
+    assertEquals(Duration.ZERO, selections.getDelay());
+    var selected1 = selections.getScanServer(tablet1);
+    var selected2 = selections.getScanServer(tablet2);
+    assertTrue(servers.containsKey(selected1));
+    assertTrue(servers.containsKey(selected2));
+
+    Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new 
HashMap<>();
+    List<ScanServerAttempt> tablet1Attempts = new ArrayList<>();
+    attempts.put(tablet1, tablet1Attempts);
+    List<ScanServerAttempt> tablet2Attempts = new ArrayList<>();
+    attempts.put(tablet2, tablet2Attempts);
+
+    tablet1Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.BUSY));
+    selections =
+        selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+    // no errors, only a busy timeout, so there should be no delay
+    assertEquals(Duration.ZERO, selections.getDelay());
+
+    // add a single error to single tablet, should cause a delay
+    tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+    selections =
+        selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+    assertEquals(Duration.ofMillis(100), selections.getDelay());
+
+    // add a single error to another tablet, should not increase the delay
+    tablet1Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+    selections =
+        selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+    assertEquals(Duration.ofMillis(100), selections.getDelay());
+
+    // make tablet 1 have two errors, should cause a 200 ms delay
+    tablet1Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+    selections =
+        selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+    assertEquals(Duration.ofMillis(200), selections.getDelay());
+
+    // make tablet 2 have three errors, should cause a 400ms delay
+    tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+    tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+    selections =
+        selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+    assertEquals(Duration.ofMillis(400), selections.getDelay());
+
+    // keep adding errors until max is reached
+    int expected = 400;
+    while (expected < 30_000) {
+      expected *= 2;
+      expected = Math.min(30_000, expected);
+      tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+      selections =
+          selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+      assertEquals(Duration.ofMillis(expected), selections.getDelay());
+    }
+  }
 }

Reply via email to