This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 2cc38dff5e Back ported Timeout value to 2.1 (#4671) 2cc38dff5e is described below commit 2cc38dff5e26f66dd7745c67ec7dadb44d7cc786 Author: Arbaaz Khan <bazzy...@yahoo.com> AuthorDate: Tue Jun 18 16:28:37 2024 -0400 Back ported Timeout value to 2.1 (#4671) * back ported Timeout value to 2.1 * added changes --- .../TabletServerBatchReaderIterator.java | 19 +++++- .../accumulo/core/clientImpl/ThriftScanner.java | 61 +++++++++++++++++-- .../spi/scan/ConfigurableScanServerSelector.java | 70 ++++++++++++++++++---- .../accumulo/core/spi/scan/ScanServerSelector.java | 39 +++++++++++- .../scan/ConfigurableScanServerSelectorTest.java | 70 ++++++++++++++++++++-- .../accumulo/test/ScanServerIT_NoServers.java | 52 ++++++++++++++-- 6 files changed, 283 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 8f30ded1c6..322fd0f098 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -36,11 +36,13 @@ import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; @@ -502,7 +504,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, 
List<Column> columns) { - + long startTime = System.currentTimeMillis(); int maxTabletsPerRequest = Integer.MAX_VALUE; long busyTimeout = 0; @@ -510,7 +512,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value Map<String,ScanServerAttemptReporter> reporters = Map.of(); if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) { - var scanServerData = rebinToScanServers(binnedRanges); + var scanServerData = rebinToScanServers(binnedRanges, startTime); busyTimeout = scanServerData.actions.getBusyTimeout().toMillis(); reporters = scanServerData.reporters; scanServerSelectorDelay = scanServerData.actions.getDelay(); @@ -603,7 +605,8 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value Map<String,ScanServerAttemptReporter> reporters; } - private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { + private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, + long startTime) { ScanServerSelector ecsm = context.getScanServerSelector(); List<TabletIdImpl> tabletIds = @@ -613,6 +616,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value // get a snapshot of this once,not each time the plugin request it var scanAttemptsSnapshot = scanAttempts.snapshot(); + Duration timeoutLeft = Duration.ofMillis(retryTimeout) + .minus(Duration.ofMillis(System.currentTimeMillis() - startTime)); + ScanServerSelector.SelectorParameters params = new ScanServerSelector.SelectorParameters() { @Override public Collection<TabletId> getTablets() { @@ -628,6 +634,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value public Map<String,String> getHints() { return options.executionHints; } + + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, 
context, + tableId, log); + } }; var actions = ecsm.selectServers(params); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 604aece29d..c76277f79b 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -30,9 +30,12 @@ import java.util.EnumMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -40,6 +43,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; @@ -75,6 +79,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.OpTimer; +import org.apache.accumulo.core.util.Retry; import org.apache.hadoop.io.Text; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; @@ -262,6 +267,44 @@ public class ThriftScanner { } } + static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description, Duration timeoutLeft, ClientContext context, TableId tableId, + Logger log) { + Retry retry = 
Retry.builder().infiniteRetries().retryAfter(100, TimeUnit.MILLISECONDS) + .incrementBy(100, TimeUnit.MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) + .logInterval(3, TimeUnit.MINUTES).createRetry(); + + long startTime = System.nanoTime(); + Optional<T> optional = condition.get(); + while (optional.isEmpty()) { + log.trace("For tableId {} scan server selector is waiting for '{}'", tableId, description); + + var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime); + + if (elapsedTime.compareTo(timeoutLeft) > 0) { + throw new TimedOutException("While waiting for '" + description + + "' in order to select a scan server, the scan timed out. "); + } + + if (elapsedTime.compareTo(maxWaitTime) > 0) { + return Optional.empty(); + } + + context.requireNotDeleted(tableId); + + try { + retry.waitForNextAttempt(log, String.format( + "For tableId %s scan server selector is waiting for '%s'", tableId, description)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + optional = condition.get(); + } + + return optional; + } + public static class ScanTimedOutException extends IOException { private static final long serialVersionUID = 1L; @@ -370,7 +413,7 @@ public class ThriftScanner { Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location", Map.of("tserver", loc.tablet_location)); try (Scope scanLocation = child2.makeCurrent()) { - results = scan(loc, scanState, context); + results = scan(loc, scanState, context, timeOut, startTime); } catch (AccumuloSecurityException e) { context.clearTableListCache(); context.requireNotDeleted(scanState.tableId); @@ -510,9 +553,9 @@ public class ThriftScanner { } } - private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, ClientContext context) - throws AccumuloSecurityException, NotServingTabletException, TException, - NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException { + private static List<KeyValue> scan(TabletLocation loc, ScanState 
scanState, ClientContext context, + long timeOut, long startTime) throws AccumuloSecurityException, NotServingTabletException, + TException, NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException { if (scanState.finished) { return null; } @@ -536,6 +579,9 @@ public class ThriftScanner { // obtain a snapshot once and only expose this snapshot to the plugin for consistency var attempts = scanState.scanAttempts.snapshot(); + Duration timeoutLeft = Duration.ofSeconds(timeOut) + .minus(Duration.ofMillis(System.currentTimeMillis() - startTime)); + var params = new ScanServerSelector.SelectorParameters() { @Override @@ -555,6 +601,13 @@ public class ThriftScanner { } return scanState.executionHints; } + + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, + context, loc.tablet_extent.tableId(), log); + } }; ScanServerSelections actions = context.getScanServerSelector().selectServers(params); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java index 7dfa7033db..e0b9ddbf3d 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -80,7 +81,28 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; * specified, the set of scan servers that did not specify a group will be used. Grouping scan * servers supports at least two use cases. 
First groups can be used to dedicate resources for * certain scans. Second groups can be used to have different hardware/VM types for scans, for - * example could have some scans use expensive high memory VMs and others use cheaper burstable VMs. + * example could have some scans use expensive high memory VMs and others use cheaper burstable + * VMs.</li> + * <li><b>timeToWaitForScanServers : </b> When there are no scan servers, this setting determines + * how long to wait for scan servers to become available before falling back to tablet servers. + * Falling back to tablet servers may cause tablets to be loaded that are not currently loaded. When + * this setting is given a wait time and there are no scan servers, it will wait for scan servers to + * be available. This setting avoids loading tablets on tablet servers when scan servers are + * temporarily unavailable, which could be caused by normal cluster activity. You can specify the + * wait time using different units to precisely control the wait duration. The supported units are: + * <ul> + * <li>"d" for days</li> + * <li>"h" for hours</li> + * <li>"m" for minutes</li> + * <li>"s" for seconds</li> + * <li>"ms" for milliseconds</li> + * </ul> + * If a duration is not specified, this setting defaults to 0s, which disables the wait for scan + * servers and falls back to tablet servers immediately. When set to a large value, the selector + * will effectively wait for scan servers to become available before falling back to tablet servers. + * To ensure the selector never falls back to scanning tablet servers, an unrealistically long wait + * time can be set. For instance, a wait time of many days should be sufficient. Waiting for scan + * servers is done via + * {@link org.apache.accumulo.core.spi.scan.ScanServerSelector.SelectorParameters#waitUntil(Supplier, Duration, String)}</li> * <li><b>attemptPlans : </b> A list of configuration to use for each scan attempt. 
Each list object * has the following fields: * <ul> @@ -114,6 +136,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; * "maxBusyTimeout":"20m", * "busyTimeoutMultiplier":8, * "group":"lowcost", + * "timeToWaitForScanServers": "120s", * "attemptPlans":[ * {"servers":"1", "busyTimeout":"10s"}, * {"servers":"3", "busyTimeout":"30s","salt":"42"}, @@ -133,16 +156,18 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; * </p> * * <p> - * For the profile activated by {@code scan_type=slow} it start off by choosing randomly from 1 scan - * server based on a hash of the tablet with no salt and a busy timeout of 10s. The second attempt - * will choose from 3 scan servers based on a hash of the tablet plus the salt {@literal 42}. - * Without the salt, the single scan servers from the first attempt would always be included in the - * set of 3. With the salt the single scan server from the first attempt may not be included. The - * third attempt will choose a scan server from 9 using the salt {@literal 84} and a busy timeout of - * 60s. The different salt means the set of servers that attempts 2 and 3 choose from may be - * disjoint. Attempt 4 and greater will continue to choose from the same 9 servers as attempt 3 and - * will keep increasing the busy timeout by multiplying 8 until the maximum of 20 minutes is - * reached. For this profile it will choose from scan servers in the group {@literal lowcost}. + * For the profile activated by {@code scan_type=slow} it starts off by choosing randomly from 1 + * scan server based on a hash of the tablet with no salt and a busy timeout of 10s. The second + * attempt will choose from 3 scan servers based on a hash of the tablet plus the salt + * {@literal 42}. Without the salt, the single scan servers from the first attempt would always be + * included in the set of 3. With the salt the single scan server from the first attempt may not be + * included. 
The third attempt will choose a scan server from 9 using the salt {@literal 84} and a + * busy timeout of 60s. The different salt means the set of servers that attempts 2 and 3 choose + * from may be disjoint. Attempt 4 and greater will continue to choose from the same 9 servers as + * attempt 3 and will keep increasing the busy timeout by multiplying 8 until the maximum of 20 + * minutes is reached. For this profile it will choose from scan servers in the group + * {@literal lowcost}. This profile also will not fallback to tablet servers when there are + * currently no scan servers, it will wait for scan servers to become available. * </p> * * @since 2.1.0 @@ -225,10 +250,13 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { int busyTimeoutMultiplier; String maxBusyTimeout; String group = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; + String timeToWaitForScanServers = "0s"; transient boolean parsed = false; transient long parsedMaxBusyTimeout; + transient Duration parsedTimeToWaitForScanServers; + int getNumServers(int attempt, int totalServers) { int index = Math.min(attempt, attemptPlans.size() - 1); return attemptPlans.get(index).getNumServers(totalServers); @@ -239,6 +267,8 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { return; } parsedMaxBusyTimeout = ConfigurationTypeHelper.getTimeInMillis(maxBusyTimeout); + parsedTimeToWaitForScanServers = + Duration.ofMillis(ConfigurationTypeHelper.getTimeInMillis(timeToWaitForScanServers)); parsed = true; } @@ -259,6 +289,11 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { int index = Math.min(attempts, attemptPlans.size() - 1); return attemptPlans.get(index).salt; } + + Duration getTimeToWaitForScanServers() { + parse(); + return parsedTimeToWaitForScanServers; + } } private void parseProfiles(Map<String,String> options) { @@ -332,7 +367,20 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { 
List<String> orderedScanServers = orderedScanServersSupplier.get().getOrDefault(profile.group, List.of()); + Duration scanServerWaitTime = profile.getTimeToWaitForScanServers(); + + var finalProfile = profile; + if (orderedScanServers.isEmpty() && !scanServerWaitTime.isZero()) { + // Wait for scan servers in the configured group to be present. + orderedScanServers = params.waitUntil( + () -> Optional.ofNullable(orderedScanServersSupplier.get().get(finalProfile.group)), + scanServerWaitTime, "scan servers in group : " + profile.group).orElseThrow(); + // at this point the list should be non empty unless there is a bug + Preconditions.checkState(!orderedScanServers.isEmpty()); + } + if (orderedScanServers.isEmpty()) { + // there are no scan servers so fall back to the tablet server return new ScanServerSelections() { @Override public String getScanServer(TabletId tabletId) { diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java index 541ff7a858..d74199a278 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java @@ -18,8 +18,11 @@ */ package org.apache.accumulo.core.spi.scan; +import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.accumulo.core.client.ScannerBase; @@ -77,7 +80,6 @@ public interface ScanServerSelector { * made using a consistent set of scan servers. */ Supplier<Collection<ScanServerInfo>> getScanServers(); - } /** @@ -105,11 +107,46 @@ public interface ScanServerSelector { * were set, an empty map is returned. 
*/ Map<String,String> getHints(); + + /** + * This function helps a scan server selector wait for an optional to become non-empty (like + * waiting for scan servers to be present). It throws an exception when waiting is no longer + * possible and returns an empty optional if the max wait time was exceeded. The passed in + * condition will be periodically called and as long as it returns an empty optional the + * function will continue to wait. + * + * @param condition a supplier that is called periodically until it returns a non-empty + * optional + * @param maxWaitTime the maximum time to wait for the condition to become non-empty + * @param description a description of what is being waited on, used for error messages and + * logging + * @return The first non-empty optional returned by the condition. An empty optional if the + * maxWaitTime was exceeded without the condition ever returning a non-empty optional. + * + * @throws org.apache.accumulo.core.client.TableDeletedException if the table is deleted while + * waiting for the condition to become non-empty. Do not catch this exception, let it + * escape. + * @throws org.apache.accumulo.core.client.TimedOutException if the timeout specified by + * {@link ScannerBase#setTimeout(long, TimeUnit)} is exceeded while waiting. Do not + * catch this exception, let it escape. + * + * @since 4.0.0 + */ + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description); } /** + * <p> * Uses the {@link SelectorParameters} to determine which, if any, ScanServer should be used for * scanning a tablet. + * </p> + * + * <p> + * In the case where there are zero scan servers available and an implementation does not want to + * fall back to tablet servers, it's ok to wait and poll for scan servers. When waiting, it's best + * to use {@link SelectorParameters#waitUntil(Supplier, Duration, String)} as this allows Accumulo + * to know about the wait and cancel it via exceptions when it no longer makes sense to wait. 
+ * </p> * * @param params parameters for the calculation * @return results diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index 432667affb..25ed62e89a 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.spi.scan; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -29,7 +30,9 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -39,6 +42,7 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -47,7 +51,7 @@ public class ConfigurableScanServerSelectorTest { static class InitParams implements ScanServerSelector.InitParameters { private final Map<String,String> opts; - private final Map<String,String> scanServers; + private final Supplier<Map<String,String>> scanServers; InitParams(Set<String> scanServers) { this(scanServers, Map.of()); @@ -55,12 +59,18 @@ public class ConfigurableScanServerSelectorTest { 
InitParams(Set<String> scanServers, Map<String,String> opts) { this.opts = opts; - this.scanServers = new HashMap<>(); + var scanServersMap = new HashMap<String,String>(); scanServers.forEach( - sserv -> this.scanServers.put(sserv, ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)); + sserv -> scanServersMap.put(sserv, ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)); + this.scanServers = () -> scanServersMap; } InitParams(Map<String,String> scanServers, Map<String,String> opts) { + this.opts = opts; + this.scanServers = () -> scanServers; + } + + InitParams(Supplier<Map<String,String>> scanServers, Map<String,String> opts) { this.opts = opts; this.scanServers = scanServers; } @@ -77,7 +87,7 @@ public class ConfigurableScanServerSelectorTest { @Override public Supplier<Collection<ScanServerInfo>> getScanServers() { - return () -> scanServers.entrySet().stream().map(entry -> new ScanServerInfo() { + return () -> scanServers.get().entrySet().stream().map(entry -> new ScanServerInfo() { @Override public String getAddress() { @@ -126,6 +136,13 @@ public class ConfigurableScanServerSelectorTest { public Map<String,String> getHints() { return hints; } + + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + throw new UnsupportedOperationException(); + } + } static class TestScanServerAttempt implements ScanServerAttempt { @@ -467,4 +484,49 @@ public class ConfigurableScanServerSelectorTest { assertEquals(Set.of("ss1:1", "ss2:2", "ss3:3"), servers); } + + @Test + public void testWaitForScanServers() { + // this test ensures that when there are no scan servers that the ConfigurableScanServerSelector + // will wait for scan servers + + String defaultProfile = + "{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s'," + + "'attemptPlans':[{'servers':'100%', 'busyTimeout':'60s'}]}"; + + var opts = Map.of("profiles", "[" + defaultProfile + 
"]".replace('\'', '"')); + + ConfigurableScanServerSelector selector = new ConfigurableScanServerSelector(); + + AtomicReference<Map<String,String>> scanServers = new AtomicReference<>(Map.of()); + + selector.init(new InitParams(scanServers::get, opts)); + + var tabletId = nti("1", "m"); + + var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; + + var params = new SelectorParams(tabletId, Map.of(), Map.of()) { + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + // make some scan servers available now that wait was called + scanServers.set(Map.of("ss1:1", dg, "ss2:2", dg, "ss3:3", dg)); + + Optional<T> optional = condition.get(); + + while (optional.isEmpty()) { + UtilWaitThread.sleep(10); + optional = condition.get(); + } + + return optional; + } + }; + + ScanServerSelections actions = selector.selectServers(params); + + assertTrue(Set.of("ss1:1", "ss2:2", "ss3:3").contains(actions.getScanServer(tabletId))); + assertFalse(scanServers.get().isEmpty()); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java index 02b673db8a..f8dde71bbf 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java @@ -25,16 +25,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; -import 
org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -123,16 +127,54 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase { } @Test - public void testScanOfflineTable() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + public void testScanWithNoTserverFallback() throws Exception { + + var clientProps = new Properties(); + clientProps.putAll(getClientProps()); + String scanServerSelectorProfiles = "[{'isDefault':true,'maxBusyTimeout':'5m'," + + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[], 'timeToWaitForScanServers':'120s'," + + "'attemptPlans':[{'servers':'3', 'busyTimeout':'1s'}]}]"; + clientProps.put("scan.server.selector.impl", ConfigurableScanServerSelector.class.getName()); + clientProps.put("scan.server.selector.opts.profiles", + scanServerSelectorProfiles.replace("'", "\"")); + + try (AccumuloClient client = Accumulo.newClient().from(clientProps).build()) { String tableName = getUniqueNames(1)[0]; createTableAndIngest(client, tableName, null, 10, 10, "colf"); - client.tableOperations().offline(tableName, true); - assertThrows(TableOfflineException.class, () -> { + assertThrows(TimedOutException.class, () -> { try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(new Range()); + scanner.setTimeout(1, TimeUnit.SECONDS); + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(100, Iterables.size(scanner)); + } // when the scanner is closed, all open 
sessions should be closed + }); + } + } + + @Test + public void testBatchScanWithNoTserverFallback() throws Exception { + + var clientProps = new Properties(); + clientProps.putAll(getClientProps()); + String scanServerSelectorProfiles = "[{'isDefault':true,'maxBusyTimeout':'5m'," + + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[], 'timeToWaitForScanServers':'120s'," + + "'attemptPlans':[{'servers':'3', 'busyTimeout':'1s'}]}]"; + clientProps.put("scan.server.selector.impl", ConfigurableScanServerSelector.class.getName()); + clientProps.put("scan.server.selector.opts.profiles", + scanServerSelectorProfiles.replace("'", "\"")); + + try (AccumuloClient client = Accumulo.newClient().from(clientProps).build()) { + String tableName = getUniqueNames(1)[0]; + + createTableAndIngest(client, tableName, null, 10, 10, "colf"); + + assertThrows(TimedOutException.class, () -> { + try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) { + scanner.setRanges(List.of(new Range())); + scanner.setTimeout(1, TimeUnit.SECONDS); scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); assertEquals(100, Iterables.size(scanner)); } // when the scanner is closed, all open sessions should be closed