This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 2cc38dff5e Back ported Timeout value to 2.1 (#4671)
2cc38dff5e is described below
commit 2cc38dff5e26f66dd7745c67ec7dadb44d7cc786
Author: Arbaaz Khan <[email protected]>
AuthorDate: Tue Jun 18 16:28:37 2024 -0400
Back ported Timeout value to 2.1 (#4671)
* back ported Timeout value to 2.1
* added changes
---
.../TabletServerBatchReaderIterator.java | 19 +++++-
.../accumulo/core/clientImpl/ThriftScanner.java | 61 +++++++++++++++++--
.../spi/scan/ConfigurableScanServerSelector.java | 70 ++++++++++++++++++----
.../accumulo/core/spi/scan/ScanServerSelector.java | 39 +++++++++++-
.../scan/ConfigurableScanServerSelectorTest.java | 70 ++++++++++++++++++++--
.../accumulo/test/ScanServerIT_NoServers.java | 52 ++++++++++++++--
6 files changed, 283 insertions(+), 28 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 8f30ded1c6..322fd0f098 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -36,11 +36,13 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
@@ -502,7 +504,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
final ResultReceiver receiver, List<Column> columns) {
-
+ long startTime = System.currentTimeMillis();
int maxTabletsPerRequest = Integer.MAX_VALUE;
long busyTimeout = 0;
@@ -510,7 +512,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
Map<String,ScanServerAttemptReporter> reporters = Map.of();
if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
- var scanServerData = rebinToScanServers(binnedRanges);
+ var scanServerData = rebinToScanServers(binnedRanges, startTime);
busyTimeout = scanServerData.actions.getBusyTimeout().toMillis();
reporters = scanServerData.reporters;
scanServerSelectorDelay = scanServerData.actions.getDelay();
@@ -603,7 +605,8 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
Map<String,ScanServerAttemptReporter> reporters;
}
- private ScanServerData
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
+ private ScanServerData
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
+ long startTime) {
ScanServerSelector ecsm = context.getScanServerSelector();
List<TabletIdImpl> tabletIds =
@@ -613,6 +616,9 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
// get a snapshot of this once,not each time the plugin request it
var scanAttemptsSnapshot = scanAttempts.snapshot();
+ Duration timeoutLeft = Duration.ofMillis(retryTimeout)
+ .minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
ScanServerSelector.SelectorParameters params = new
ScanServerSelector.SelectorParameters() {
@Override
public Collection<TabletId> getTablets() {
@@ -628,6 +634,13 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
public Map<String,String> getHints() {
return options.executionHints;
}
+
+ @Override
+ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition,
Duration maxWaitTime,
+ String description) {
+ return ThriftScanner.waitUntil(condition, maxWaitTime, description,
timeoutLeft, context,
+ tableId, log);
+ }
};
var actions = ecsm.selectServers(params);
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 604aece29d..c76277f79b 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -30,9 +30,12 @@ import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
@@ -40,6 +43,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -75,6 +79,7 @@ import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.Retry;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
@@ -262,6 +267,44 @@ public class ThriftScanner {
}
}
+ static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration
maxWaitTime,
+ String description, Duration timeoutLeft, ClientContext context, TableId
tableId,
+ Logger log) {
+ Retry retry = Retry.builder().infiniteRetries().retryAfter(100,
TimeUnit.MILLISECONDS)
+ .incrementBy(100, TimeUnit.MILLISECONDS).maxWait(1,
SECONDS).backOffFactor(1.5)
+ .logInterval(3, TimeUnit.MINUTES).createRetry();
+
+ long startTime = System.nanoTime();
+ Optional<T> optional = condition.get();
+ while (optional.isEmpty()) {
+ log.trace("For tableId {} scan server selector is waiting for '{}'",
tableId, description);
+
+ var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime);
+
+ if (elapsedTime.compareTo(timeoutLeft) > 0) {
+ throw new TimedOutException("While waiting for '" + description
+ + "' in order to select a scan server, the scan timed out. ");
+ }
+
+ if (elapsedTime.compareTo(maxWaitTime) > 0) {
+ return Optional.empty();
+ }
+
+ context.requireNotDeleted(tableId);
+
+ try {
+ retry.waitForNextAttempt(log, String.format(
+ "For tableId %s scan server selector is waiting for '%s'",
tableId, description));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ optional = condition.get();
+ }
+
+ return optional;
+ }
+
public static class ScanTimedOutException extends IOException {
private static final long serialVersionUID = 1L;
@@ -370,7 +413,7 @@ public class ThriftScanner {
Span child2 = TraceUtil.startSpan(ThriftScanner.class,
"scan::location",
Map.of("tserver", loc.tablet_location));
try (Scope scanLocation = child2.makeCurrent()) {
- results = scan(loc, scanState, context);
+ results = scan(loc, scanState, context, timeOut, startTime);
} catch (AccumuloSecurityException e) {
context.clearTableListCache();
context.requireNotDeleted(scanState.tableId);
@@ -510,9 +553,9 @@ public class ThriftScanner {
}
}
- private static List<KeyValue> scan(TabletLocation loc, ScanState scanState,
ClientContext context)
- throws AccumuloSecurityException, NotServingTabletException, TException,
- NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException
{
+ private static List<KeyValue> scan(TabletLocation loc, ScanState scanState,
ClientContext context,
+ long timeOut, long startTime) throws AccumuloSecurityException,
NotServingTabletException,
+ TException, NoSuchScanIDException, TooManyFilesException,
TSampleNotPresentException {
if (scanState.finished) {
return null;
}
@@ -536,6 +579,9 @@ public class ThriftScanner {
// obtain a snapshot once and only expose this snapshot to the plugin
for consistency
var attempts = scanState.scanAttempts.snapshot();
+ Duration timeoutLeft = Duration.ofSeconds(timeOut)
+ .minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
var params = new ScanServerSelector.SelectorParameters() {
@Override
@@ -555,6 +601,13 @@ public class ThriftScanner {
}
return scanState.executionHints;
}
+
+ @Override
+ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition,
Duration maxWaitTime,
+ String description) {
+ return ThriftScanner.waitUntil(condition, maxWaitTime,
description, timeoutLeft,
+ context, loc.tablet_extent.tableId(), log);
+ }
};
ScanServerSelections actions =
context.getScanServerSelector().selectServers(params);
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index 7dfa7033db..e0b9ddbf3d 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -80,7 +81,28 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
* specified, the set of scan servers that did not specify a group will be
used. Grouping scan
* servers supports at least two use cases. First groups can be used to
dedicate resources for
* certain scans. Second groups can be used to have different hardware/VM
types for scans, for
- * example could have some scans use expensive high memory VMs and others use
cheaper burstable VMs.
+ * example could have some scans use expensive high memory VMs and others use
cheaper burstable
+ * VMs.</li>
+ * <li><b>timeToWaitForScanServers : </b> When there are no scan servers,
this setting determines
+ * how long to wait for scan servers to become available before falling back
to tablet servers.
+ * Falling back to tablet servers may cause tablets to be loaded that are not
currently loaded. When
+ * this setting is given a wait time and there are no scan servers, it will
wait for scan servers to
+ * be available. This setting avoids loading tablets on tablet servers when
scan servers are
+ * temporarily unavailable which could be caused by normal cluster activity.
You can specify the
+ * wait time using different units to precisely control the wait duration. The
supported units are:
+ * <ul>
+ * <li>"d" for days</li>
+ * <li>"h" for hours</li>
+ * <li>"m" for minutes</li>
+ * <li>"s" for seconds</li>
+ * <li>"ms" for milliseconds</li>
+ * </ul>
+ * If duration is not specified this setting defaults to 0s, and will disable
the wait for scan
+ * servers and will fall back to tablet servers immediately. When set to a
large value, the selector
+ * will effectively wait for scan servers to become available before falling
back to tablet servers.
+ * To ensure the selector never falls back to scanning tablet servers an
unrealistic wait time can be
+ * set. For instance, a very large value such as "3650d" should be sufficient. Waiting for scan servers
is done via
+ * {@link
org.apache.accumulo.core.spi.scan.ScanServerSelector.SelectorParameters#waitUntil(Supplier,
Duration, String)}</li>
* <li><b>attemptPlans : </b> A list of configuration to use for each scan
attempt. Each list object
* has the following fields:
* <ul>
@@ -114,6 +136,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
* "maxBusyTimeout":"20m",
* "busyTimeoutMultiplier":8,
* "group":"lowcost",
+ * "timeToWaitForScanServers": "120s",
* "attemptPlans":[
* {"servers":"1", "busyTimeout":"10s"},
* {"servers":"3", "busyTimeout":"30s","salt":"42"},
@@ -133,16 +156,18 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
* </p>
*
* <p>
- * For the profile activated by {@code scan_type=slow} it start off by
choosing randomly from 1 scan
- * server based on a hash of the tablet with no salt and a busy timeout of
10s. The second attempt
- * will choose from 3 scan servers based on a hash of the tablet plus the salt
{@literal 42}.
- * Without the salt, the single scan servers from the first attempt would
always be included in the
- * set of 3. With the salt the single scan server from the first attempt may
not be included. The
- * third attempt will choose a scan server from 9 using the salt {@literal 84}
and a busy timeout of
- * 60s. The different salt means the set of servers that attempts 2 and 3
choose from may be
- * disjoint. Attempt 4 and greater will continue to choose from the same 9
servers as attempt 3 and
- * will keep increasing the busy timeout by multiplying 8 until the maximum of
20 minutes is
- * reached. For this profile it will choose from scan servers in the group
{@literal lowcost}.
+ * For the profile activated by {@code scan_type=slow} it starts off by
choosing randomly from 1
+ * scan server based on a hash of the tablet with no salt and a busy timeout
of 10s. The second
+ * attempt will choose from 3 scan servers based on a hash of the tablet plus
the salt
+ * {@literal 42}. Without the salt, the single scan servers from the first
attempt would always be
+ * included in the set of 3. With the salt the single scan server from the
first attempt may not be
+ * included. The third attempt will choose a scan server from 9 using the salt
{@literal 84} and a
+ * busy timeout of 60s. The different salt means the set of servers that
attempts 2 and 3 choose
+ * from may be disjoint. Attempt 4 and greater will continue to choose from
the same 9 servers as
+ * attempt 3 and will keep increasing the busy timeout by multiplying 8 until
the maximum of 20
+ * minutes is reached. For this profile it will choose from scan servers in
the group
+ * {@literal lowcost}. This profile also will not fall back to tablet servers
when there are
+ * currently no scan servers, it will wait for scan servers to become
available.
* </p>
*
* @since 2.1.0
@@ -225,10 +250,13 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
int busyTimeoutMultiplier;
String maxBusyTimeout;
String group = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+ String timeToWaitForScanServers = "0s";
transient boolean parsed = false;
transient long parsedMaxBusyTimeout;
+ transient Duration parsedTimeToWaitForScanServers;
+
int getNumServers(int attempt, int totalServers) {
int index = Math.min(attempt, attemptPlans.size() - 1);
return attemptPlans.get(index).getNumServers(totalServers);
@@ -239,6 +267,8 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
return;
}
parsedMaxBusyTimeout =
ConfigurationTypeHelper.getTimeInMillis(maxBusyTimeout);
+ parsedTimeToWaitForScanServers =
+
Duration.ofMillis(ConfigurationTypeHelper.getTimeInMillis(timeToWaitForScanServers));
parsed = true;
}
@@ -259,6 +289,11 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
int index = Math.min(attempts, attemptPlans.size() - 1);
return attemptPlans.get(index).salt;
}
+
+ Duration getTimeToWaitForScanServers() {
+ parse();
+ return parsedTimeToWaitForScanServers;
+ }
}
private void parseProfiles(Map<String,String> options) {
@@ -332,7 +367,20 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
List<String> orderedScanServers =
orderedScanServersSupplier.get().getOrDefault(profile.group,
List.of());
+ Duration scanServerWaitTime = profile.getTimeToWaitForScanServers();
+
+ var finalProfile = profile;
+ if (orderedScanServers.isEmpty() && !scanServerWaitTime.isZero()) {
+ // Wait for scan servers in the configured group to be present.
+ orderedScanServers = params.waitUntil(
+ () ->
Optional.ofNullable(orderedScanServersSupplier.get().get(finalProfile.group)),
+ scanServerWaitTime, "scan servers in group : " +
profile.group).orElseThrow();
+ // at this point the list should be non empty unless there is a bug
+ Preconditions.checkState(!orderedScanServers.isEmpty());
+ }
+
if (orderedScanServers.isEmpty()) {
+ // there are no scan servers so fall back to the tablet server
return new ScanServerSelections() {
@Override
public String getScanServer(TabletId tabletId) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
index 541ff7a858..d74199a278 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
@@ -18,8 +18,11 @@
*/
package org.apache.accumulo.core.spi.scan;
+import java.time.Duration;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.accumulo.core.client.ScannerBase;
@@ -77,7 +80,6 @@ public interface ScanServerSelector {
* made using a consistent set of scan servers.
*/
Supplier<Collection<ScanServerInfo>> getScanServers();
-
}
/**
@@ -105,11 +107,46 @@ public interface ScanServerSelector {
* were set, an empty map is returned.
*/
Map<String,String> getHints();
+
+ /**
+ * This function helps a scan server selector wait for an optional to
become non-empty (like
+ * waiting for scan servers to be present) and throws exceptions when
waiting is no longer
+ * possible OR returns an empty optional if the max wait time was exceeded. The
passed in condition will
+ * be periodically called and as long as it returns an empty optional the
function will continue
+ * to wait.
+ *
+ * @param condition periodically calls this to see if it is non-empty.
+ * @param maxWaitTime the maximum time to wait for the condition to become
non-empty
+ * @param description a description of what is being waited on, used for
error messages and
+ * logging
+ * @return The first non-empty optional returned by the condition. An
empty optional if the
+ * maxWaitTime was exceeded without the condition ever returning a
non-empty optional.
+ *
+ * @throws org.apache.accumulo.core.client.TableDeletedException if the
table is deleted while
+ * waiting for the condition to become non-empty. Do not catch
this exception, let it
+ * escape.
+ * @throws org.apache.accumulo.core.client.TimedOutException if the
timeout specified by
+ * {@link ScannerBase#setTimeout(long, TimeUnit)} is exceeded
while waiting. Do not
+ * catch this exception, let it escape.
+ *
+ * @since 4.0.0
+ */
+ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration
maxWaitTime,
+ String description);
}
/**
+ * <p>
* Uses the {@link SelectorParameters} to determine which, if any,
ScanServer should be used for
* scanning a tablet.
+ * </p>
+ *
+ * <p>
+ * In the case where there are zero scan servers available and an
implementation does not want to
+ * fall back to tablet servers, it's ok to wait and poll for scan servers.
When waiting it's best to
+ * use {@link SelectorParameters#waitUntil(Supplier, Duration, String)} as
this allows Accumulo to
+ * know about the wait and cancel it via exceptions when it no longer makes
sense to wait.
+ * </p>
*
* @param params parameters for the calculation
* @return results
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index 432667affb..25ed62e89a 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.core.spi.scan;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,7 +30,9 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -39,6 +42,7 @@ import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
@@ -47,7 +51,7 @@ public class ConfigurableScanServerSelectorTest {
static class InitParams implements ScanServerSelector.InitParameters {
private final Map<String,String> opts;
- private final Map<String,String> scanServers;
+ private final Supplier<Map<String,String>> scanServers;
InitParams(Set<String> scanServers) {
this(scanServers, Map.of());
@@ -55,12 +59,18 @@ public class ConfigurableScanServerSelectorTest {
InitParams(Set<String> scanServers, Map<String,String> opts) {
this.opts = opts;
- this.scanServers = new HashMap<>();
+ var scanServersMap = new HashMap<String,String>();
scanServers.forEach(
- sserv -> this.scanServers.put(sserv,
ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME));
+ sserv -> scanServersMap.put(sserv,
ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME));
+ this.scanServers = () -> scanServersMap;
}
InitParams(Map<String,String> scanServers, Map<String,String> opts) {
+ this.opts = opts;
+ this.scanServers = () -> scanServers;
+ }
+
+ InitParams(Supplier<Map<String,String>> scanServers, Map<String,String>
opts) {
this.opts = opts;
this.scanServers = scanServers;
}
@@ -77,7 +87,7 @@ public class ConfigurableScanServerSelectorTest {
@Override
public Supplier<Collection<ScanServerInfo>> getScanServers() {
- return () -> scanServers.entrySet().stream().map(entry -> new
ScanServerInfo() {
+ return () -> scanServers.get().entrySet().stream().map(entry -> new
ScanServerInfo() {
@Override
public String getAddress() {
@@ -126,6 +136,13 @@ public class ConfigurableScanServerSelectorTest {
public Map<String,String> getHints() {
return hints;
}
+
+ @Override
+ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration
maxWaitTime,
+ String description) {
+ throw new UnsupportedOperationException();
+ }
+
}
static class TestScanServerAttempt implements ScanServerAttempt {
@@ -467,4 +484,49 @@ public class ConfigurableScanServerSelectorTest {
assertEquals(Set.of("ss1:1", "ss2:2", "ss3:3"), servers);
}
+
+ @Test
+ public void testWaitForScanServers() {
+ // this test ensures that when there are no scan servers that the
ConfigurableScanServerSelector
+ // will wait for scan servers
+
+ String defaultProfile =
+
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+ + "'attemptPlans':[{'servers':'100%', 'busyTimeout':'60s'}]}";
+
+ var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'',
'"'));
+
+ ConfigurableScanServerSelector selector = new
ConfigurableScanServerSelector();
+
+ AtomicReference<Map<String,String>> scanServers = new
AtomicReference<>(Map.of());
+
+ selector.init(new InitParams(scanServers::get, opts));
+
+ var tabletId = nti("1", "m");
+
+ var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+
+ var params = new SelectorParams(tabletId, Map.of(), Map.of()) {
+ @Override
+ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition,
Duration maxWaitTime,
+ String description) {
+ // make some scan servers available now that wait was called
+ scanServers.set(Map.of("ss1:1", dg, "ss2:2", dg, "ss3:3", dg));
+
+ Optional<T> optional = condition.get();
+
+ while (optional.isEmpty()) {
+ UtilWaitThread.sleep(10);
+ optional = condition.get();
+ }
+
+ return optional;
+ }
+ };
+
+ ScanServerSelections actions = selector.selectServers(params);
+
+ assertTrue(Set.of("ss1:1", "ss2:2",
"ss3:3").contains(actions.getScanServer(tabletId)));
+ assertFalse(scanServers.get().isEmpty());
+ }
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
index 02b673db8a..f8dde71bbf 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
@@ -25,16 +25,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -123,16 +127,54 @@ public class ScanServerIT_NoServers extends
SharedMiniClusterBase {
}
@Test
- public void testScanOfflineTable() throws Exception {
- try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ public void testScanWithNoTserverFallback() throws Exception {
+
+ var clientProps = new Properties();
+ clientProps.putAll(getClientProps());
+ String scanServerSelectorProfiles =
"[{'isDefault':true,'maxBusyTimeout':'5m',"
+ + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[],
'timeToWaitForScanServers':'120s',"
+ + "'attemptPlans':[{'servers':'3', 'busyTimeout':'1s'}]}]";
+ clientProps.put("scan.server.selector.impl",
ConfigurableScanServerSelector.class.getName());
+ clientProps.put("scan.server.selector.opts.profiles",
+ scanServerSelectorProfiles.replace("'", "\""));
+
+ try (AccumuloClient client =
Accumulo.newClient().from(clientProps).build()) {
String tableName = getUniqueNames(1)[0];
createTableAndIngest(client, tableName, null, 10, 10, "colf");
- client.tableOperations().offline(tableName, true);
- assertThrows(TableOfflineException.class, () -> {
+ assertThrows(TimedOutException.class, () -> {
try (Scanner scanner = client.createScanner(tableName,
Authorizations.EMPTY)) {
scanner.setRange(new Range());
+ scanner.setTimeout(1, TimeUnit.SECONDS);
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(100, Iterables.size(scanner));
+ } // when the scanner is closed, all open sessions should be closed
+ });
+ }
+ }
+
+ @Test
+ public void testBatchScanWithNoTserverFallback() throws Exception {
+
+ var clientProps = new Properties();
+ clientProps.putAll(getClientProps());
+ String scanServerSelectorProfiles =
"[{'isDefault':true,'maxBusyTimeout':'5m',"
+ + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[],
'timeToWaitForScanServers':'120s',"
+ + "'attemptPlans':[{'servers':'3', 'busyTimeout':'1s'}]}]";
+ clientProps.put("scan.server.selector.impl",
ConfigurableScanServerSelector.class.getName());
+ clientProps.put("scan.server.selector.opts.profiles",
+ scanServerSelectorProfiles.replace("'", "\""));
+
+ try (AccumuloClient client =
Accumulo.newClient().from(clientProps).build()) {
+ String tableName = getUniqueNames(1)[0];
+
+ createTableAndIngest(client, tableName, null, 10, 10, "colf");
+
+ assertThrows(TimedOutException.class, () -> {
+ try (BatchScanner scanner = client.createBatchScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRanges(List.of(new Range()));
+ scanner.setTimeout(1, TimeUnit.SECONDS);
scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
assertEquals(100, Iterables.size(scanner));
} // when the scanner is closed, all open sessions should be closed