This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2cc38dff5e Back ported Timeout value to 2.1 (#4671)
2cc38dff5e is described below

commit 2cc38dff5e26f66dd7745c67ec7dadb44d7cc786
Author: Arbaaz Khan <bazzy...@yahoo.com>
AuthorDate: Tue Jun 18 16:28:37 2024 -0400

    Back ported Timeout value to 2.1 (#4671)
    
    * back ported Timeout value to 2.1
    * added changes
---
 .../TabletServerBatchReaderIterator.java           | 19 +++++-
 .../accumulo/core/clientImpl/ThriftScanner.java    | 61 +++++++++++++++++--
 .../spi/scan/ConfigurableScanServerSelector.java   | 70 ++++++++++++++++++----
 .../accumulo/core/spi/scan/ScanServerSelector.java | 39 +++++++++++-
 .../scan/ConfigurableScanServerSelectorTest.java   | 70 ++++++++++++++++++++--
 .../accumulo/test/ScanServerIT_NoServers.java      | 52 ++++++++++++++--
 6 files changed, 283 insertions(+), 28 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 8f30ded1c6..322fd0f098 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -36,11 +36,13 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -502,7 +504,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
 
   private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
       final ResultReceiver receiver, List<Column> columns) {
-
+    long startTime = System.currentTimeMillis();
     int maxTabletsPerRequest = Integer.MAX_VALUE;
 
     long busyTimeout = 0;
@@ -510,7 +512,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
     Map<String,ScanServerAttemptReporter> reporters = Map.of();
 
     if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
-      var scanServerData = rebinToScanServers(binnedRanges);
+      var scanServerData = rebinToScanServers(binnedRanges, startTime);
       busyTimeout = scanServerData.actions.getBusyTimeout().toMillis();
       reporters = scanServerData.reporters;
       scanServerSelectorDelay = scanServerData.actions.getDelay();
@@ -603,7 +605,8 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
     Map<String,ScanServerAttemptReporter> reporters;
   }
 
-  private ScanServerData 
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
+  private ScanServerData 
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
+      long startTime) {
     ScanServerSelector ecsm = context.getScanServerSelector();
 
     List<TabletIdImpl> tabletIds =
@@ -613,6 +616,9 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
     // get a snapshot of this once,not each time the plugin request it
     var scanAttemptsSnapshot = scanAttempts.snapshot();
 
+    Duration timeoutLeft = Duration.ofMillis(retryTimeout)
+        .minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
     ScanServerSelector.SelectorParameters params = new 
ScanServerSelector.SelectorParameters() {
       @Override
       public Collection<TabletId> getTablets() {
@@ -628,6 +634,13 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
       public Map<String,String> getHints() {
         return options.executionHints;
       }
+
+      @Override
+      public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, 
Duration maxWaitTime,
+          String description) {
+        return ThriftScanner.waitUntil(condition, maxWaitTime, description, 
timeoutLeft, context,
+            tableId, log);
+      }
     };
 
     var actions = ecsm.selectServers(params);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 604aece29d..c76277f79b 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -30,9 +30,12 @@ import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
@@ -40,6 +43,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.SampleNotPresentException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -75,6 +79,7 @@ import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
@@ -262,6 +267,44 @@ public class ThriftScanner {
     }
   }
 
+  static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration 
maxWaitTime,
+      String description, Duration timeoutLeft, ClientContext context, TableId 
tableId,
+      Logger log) {
+    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, 
TimeUnit.MILLISECONDS)
+        .incrementBy(100, TimeUnit.MILLISECONDS).maxWait(1, 
SECONDS).backOffFactor(1.5)
+        .logInterval(3, TimeUnit.MINUTES).createRetry();
+
+    long startTime = System.nanoTime();
+    Optional<T> optional = condition.get();
+    while (optional.isEmpty()) {
+      log.trace("For tableId {} scan server selector is waiting for '{}'", 
tableId, description);
+
+      var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime);
+
+      if (elapsedTime.compareTo(timeoutLeft) > 0) {
+        throw new TimedOutException("While waiting for '" + description
+            + "' in order to select a scan server, the scan timed out. ");
+      }
+
+      if (elapsedTime.compareTo(maxWaitTime) > 0) {
+        return Optional.empty();
+      }
+
+      context.requireNotDeleted(tableId);
+
+      try {
+        retry.waitForNextAttempt(log, String.format(
+            "For tableId %s scan server selector is waiting for '%s'", 
tableId, description));
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+
+      optional = condition.get();
+    }
+
+    return optional;
+  }
+
   public static class ScanTimedOutException extends IOException {
 
     private static final long serialVersionUID = 1L;
@@ -370,7 +413,7 @@ public class ThriftScanner {
         Span child2 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::location",
             Map.of("tserver", loc.tablet_location));
         try (Scope scanLocation = child2.makeCurrent()) {
-          results = scan(loc, scanState, context);
+          results = scan(loc, scanState, context, timeOut, startTime);
         } catch (AccumuloSecurityException e) {
           context.clearTableListCache();
           context.requireNotDeleted(scanState.tableId);
@@ -510,9 +553,9 @@ public class ThriftScanner {
     }
   }
 
-  private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, 
ClientContext context)
-      throws AccumuloSecurityException, NotServingTabletException, TException,
-      NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException 
{
+  private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, 
ClientContext context,
+      long timeOut, long startTime) throws AccumuloSecurityException, 
NotServingTabletException,
+      TException, NoSuchScanIDException, TooManyFilesException, 
TSampleNotPresentException {
     if (scanState.finished) {
       return null;
     }
@@ -536,6 +579,9 @@ public class ThriftScanner {
         // obtain a snapshot once and only expose this snapshot to the plugin 
for consistency
         var attempts = scanState.scanAttempts.snapshot();
 
+        Duration timeoutLeft = Duration.ofSeconds(timeOut)
+            .minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
         var params = new ScanServerSelector.SelectorParameters() {
 
           @Override
@@ -555,6 +601,13 @@ public class ThriftScanner {
             }
             return scanState.executionHints;
           }
+
+          @Override
+          public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, 
Duration maxWaitTime,
+              String description) {
+            return ThriftScanner.waitUntil(condition, maxWaitTime, 
description, timeoutLeft,
+                context, loc.tablet_extent.tableId(), log);
+          }
         };
 
         ScanServerSelections actions = 
context.getScanServerSelector().selectServers(params);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index 7dfa7033db..e0b9ddbf3d 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -80,7 +81,28 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  * specified, the set of scan servers that did not specify a group will be 
used. Grouping scan
  * servers supports at least two use cases. First groups can be used to 
dedicate resources for
  * certain scans. Second groups can be used to have different hardware/VM 
types for scans, for
- * example could have some scans use expensive high memory VMs and others use 
cheaper burstable VMs.
+ * example could have some scans use expensive high memory VMs and others use 
cheaper burstable
+ * VMs.</li>
+ * <li><b>timeToWaitForScanServers : </b> When there are no scan servers, 
this setting determines
+ * how long to wait for scan servers to become available before falling back 
to tablet servers.
+ * Falling back to tablet servers may cause tablets to be loaded that are not 
currently loaded. When
+ * this setting is given a wait time and there are no scan servers, it will 
wait for scan servers to
+ * be available. This setting avoids loading tablets on tablet servers when 
scan servers are
+ * temporarily unavailable which could be caused by normal cluster activity. 
You can specify the
+ * wait time using different units to precisely control the wait duration. The 
supported units are:
+ * <ul>
+ * <li>"d" for days</li>
+ * <li>"h" for hours</li>
+ * <li>"m" for minutes</li>
+ * <li>"s" for seconds</li>
+ * <li>"ms" for milliseconds</li>
+ * </ul>
+ * If duration is not specified this setting defaults to 0s, and will disable 
the wait for scan
+ * servers and will fall back to tablet servers immediately. When set to a 
large value, the selector
+ * will effectively wait for scan servers to become available before falling 
back to tablet servers.
+ * To ensure the selector never falls back to scanning tablet servers an 
unrealistic wait time can be
+ * set; for instance, a very large wait time should be sufficient. Waiting for scan servers 
is done via
+ * {@link 
org.apache.accumulo.core.spi.scan.ScanServerSelector.SelectorParameters#waitUntil(Supplier,
 Duration, String)}</li>
  * <li><b>attemptPlans : </b> A list of configuration to use for each scan 
attempt. Each list object
  * has the following fields:
  * <ul>
@@ -114,6 +136,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  *       "maxBusyTimeout":"20m",
  *       "busyTimeoutMultiplier":8,
  *       "group":"lowcost",
+ *       "timeToWaitForScanServers": "120s",
  *       "attemptPlans":[
  *         {"servers":"1", "busyTimeout":"10s"},
  *         {"servers":"3", "busyTimeout":"30s","salt":"42"},
@@ -133,16 +156,18 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  * </p>
  *
  * <p>
- * For the profile activated by {@code scan_type=slow} it start off by 
choosing randomly from 1 scan
- * server based on a hash of the tablet with no salt and a busy timeout of 
10s. The second attempt
- * will choose from 3 scan servers based on a hash of the tablet plus the salt 
{@literal 42}.
- * Without the salt, the single scan servers from the first attempt would 
always be included in the
- * set of 3. With the salt the single scan server from the first attempt may 
not be included. The
- * third attempt will choose a scan server from 9 using the salt {@literal 84} 
and a busy timeout of
- * 60s. The different salt means the set of servers that attempts 2 and 3 
choose from may be
- * disjoint. Attempt 4 and greater will continue to choose from the same 9 
servers as attempt 3 and
- * will keep increasing the busy timeout by multiplying 8 until the maximum of 
20 minutes is
- * reached. For this profile it will choose from scan servers in the group 
{@literal lowcost}.
+ * For the profile activated by {@code scan_type=slow} it starts off by 
choosing randomly from 1
+ * scan server based on a hash of the tablet with no salt and a busy timeout 
of 10s. The second
+ * attempt will choose from 3 scan servers based on a hash of the tablet plus 
the salt
+ * {@literal 42}. Without the salt, the single scan servers from the first 
attempt would always be
+ * included in the set of 3. With the salt the single scan server from the 
first attempt may not be
+ * included. The third attempt will choose a scan server from 9 using the salt 
{@literal 84} and a
+ * busy timeout of 60s. The different salt means the set of servers that 
attempts 2 and 3 choose
+ * from may be disjoint. Attempt 4 and greater will continue to choose from 
the same 9 servers as
+ * attempt 3 and will keep increasing the busy timeout by multiplying 8 until 
the maximum of 20
+ * minutes is reached. For this profile it will choose from scan servers in 
the group
+ * {@literal lowcost}. This profile also will not fall back to tablet servers 
when there are
+ * currently no scan servers; instead it will wait for scan servers to become 
available.
  * </p>
  *
  * @since 2.1.0
@@ -225,10 +250,13 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
     int busyTimeoutMultiplier;
     String maxBusyTimeout;
     String group = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+    String timeToWaitForScanServers = "0s";
 
     transient boolean parsed = false;
     transient long parsedMaxBusyTimeout;
 
+    transient Duration parsedTimeToWaitForScanServers;
+
     int getNumServers(int attempt, int totalServers) {
       int index = Math.min(attempt, attemptPlans.size() - 1);
       return attemptPlans.get(index).getNumServers(totalServers);
@@ -239,6 +267,8 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
         return;
       }
       parsedMaxBusyTimeout = 
ConfigurationTypeHelper.getTimeInMillis(maxBusyTimeout);
+      parsedTimeToWaitForScanServers =
+          
Duration.ofMillis(ConfigurationTypeHelper.getTimeInMillis(timeToWaitForScanServers));
       parsed = true;
     }
 
@@ -259,6 +289,11 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
       int index = Math.min(attempts, attemptPlans.size() - 1);
       return attemptPlans.get(index).salt;
     }
+
+    Duration getTimeToWaitForScanServers() {
+      parse();
+      return parsedTimeToWaitForScanServers;
+    }
   }
 
   private void parseProfiles(Map<String,String> options) {
@@ -332,7 +367,20 @@ public class ConfigurableScanServerSelector implements 
ScanServerSelector {
     List<String> orderedScanServers =
         orderedScanServersSupplier.get().getOrDefault(profile.group, 
List.of());
 
+    Duration scanServerWaitTime = profile.getTimeToWaitForScanServers();
+
+    var finalProfile = profile;
+    if (orderedScanServers.isEmpty() && !scanServerWaitTime.isZero()) {
+      // Wait for scan servers in the configured group to be present.
+      orderedScanServers = params.waitUntil(
+          () -> 
Optional.ofNullable(orderedScanServersSupplier.get().get(finalProfile.group)),
+          scanServerWaitTime, "scan servers in group : " + 
profile.group).orElseThrow();
+      // at this point the list should be non empty unless there is a bug
+      Preconditions.checkState(!orderedScanServers.isEmpty());
+    }
+
     if (orderedScanServers.isEmpty()) {
+      // there are no scan servers so fall back to the tablet server
       return new ScanServerSelections() {
         @Override
         public String getScanServer(TabletId tabletId) {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
index 541ff7a858..d74199a278 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java
@@ -18,8 +18,11 @@
  */
 package org.apache.accumulo.core.spi.scan;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.client.ScannerBase;
@@ -77,7 +80,6 @@ public interface ScanServerSelector {
      *         made using a consistent set of scan servers.
      */
     Supplier<Collection<ScanServerInfo>> getScanServers();
-
   }
 
   /**
@@ -105,11 +107,46 @@ public interface ScanServerSelector {
      *         were set, an empty map is returned.
      */
     Map<String,String> getHints();
+
+    /**
+     * This function helps a scan server selector wait for an optional to 
become non-empty (like
+     * waiting for scan servers to be present) and throws exceptions when 
waiting is no longer
+     * possible OR returning an empty optional if the max wait time was exceeded. The 
passed in condition will
+     * be periodically called and as long as it returns an empty optional the 
function will continue
+     * to wait.
+     *
+     * @param condition periodically calls this to see if it is non-empty.
+     * @param maxWaitTime the maximum time to wait for the condition to become 
non-empty
+     * @param description a description of what is being waited on, used for 
error messages and
+     *        logging
+     * @return The first non-empty optional returned by the condition. An 
empty optional if the
+     *         maxWaitTime was exceeded without the condition ever returning a 
non-empty optional.
+     *
+     * @throws org.apache.accumulo.core.client.TableDeletedException if the 
table is deleted while
+     *         waiting for the condition to become non-empty. Do not catch 
this exception, let it
+     *         escape.
+     * @throws org.apache.accumulo.core.client.TimedOutException if the 
timeout specified by
+     *         {@link ScannerBase#setTimeout(long, TimeUnit)} is exceeded 
while waiting. Do not
+     *         catch this exception, let it escape.
+     *
+     * @since 4.0.0
+     */
+    public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration 
maxWaitTime,
+        String description);
   }
 
   /**
+   * <p>
    * Uses the {@link SelectorParameters} to determine which, if any, 
ScanServer should be used for
    * scanning a tablet.
+   * </p>
+   *
+   * <p>
+   * In the case where there are zero scan servers available and an 
implementation does not want to
+   * fall back to tablet servers, it's ok to wait and poll for scan servers. 
When waiting it's best to
+   * use {@link SelectorParameters#waitUntil(Supplier, Duration, String)} as 
this allows Accumulo to
+   * know about the wait and cancel it via exceptions when it no longer makes 
sense to wait.
+   * </p>
    *
    * @param params parameters for the calculation
    * @return results
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index 432667affb..25ed62e89a 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.spi.scan;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,7 +30,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -39,6 +42,7 @@ import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 
@@ -47,7 +51,7 @@ public class ConfigurableScanServerSelectorTest {
   static class InitParams implements ScanServerSelector.InitParameters {
 
     private final Map<String,String> opts;
-    private final Map<String,String> scanServers;
+    private final Supplier<Map<String,String>> scanServers;
 
     InitParams(Set<String> scanServers) {
       this(scanServers, Map.of());
@@ -55,12 +59,18 @@ public class ConfigurableScanServerSelectorTest {
 
     InitParams(Set<String> scanServers, Map<String,String> opts) {
       this.opts = opts;
-      this.scanServers = new HashMap<>();
+      var scanServersMap = new HashMap<String,String>();
       scanServers.forEach(
-          sserv -> this.scanServers.put(sserv, 
ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME));
+          sserv -> scanServersMap.put(sserv, 
ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME));
+      this.scanServers = () -> scanServersMap;
     }
 
     InitParams(Map<String,String> scanServers, Map<String,String> opts) {
+      this.opts = opts;
+      this.scanServers = () -> scanServers;
+    }
+
+    InitParams(Supplier<Map<String,String>> scanServers, Map<String,String> 
opts) {
       this.opts = opts;
       this.scanServers = scanServers;
     }
@@ -77,7 +87,7 @@ public class ConfigurableScanServerSelectorTest {
 
     @Override
     public Supplier<Collection<ScanServerInfo>> getScanServers() {
-      return () -> scanServers.entrySet().stream().map(entry -> new 
ScanServerInfo() {
+      return () -> scanServers.get().entrySet().stream().map(entry -> new 
ScanServerInfo() {
 
         @Override
         public String getAddress() {
@@ -126,6 +136,13 @@ public class ConfigurableScanServerSelectorTest {
     public Map<String,String> getHints() {
       return hints;
     }
+
+    @Override
+    public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration 
maxWaitTime,
+        String description) {
+      throw new UnsupportedOperationException();
+    }
+
   }
 
   static class TestScanServerAttempt implements ScanServerAttempt {
@@ -467,4 +484,49 @@ public class ConfigurableScanServerSelectorTest {
 
     assertEquals(Set.of("ss1:1", "ss2:2", "ss3:3"), servers);
   }
+
+  @Test
+  public void testWaitForScanServers() {
+    // this test ensures that when there are no scan servers that the 
ConfigurableScanServerSelector
+    // will wait for scan servers
+
+    String defaultProfile =
+        
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+            + "'attemptPlans':[{'servers':'100%', 'busyTimeout':'60s'}]}";
+
+    var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', 
'"'));
+
+    ConfigurableScanServerSelector selector = new 
ConfigurableScanServerSelector();
+
+    AtomicReference<Map<String,String>> scanServers = new 
AtomicReference<>(Map.of());
+
+    selector.init(new InitParams(scanServers::get, opts));
+
+    var tabletId = nti("1", "m");
+
+    var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+
+    var params = new SelectorParams(tabletId, Map.of(), Map.of()) {
+      @Override
+      public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, 
Duration maxWaitTime,
+          String description) {
+        // make some scan servers available now that wait was called
+        scanServers.set(Map.of("ss1:1", dg, "ss2:2", dg, "ss3:3", dg));
+
+        Optional<T> optional = condition.get();
+
+        while (optional.isEmpty()) {
+          UtilWaitThread.sleep(10);
+          optional = condition.get();
+        }
+
+        return optional;
+      }
+    };
+
+    ScanServerSelections actions = selector.selectServers(params);
+
+    assertTrue(Set.of("ss1:1", "ss2:2", 
"ss3:3").contains(actions.getScanServer(tabletId)));
+    assertFalse(scanServers.get().isEmpty());
+  }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java 
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
index 02b673db8a..f8dde71bbf 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
@@ -25,16 +25,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -123,16 +127,54 @@ public class ScanServerIT_NoServers extends 
SharedMiniClusterBase {
   }
 
   @Test
-  public void testScanOfflineTable() throws Exception {
-    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+  public void testScanWithNoTserverFallback() throws Exception {
+
+    var clientProps = new Properties();
+    clientProps.putAll(getClientProps());
+    String scanServerSelectorProfiles = 
"[{'isDefault':true,'maxBusyTimeout':'5m',"
+        + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[], 
'timeToWaitForScanServers':'120s',"
+        + "'attemptPlans':[{'servers':'3', 'busyTimeout':'1s'}]}]";
+    clientProps.put("scan.server.selector.impl", 
ConfigurableScanServerSelector.class.getName());
+    clientProps.put("scan.server.selector.opts.profiles",
+        scanServerSelectorProfiles.replace("'", "\""));
+
+    try (AccumuloClient client = 
Accumulo.newClient().from(clientProps).build()) {
       String tableName = getUniqueNames(1)[0];
 
       createTableAndIngest(client, tableName, null, 10, 10, "colf");
-      client.tableOperations().offline(tableName, true);
 
-      assertThrows(TableOfflineException.class, () -> {
+      assertThrows(TimedOutException.class, () -> {
         try (Scanner scanner = client.createScanner(tableName, 
Authorizations.EMPTY)) {
           scanner.setRange(new Range());
+          scanner.setTimeout(1, TimeUnit.SECONDS);
+          scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+          assertEquals(100, Iterables.size(scanner));
+        } // when the scanner is closed, all open sessions should be closed
+      });
+    }
+  }
+
+  @Test
+  public void testBatchScanWithNoTserverFallback() throws Exception {
+
+    var clientProps = new Properties();
+    clientProps.putAll(getClientProps());
+    String scanServerSelectorProfiles = 
"[{'isDefault':true,'maxBusyTimeout':'5m',"
+        + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[], 
'timeToWaitForScanServers':'120s',"
+        + "'attemptPlans':[{'servers':'3', 'busyTimeout':'1s'}]}]";
+    clientProps.put("scan.server.selector.impl", 
ConfigurableScanServerSelector.class.getName());
+    clientProps.put("scan.server.selector.opts.profiles",
+        scanServerSelectorProfiles.replace("'", "\""));
+
+    try (AccumuloClient client = 
Accumulo.newClient().from(clientProps).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      createTableAndIngest(client, tableName, null, 10, 10, "colf");
+
+      assertThrows(TimedOutException.class, () -> {
+        try (BatchScanner scanner = client.createBatchScanner(tableName, 
Authorizations.EMPTY)) {
+          scanner.setRanges(List.of(new Range()));
+          scanner.setTimeout(1, TimeUnit.SECONDS);
           scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
           assertEquals(100, Iterables.size(scanner));
         } // when the scanner is closed, all open sessions should be closed

Reply via email to