This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 586b210e84 Sleeps after eventual scan RPC failures (#6315)
586b210e84 is described below
commit 586b210e8437a842e8150d124c557c55163e4444
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 15 09:04:29 2026 -0700
Sleeps after eventual scan RPC failures (#6315)
Fixes two problems. First, the scan server client-side plugins were not
computing a sleep time based on observed errors. This could cause
aggressive retries of scans. Modified the provided plugins to compute
this sleep time.
Second, the batch scanner code was not properly collecting the
information needed by the scan server client-side plugin to know if errors
happened. The batch scanner code was not collecting information for all
tablets, just a somewhat random subset of them. Corrected it to collect
information for all tablets. Also made the batch scanner code properly
report failed tablets to the client-side scan server plugin.
Both changes together fix #6313. Manually tested the batch scanner code
changes by adding logs and running tests to ensure the correct sleeps
were happening. Also manually tested the scanner to ensure it was
working correctly and sleeping as expected after errors.
---
core/pom.xml | 5 ++
.../core/clientImpl/ScanServerAttemptImpl.java | 4 +
.../core/clientImpl/ScanServerAttemptsImpl.java | 22 ++++-
.../TabletServerBatchReaderIterator.java | 29 ++++---
.../scan/ConfigurableScanServerHostSelector.java | 43 ++++++++--
.../spi/scan/ConfigurableScanServerSelector.java | 75 +++++++++++------
.../scan/ConfigurableScanServerSelectorTest.java | 97 ++++++++++++++++++++--
7 files changed, 224 insertions(+), 51 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 02ecf146ba..bf5672360f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -174,6 +174,11 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
index bda2f26880..0d14d86a11 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
@@ -42,4 +42,8 @@ class ScanServerAttemptImpl implements ScanServerAttempt {
return result;
}
+  /** Returns a human-readable form listing the server and the result of this attempt. */
+  @Override
+  public String toString() {
+    return String.format("server:%s result:%s", server, result);
+  }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
index 48ff72b532..4409ebc00f 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
@@ -26,8 +26,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +50,7 @@ public class ScanServerAttemptsImpl {
ScanServerAttemptReporter createReporter(String server, TabletId tablet) {
return result -> {
- LOG.trace("Received result: {}", result);
+ LOG.trace("Received result: {} {} {}", result, tablet, server);
synchronized (attempts) {
attempts.computeIfAbsent(tablet, k -> new ArrayList<>())
.add(new ScanServerAttemptImpl(result, server));
@@ -55,6 +58,23 @@ public class ScanServerAttemptsImpl {
};
}
+  /**
+   * Callback that records the outcome of one scan attempt for a whole set of tablets at once.
+   * Implementations in this code base are supplied as lambdas.
+   */
+  @FunctionalInterface
+  public interface BatchAttemptReporter {
+    /**
+     * Records {@code result} for every tablet in {@code extents}.
+     *
+     * @param extents the tablets the attempt covered
+     * @param result the outcome to record for each of those tablets
+     */
+    void report(Set<KeyExtent> extents, ScanServerAttempt.Result result);
+  }
+
+  /**
+   * Creates a reporter bound to {@code server}. Each report adds one attempt object, shared by
+   * all reported tablets, to the attempt list of every tablet in the reported set.
+   *
+   * @param server the server that the reported attempts were made against
+   * @return a reporter that updates {@code attempts} while synchronized on it
+   */
+  BatchAttemptReporter createReporter(String server) {
+    return (extents, outcome) -> {
+      LOG.trace("Received result: {} {} {}", outcome, extents, server);
+      synchronized (attempts) {
+        // a single attempt instance is shared by all of the tablets in this report
+        var sharedAttempt = new ScanServerAttemptImpl(outcome, server);
+        for (var extent : extents) {
+          var tabletId = new TabletIdImpl(extent);
+          attempts.computeIfAbsent(tabletId, unused -> new ArrayList<>()).add(sharedAttempt);
+        }
+      }
+    };
+  }
+
/**
* Creates and returns a snapshot of {@link ScanServerAttempt} objects that
were added before this
* call
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index d05c483335..9c042c961a 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -52,6 +52,7 @@ import
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TimedOutException;
+import
org.apache.accumulo.core.clientImpl.ScanServerAttemptsImpl.BatchAttemptReporter;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
@@ -365,12 +366,12 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
private final List<Column> columns;
private int semaphoreSize;
private final long busyTimeout;
- private final ScanServerAttemptReporter reporter;
+ private final BatchAttemptReporter reporter;
private final Duration scanServerSelectorDelay;
QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
Map<KeyExtent,List<Range>> failures, ResultReceiver receiver,
List<Column> columns,
- long busyTimeout, ScanServerAttemptReporter reporter, Duration
scanServerSelectorDelay) {
+ long busyTimeout, BatchAttemptReporter reporter, Duration
scanServerSelectorDelay) {
this.tsLocation = tsLocation;
this.tabletsRanges = tabletsRanges;
this.receiver = receiver;
@@ -401,6 +402,10 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
options, authorizations, timeoutTracker, busyTimeout);
if (!tsFailures.isEmpty()) {
+ // On scan servers routine failures that occur on tservers, like not
serving tablet or a
+ // tablet closing, are not expected. So for scan server record any
failures seen as an
+ // error.
+ reporter.report(tsFailures.keySet(), ScanServerAttempt.Result.ERROR);
locator.invalidateCache(tsFailures.keySet());
synchronized (failures) {
failures.putAll(tsFailures);
@@ -422,7 +427,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
if (e.getCause() instanceof ScanServerBusyException) {
result = ScanServerAttempt.Result.BUSY;
}
- reporter.report(result);
+ reporter.report(tabletsRanges.keySet(), result);
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug("AccumuloSecurityException thrown", e);
@@ -496,7 +501,6 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
}
}
}
-
}
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
@@ -506,7 +510,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
long busyTimeout = 0;
Duration scanServerSelectorDelay = null;
- Map<String,ScanServerAttemptReporter> reporters = Map.of();
+ Map<String,BatchAttemptReporter> reporters = Map.of();
if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
var scanServerData = rebinToScanServers(binnedRanges, startTime);
@@ -564,7 +568,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
final Map<KeyExtent,List<Range>> tabletsRanges =
binnedRanges.get(tsLocation);
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() ==
1) {
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges,
failures, receiver, columns,
- busyTimeout, reporters.getOrDefault(tsLocation, r -> {}),
scanServerSelectorDelay);
+ busyTimeout, reporters.getOrDefault(tsLocation, (t, r) -> {}),
scanServerSelectorDelay);
queryTasks.add(queryTask);
} else {
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
@@ -573,15 +577,16 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
if (tabletSubset.size() >= maxTabletsPerRequest) {
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver,
columns, busyTimeout,
- reporters.getOrDefault(tsLocation, r -> {}),
scanServerSelectorDelay);
+ reporters.getOrDefault(tsLocation, (t, r) -> {}),
scanServerSelectorDelay);
queryTasks.add(queryTask);
tabletSubset = new HashMap<>();
}
}
if (!tabletSubset.isEmpty()) {
- QueryTask queryTask = new QueryTask(tsLocation, tabletSubset,
failures, receiver, columns,
- busyTimeout, reporters.getOrDefault(tsLocation, r -> {}),
scanServerSelectorDelay);
+ QueryTask queryTask =
+ new QueryTask(tsLocation, tabletSubset, failures, receiver,
columns, busyTimeout,
+ reporters.getOrDefault(tsLocation, (t, r) -> {}),
scanServerSelectorDelay);
queryTasks.add(queryTask);
}
}
@@ -599,7 +604,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
private static class ScanServerData {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
ScanServerSelections actions;
- Map<String,ScanServerAttemptReporter> reporters;
+ Map<String,BatchAttemptReporter> reporters;
}
private ScanServerData
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
@@ -652,7 +657,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
- Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
+ Map<String,BatchAttemptReporter> reporters = new HashMap<>();
for (TabletIdImpl tabletId : tabletIds) {
KeyExtent extent = tabletId.toKeyExtent();
@@ -672,7 +677,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
rangeMap.put(extent, ranges);
var server = serverToUse;
- reporters.computeIfAbsent(serverToUse, k ->
scanAttempts.createReporter(server, tabletId));
+ reporters.computeIfAbsent(serverToUse, k ->
scanAttempts.createReporter(server));
}
ScanServerData ssd = new ScanServerData();
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
index fef92c8043..a492d80af5 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
@@ -20,7 +20,9 @@ package org.apache.accumulo.core.spi.scan;
import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -82,8 +84,7 @@ public class ConfigurableScanServerHostSelector extends
ConfigurableScanServerSe
/**
* @return map of previous failure keyed on host name with a set of servers
per host
*/
- Map<String,Set<String>> computeFailuresByHost(TabletId tablet,
SelectorParameters params) {
- var attempts = params.getAttempts(tablet);
+ Map<String,Set<String>> computeFailuresByHost(Collection<? extends
ScanServerAttempt> attempts) {
if (attempts.isEmpty()) {
return Map.of();
}
@@ -152,13 +153,25 @@ public class ConfigurableScanServerHostSelector extends
ConfigurableScanServerSe
}
@Override
- int selectServers(SelectorParameters params, Profile profile,
RendezvousHasher rhasher,
- Map<TabletId,String> serversToUse) {
+ ScanServerSelections selectServers(ScanServerSelector.SelectorParameters
params, Profile profile,
+ RendezvousHasher rhasher) {
int maxHostAttempt = 0;
+ int maxTabletErrors = 0;
+
+ HashMap<TabletId,String> serversToUse = new HashMap<>();
for (TabletId tablet : params.getTablets()) {
- Map<String,Set<String>> prevFailures = computeFailuresByHost(tablet,
params);
+ var attempts = params.getAttempts(tablet);
+ Map<String,Set<String>> prevFailures = computeFailuresByHost(attempts);
+
+ int tabletErrors = 0;
+ for (var attempt : attempts) {
+ if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
+ tabletErrors++;
+ }
+ }
+ maxTabletErrors = Math.max(tabletErrors, maxTabletErrors);
for (int hostAttempt = 0; hostAttempt <
profile.getAttemptPlans().size(); hostAttempt++) {
maxHostAttempt = Math.max(hostAttempt, maxHostAttempt);
@@ -183,6 +196,24 @@ public class ConfigurableScanServerHostSelector extends
ConfigurableScanServerSe
}
}
- return maxHostAttempt;
+ Duration busyTO =
Duration.ofMillis(profile.getBusyTimeout(maxHostAttempt));
+ Duration delay = computeDelay(maxTabletErrors);
+
+ return new ScanServerSelections() {
+ @Override
+ public String getScanServer(TabletId tabletId) {
+ return serversToUse.get(tabletId);
+ }
+
+ @Override
+ public Duration getDelay() {
+ return delay;
+ }
+
+ @Override
+ public Duration getBusyTimeout() {
+ return busyTO;
+ }
+ };
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index b54659e33c..64123308c5 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@ -26,6 +26,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -424,34 +425,27 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
};
}
- Map<TabletId,String> serversToUse = new HashMap<>();
+ return selectServers(params, profile, rhasher);
+ }
- int maxAttempts = selectServers(params, profile, rhasher, serversToUse);
+  /**
+   * Computes the delay to report for a given number of error attempts.
+   *
+   * @param errorAttempts number of attempts that ended in an error
+   * @return {@link Duration#ZERO} when there were no errors; otherwise 100ms doubled for each
+   *         error after the first, capped at 30 seconds
+   */
+  protected Duration computeDelay(int errorAttempts) {
+    if (errorAttempts == 0) {
+      return Duration.ZERO;
+    }
+    // exponential backoff: 100ms, 200ms, 400ms, ... limited to 30s
+    double backoffMillis = 100 * Math.pow(2, (errorAttempts - 1));
+    long cappedMillis = (long) Math.min(30_000, backoffMillis);
+    return Duration.ofMillis(cappedMillis);
+  }
- Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts));
+ ScanServerSelections selectServers(ScanServerSelector.SelectorParameters
params, Profile profile,
+ RendezvousHasher rhasher) {
+ int attempts = 0;
+ int errorAttempts = 0;
- return new ScanServerSelections() {
- @Override
- public String getScanServer(TabletId tabletId) {
- return serversToUse.get(tabletId);
- }
+ HashMap<TabletId,String> serversToUse = new HashMap<>();
- @Override
- public Duration getDelay() {
- return Duration.ZERO;
- }
-
- @Override
- public Duration getBusyTimeout() {
- return busyTO;
- }
- };
- }
-
- int selectServers(ScanServerSelector.SelectorParameters params, Profile
profile,
- RendezvousHasher rhasher, Map<TabletId,String> serversToUse) {
- int attempts = params.getTablets().stream()
- .mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0);
+ for (TabletId tablet : params.getTablets()) {
+ attempts = Math.max(attempts, params.getAttempts(tablet).size());
+ }
int numServers = profile.getNumServers(attempts,
rhasher.getSnapshot().getServersForGroup(profile.group).size());
@@ -461,9 +455,17 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
var tabletAttempts = params.getAttempts(tablet);
if (!tabletAttempts.isEmpty()) {
+ HashSet<String> attemptServers = new HashSet<>();
+ int errorCount = 0;
+ for (var attempt : tabletAttempts) {
+ attemptServers.add(attempt.getServer());
+ if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
+ errorCount++;
+ }
+ }
+ errorAttempts = Math.max(errorCount, errorAttempts);
// remove servers that failed in previous attempts
- var attemptServers =
-
tabletAttempts.stream().map(ScanServerAttempt::getServer).collect(Collectors.toSet());
+
var copy = rendezvousServers.stream().filter(server ->
!attemptServers.contains(server))
.collect(Collectors.toList());
if (!copy.isEmpty()) {
@@ -475,6 +477,25 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
String serverToUse =
rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size()));
serversToUse.put(tablet, serverToUse);
}
- return attempts;
+
+ Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts));
+ Duration delay = computeDelay(errorAttempts);
+
+ return new ScanServerSelections() {
+ @Override
+ public String getScanServer(TabletId tabletId) {
+ return serversToUse.get(tabletId);
+ }
+
+ @Override
+ public Duration getDelay() {
+ return delay;
+ }
+
+ @Override
+ public Duration getBusyTimeout() {
+ return busyTO;
+ }
+ };
}
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index 9cfbffb490..d815421070 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@ -27,9 +27,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.security.SecureRandom;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -46,6 +48,8 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
public class ConfigurableScanServerSelectorTest {
@@ -123,6 +127,13 @@ public class ConfigurableScanServerSelectorTest {
this.hints = hints;
}
+    /**
+     * Creates selector parameters that cover several tablets.
+     *
+     * @param tablets tablets to select servers for; an unmodifiable copy is kept
+     * @param attempts previous attempts keyed by tablet; stored as passed, not copied
+     * @param hints hints; stored as passed, not copied
+     */
+    SelectorParams(Set<TabletId> tablets,
+        Map<TabletId,Collection<? extends ScanServerAttempt>> attempts, Map<String,String> hints) {
+      var tabletsCopy = Set.copyOf(tablets);
+      this.hints = hints;
+      this.attempts = attempts;
+      this.tablets = tabletsCopy;
+    }
+
@Override
public Collection<TabletId> getTablets() {
return tablets;
@@ -595,8 +606,9 @@ public class ConfigurableScanServerSelectorTest {
/**
* Test that previous failures are not used again unless all servers have
failed
*/
- @Test
- public void testPreviousFailures() {
+ @ParameterizedTest
+ @EnumSource
+ public void testPreviousFailures(ScanServerAttempt.Result result) {
var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
HashMap<String,String> servers = new HashMap<>();
for (int i = 0; i < 30; i++) {
@@ -616,7 +628,7 @@ public class ConfigurableScanServerSelectorTest {
// try selecting again, should pick a different server
var attempts = new HashSet<ScanServerAttempt>();
- attempts.add(new TestScanServerAttempt(selected,
ScanServerAttempt.Result.BUSY));
+ attempts.add(new TestScanServerAttempt(selected, result));
var selected2 =
selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId,
attempts), Map.of()))
.getScanServer(tabletId);
@@ -624,7 +636,7 @@ public class ConfigurableScanServerSelectorTest {
assertNotEquals(selected, selected2);
// try selecting again, should pick a different server
- attempts.add(new TestScanServerAttempt(selected2,
ScanServerAttempt.Result.BUSY));
+ attempts.add(new TestScanServerAttempt(selected2, result));
var selected3 =
selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId,
attempts), Map.of()))
.getScanServer(tabletId);
@@ -633,10 +645,85 @@ public class ConfigurableScanServerSelectorTest {
assertNotEquals(selected2, selected3);
// try selecting again, at this point all servers failed so should try any
one of them
- attempts.add(new TestScanServerAttempt(selected3,
ScanServerAttempt.Result.BUSY));
+ attempts.add(new TestScanServerAttempt(selected3, result));
var selected4 =
selector.selectServers(new SelectorParams(tabletId, Map.of(tabletId,
attempts), Map.of()))
.getScanServer(tabletId);
assertTrue(Set.of(selected, selected2, selected3).contains(selected4));
}
+
+ @Test
+ public void testErrors() {
+ var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+ HashMap<String,String> servers = new HashMap<>();
+ for (int i = 0; i < 30; i++) {
+ servers.put(String.format("localhost:%d", 8000 + i), dg);
+ }
+
+ String defaultProfile =
+
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+ + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}";
+ var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'',
'"'));
+ ConfigurableScanServerSelector selector = new
ConfigurableScanServerSelector();
+ selector.init(new InitParams(() -> servers, opts));
+
+ var tablet1 = nti("1", "m");
+ var tablet2 = nti("1", "x");
+ var selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
Map.of(), Map.of()));
+ // no errors so there should be no delay
+ assertEquals(Duration.ZERO, selections.getDelay());
+ var selected1 = selections.getScanServer(tablet1);
+ var selected2 = selections.getScanServer(tablet2);
+ assertTrue(servers.containsKey(selected1));
+ assertTrue(servers.containsKey(selected2));
+
+ Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new
HashMap<>();
+ List<ScanServerAttempt> tablet1Attempts = new ArrayList<>();
+ attempts.put(tablet1, tablet1Attempts);
+ List<ScanServerAttempt> tablet2Attempts = new ArrayList<>();
+ attempts.put(tablet2, tablet2Attempts);
+
+ tablet1Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.BUSY));
+ selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
attempts, Map.of()));
+ // no errors, only a busy timeout, so there should be no delay
+ assertEquals(Duration.ZERO, selections.getDelay());
+
+ // add a single error to single tablet, should cause a delay
+ tablet2Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.ERROR));
+ selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
attempts, Map.of()));
+ assertEquals(Duration.ofMillis(100), selections.getDelay());
+
+ // add a single error to another tablet, should not increase the delay
+ tablet1Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.ERROR));
+ selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
attempts, Map.of()));
+ assertEquals(Duration.ofMillis(100), selections.getDelay());
+
+ // make tablet 1 have two errors, should cause a 200 ms delay
+ tablet1Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.ERROR));
+ selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
attempts, Map.of()));
+ assertEquals(Duration.ofMillis(200), selections.getDelay());
+
+ // make tablet 2 have three errors, should cause a 400ms delay
+ tablet2Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.ERROR));
+ tablet2Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.ERROR));
+ selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
attempts, Map.of()));
+ assertEquals(Duration.ofMillis(400), selections.getDelay());
+
+ // keep adding errors until max is reached
+ int expected = 400;
+ while (expected < 30_000) {
+ expected *= 2;
+ expected = Math.min(30_000, expected);
+ tablet2Attempts.add(new TestScanServerAttempt(selected1,
ScanServerAttempt.Result.ERROR));
+ selections =
+ selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2),
attempts, Map.of()));
+ assertEquals(Duration.ofMillis(expected), selections.getDelay());
+ }
+ }
}