This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 5f1628245eeafe99ed3c7a19a0b72a739e80fb7b
Merge: da4ff8c98d c4b2cf3fe9
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 16 14:46:18 2026 +0000

    Merge branch '2.1'

 .../apache/accumulo/core/client/ScannerBase.java   |   4 +-
 .../accumulo/core/clientImpl/ScannerIterator.java  |   4 +-
 .../TabletServerBatchReaderIterator.java           |  82 ++++++---
 .../accumulo/core/clientImpl/ThriftScanner.java    |   5 +-
 .../apache/accumulo/tserver/scan/LookupTask.java   |   6 +-
 .../test/functional/ErrorThrowingIterator.java     |  58 ++++--
 .../apache/accumulo/test/functional/ScannerIT.java | 202 +++++++++++++++++++++
 .../apache/accumulo/test/functional/TimeoutIT.java |  36 +++-
 8 files changed, 350 insertions(+), 47 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 372d119f05,53a44dc0bf..267ca5b2db
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@@ -799,43 -733,61 +800,60 @@@ public final class TabletServerBatchRea
  
    private static class TimeoutTracker {
  
 -    final String server;
 -    final Set<String> badServers;
 +    String server;
 +    Set<String> badServers;
      final long timeOut;
-     CountDownTimer timeoutCountDownTimer;
+ 
+     // When failures happen, an rpc task to scan a server may be requeued in a 
thread pool. These two
+     // variables track failures across tasks running in those thread pools.
 -    Long firstErrorTime = null;
 -    Long firstAllFailureTime = null;
 +    CountDownTimer errorTimer;
++    CountDownTimer failureTimer;
  
      TimeoutTracker(String server, Set<String> badServers, long timeOut) {
 -      this.timeOut = timeOut;
 +      this(timeOut);
-       this.server = server;
+       this.server = Objects.requireNonNull(server);
        this.badServers = badServers;
      }
  
      TimeoutTracker(long timeOut) {
        this.timeOut = timeOut;
-       this.timeoutCountDownTimer = CountDownTimer.startNew(timeOut, 
MILLISECONDS);
 -      this.badServers = null;
 -      this.server = null;
      }
  
-     void startingScan() {
-       timeoutCountDownTimer.restart();
-     }
+     class Session {
 -      long activityTime;
++      final CountDownTimer timeoutCountDownTimer;
 +
-     void check() throws IOException {
-       if (timeoutCountDownTimer.isExpired()) {
-         badServers.add(server);
-         throw new IOException("Time exceeded " + timeOut + " ms for server " 
+ server);
++      Session() {
++        timeoutCountDownTimer = CountDownTimer.startNew(timeOut, 
MILLISECONDS);
++      }
+ 
+       void check() throws IOException {
 -        if (System.currentTimeMillis() - activityTime > timeOut) {
++        if (timeoutCountDownTimer.isExpired()) {
+           badServers.add(server);
 -          throw new IOException(
 -              "Time exceeded " + (System.currentTimeMillis() - activityTime) 
+ " " + server);
++          throw new IOException("Time exceeded " + timeOut + " ms for server 
" + server);
+         }
+       }
+ 
+       void madeProgress() {
 -        activityTime = System.currentTimeMillis();
++        timeoutCountDownTimer.restart();
+         synchronized (TimeoutTracker.this) {
 -          firstErrorTime = null;
 -          firstAllFailureTime = null;
++          errorTimer = null;
++          failureTimer = null;
+         }
        }
      }
  
-     void madeProgress() {
-       timeoutCountDownTimer.restart();
-       errorTimer = null;
+     /**
 -     * Multiple threads can scan different exents on the same server at the 
same time. The session
++     * Multiple threads can scan different extents on the same server at the 
same time. The session
+      * allows each potential rpc thread to have its own timeout countdown timer.
+      */
 -    Session startingScan() throws IOException {
 -      var session = new Session();
 -      session.activityTime = System.currentTimeMillis();
 -      return session;
++    Session startingScan() {
++      return new Session();
      }
  
-     void errorOccured() {
+     synchronized void errorOccured(Session session) {
 -      if (firstErrorTime == null) {
 -        firstErrorTime = session.activityTime;
 -      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
 +      if (errorTimer == null) {
-         errorTimer = CountDownTimer.startNew(timeOut, MILLISECONDS);
++        errorTimer = 
CountDownTimer.startNew(session.timeoutCountDownTimer.timeLeft());
 +      } else if (errorTimer.isExpired()) {
          badServers.add(server);
        }
      }
@@@ -843,8 -795,27 +861,17 @@@
      public long getTimeOut() {
        return timeOut;
      }
+ 
+     synchronized void sawOnlyFailures(Session session) throws IOException {
 -      if (firstAllFailureTime == null) {
 -        firstAllFailureTime = session.activityTime;
 -      } else if (System.currentTimeMillis() - firstAllFailureTime > timeOut) {
++      if (failureTimer == null) {
++        failureTimer = 
CountDownTimer.startNew(session.timeoutCountDownTimer.timeLeft());
++      } else if (failureTimer.isExpired()) {
+         badServers.add(server);
 -        throw new IOException(
 -            "Time exceeded " + (System.currentTimeMillis() - 
firstAllFailureTime) + " " + server);
++        throw new IOException("Time exceeded " + timeOut + " ms for server " 
+ server);
+       }
+     }
    }
  
 -  public static void doLookup(ClientContext context, String server,
 -      Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> 
failures,
 -      Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, 
List<Column> columns,
 -      ScannerOptions options, Authorizations authorizations)
 -      throws IOException, AccumuloSecurityException, AccumuloServerException {
 -    doLookup(context, server, requested, failures, unscanned, receiver, 
columns, options,
 -        authorizations, new TimeoutTracker(Long.MAX_VALUE), 0L);
 -  }
 -
    static void doLookup(ClientContext context, String server, 
Map<KeyExtent,List<Range>> requested,
        Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> 
unscanned,
        ResultReceiver receiver, List<Column> columns, ScannerOptions options,
diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index b3f660ac2f,e418642913..b7fe95bbd4
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@@ -376,262 -321,13 +376,261 @@@ public final class ThriftScanner 
        Thread.sleep(millis);
      }
      // wait 2 * last time, with +-10% random jitter
 -    return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() 
/ 5));
 +    return (long) (Math.min(millis * 2, maxSleep) * (.9 + 
RANDOM.get().nextDouble() / 5));
 +  }
 +
 +  private static Optional<ScanAddress> getScanServerAddress(ClientContext 
context,
 +      ScanState scanState, CachedTablet loc, Duration timeOut, Timer 
scanTimer) {
 +    Preconditions.checkArgument(scanState.runOnScanServer);
 +
 +    ScanAddress addr = null;
 +
 +    if (scanState.scanID != null && scanState.prevLoc != null
 +        && scanState.prevLoc.serverType == ServerType.SSERVER
 +        && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
 +      // this is the case of continuing a scan on a scan server for the same 
tablet, so let's not
 +      // call the scan server selector and just go back to the previous scan 
server
 +      addr = scanState.prevLoc;
 +      log.trace(
 +          "For tablet {} continuing scan {} on scan server {} without 
consulting scan server selector, using busyTimeout {}",
 +          loc.getExtent(), scanState.scanID, addr.serverAddress, 
scanState.busyTimeout);
 +    } else {
 +      var tabletId = new TabletIdImpl(loc.getExtent());
 +      // obtain a snapshot once and only expose this snapshot to the plugin 
for consistency
 +      var attempts = scanState.scanAttempts.snapshot();
 +
 +      Duration timeoutLeft = timeOut.minus(scanTimer.elapsed());
 +
 +      var params = new ScanServerSelector.SelectorParameters() {
 +
 +        @Override
 +        public List<TabletId> getTablets() {
 +          return List.of(tabletId);
 +        }
 +
 +        @Override
 +        public Collection<? extends ScanServerAttempt> getAttempts(TabletId 
tabletId) {
 +          return attempts.getOrDefault(tabletId, Set.of());
 +        }
 +
 +        @Override
 +        public Map<String,String> getHints() {
 +          if (scanState.executionHints == null) {
 +            return Map.of();
 +          }
 +          return scanState.executionHints;
 +        }
 +
 +        @Override
 +        public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, 
Duration maxWaitTime,
 +            String description) {
 +          return ThriftScanner.waitUntil(condition, maxWaitTime, description, 
timeoutLeft, context,
 +              loc.getExtent().tableId(), log);
 +        }
 +      };
 +
 +      ScanServerSelections actions = 
context.getScanServerSelector().selectServers(params);
 +
 +      Duration delay = null;
 +
 +      String scanServer = actions.getScanServer(tabletId);
 +      if (scanServer != null) {
 +        addr = new ScanAddress(scanServer, ServerType.SSERVER, loc);
 +        delay = actions.getDelay();
 +        scanState.busyTimeout = actions.getBusyTimeout();
 +        log.trace("For tablet {} scan server selector chose scan_server:{} 
delay:{} busyTimeout:{}",
 +            loc.getExtent(), scanServer, delay, scanState.busyTimeout);
 +      } else {
 +        Optional<String> tserverLoc = loc.getTserverLocation();
 +
 +        if (tserverLoc.isPresent()) {
 +          addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), 
ServerType.TSERVER, loc);
 +          delay = actions.getDelay();
 +          scanState.busyTimeout = Duration.ZERO;
 +          log.trace("For tablet {} scan server selector chose tablet_server: 
{}", loc.getExtent(),
 +              addr.serverAddress);
 +        } else {
 +          log.trace(
 +              "For tablet {} scan server selector chose tablet_server, but 
the tablet is not currently hosted",
 +              loc.getExtent());
 +          return Optional.empty();
 +        }
 +      }
 +
 +      if (!delay.isZero()) {
 +        try {
 +          Thread.sleep(delay.toMillis());
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +
 +    return Optional.of(addr);
 +  }
 +
 +  /**
 +   * @see ClientTabletCache#findTablet(ClientContext, Text, boolean, 
ClientTabletCache.LocationNeed,
 +   *      int, Range)
 +   */
 +  private static int computeMinimumHostAhead(ScanState scanState,
 +      ClientTabletCache.LocationNeed hostingNeed) {
 +    int minimumHostAhead = 0;
 +
 +    if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) {
 +      long currTime = System.nanoTime();
 +
 +      double timeRatio = 0;
 +
 +      long totalTime = currTime - scanState.startTimeNanos;
 +      if (totalTime > 0) {
 +        // The following computes (total time spent in this method)/(total 
time scanning and time
 +        // spent in this method)
 +        timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double) 
(totalTime);
 +      }
 +
 +      if (timeRatio < 0 || timeRatio > 1) {
 +        log.warn("Computed ratio of time spent in getNextScanAddress has 
unexpected value : {} ",
 +            timeRatio);
 +        timeRatio = Math.max(0.0, Math.min(1.0, timeRatio));
 +      }
 +
 +      //
 +      // Do not want to host all tablets in the scan range in case not all 
data in the range is
 +      // read. Need to determine how many tablets to host ahead of time. The 
following information
 +      // is used to determine how many tablets to host ahead of time.
 +      //
 +      // 1. The number of tablets this scan has already read. The more 
tablets that this scan has
 +      // read, the more likely that it will read many more tablets.
 +      //
 +      // 2. The timeRatio computed above. As timeRatio approaches 1.0 it 
means we are spending
 +      // most of our time waiting on the next tablet to have an address. When 
we are spending most of
 +      // our time waiting for a tablet to have an address we want to increase 
the number of tablets
 +      // we request to host ahead of time so we can hopefully spend less time 
waiting on that.
 +      //
 +
 +      if (timeRatio > .9) {
 +        minimumHostAhead = 16;
 +      } else if (timeRatio > .75) {
 +        minimumHostAhead = 8;
 +      } else if (timeRatio > .5) {
 +        minimumHostAhead = 4;
 +      } else if (timeRatio > .25) {
 +        minimumHostAhead = 2;
 +      } else {
 +        minimumHostAhead = 1;
 +      }
 +
 +      minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead);
 +    }
 +    return minimumHostAhead;
 +  }
 +
 +  static ScanAddress getNextScanAddress(ClientContext context, ScanState 
scanState,
 +      Duration timeOut, Timer scanTimer, long maxSleepTime)
 +      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloServerException,
 +      InterruptedException, ScanTimedOutException, 
InvalidTabletHostingRequestException {
 +
 +    String lastError = null;
 +    String error = null;
 +    long sleepMillis = 100;
 +
 +    ScanAddress addr = null;
 +
 +    var hostingNeed = scanState.runOnScanServer ? 
ClientTabletCache.LocationNeed.NOT_REQUIRED
 +        : ClientTabletCache.LocationNeed.REQUIRED;
 +
 +    int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed);
 +
 +    while (addr == null) {
 +      if (scanTimer.hasElapsed(timeOut)) {
 +        throw new ScanTimedOutException("Failed to locate next server to scan 
before timeout");
 +      }
 +
 +      CachedTablet loc = null;
 +
 +      Span child1 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::locateTablet");
 +      try (Scope locateSpan = child1.makeCurrent()) {
 +
 +        loc = 
context.getTabletLocationCache(scanState.tableId).findTablet(context,
 +            scanState.startRow, scanState.skipStartRow, hostingNeed, 
minimumHostAhead,
 +            scanState.range);
 +
 +        if (loc == null) {
 +          context.requireNotDeleted(scanState.tableId);
 +          context.requireNotOffline(scanState.tableId, null);
 +
 +          error = "Failed to locate tablet for table : " + scanState.tableId 
+ " row : "
 +              + scanState.startRow;
 +          if (!error.equals(lastError)) {
 +            log.debug("{}", error);
 +          } else if (log.isTraceEnabled()) {
 +            log.trace("{}", error);
 +          }
 +          lastError = error;
 +          sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
 +        } else {
 +          scanState.incrementTabletsScanned(loc.getExtent());
 +
 +          // when a tablet splits we do not want to continue scanning the low 
child
 +          // of the split if we are already past it
 +          Range dataRange = loc.getExtent().toDataRange();
 +
 +          if (scanState.range.getStartKey() != null
 +              && dataRange.afterEndKey(scanState.range.getStartKey())) {
 +            // go to the next tablet
 +            scanState.startRow = loc.getExtent().endRow();
 +            scanState.skipStartRow = true;
 +            // force another lookup
 +            loc = null;
 +          } else if (scanState.range.getEndKey() != null
 +              && dataRange.beforeStartKey(scanState.range.getEndKey())) {
 +            // should not happen
 +            throw new IllegalStateException("Unexpected tablet, extent : " + 
loc.getExtent()
 +                + "  range : " + scanState.range + " startRow : " + 
scanState.startRow);
 +          }
 +        }
 +      } catch (AccumuloServerException e) {
 +        TraceUtil.setException(child1, e, true);
 +        log.debug("Scan failed, server side exception : {}", e.getMessage());
 +        throw e;
 +      } catch (AccumuloException e) {
 +        error = "exception from tablet loc " + e.getMessage();
 +        if (!error.equals(lastError)) {
 +          log.debug("{}", error);
 +        } else if (log.isTraceEnabled()) {
 +          log.trace("{}", error);
 +        }
 +
 +        TraceUtil.setException(child1, e, false);
 +
 +        lastError = error;
 +        sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
 +      } finally {
 +        child1.end();
 +      }
 +
 +      if (loc != null) {
 +        if (scanState.runOnScanServer) {
 +          addr = getScanServerAddress(context, scanState, loc, timeOut, 
scanTimer).orElse(null);
 +          if (addr == null && loc.getTserverLocation().isEmpty()) {
 +            // wanted to fall back to tserver but tablet was not hosted so 
make another loop
 +            hostingNeed = ClientTabletCache.LocationNeed.REQUIRED;
 +          }
 +        } else {
 +          addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), 
ServerType.TSERVER, loc);
 +        }
 +      }
 +    }
 +
 +    return addr;
    }
  
-   public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, Duration timeOut)
-       throws ScanTimedOutException, AccumuloException, 
AccumuloSecurityException,
+   public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, Duration timeOut,
+       Timer scanTimer) throws ScanTimedOutException, AccumuloException, 
AccumuloSecurityException,
        TableNotFoundException {
 -    TabletLocation loc = null;
 +
-     Timer scanTimer = Timer.startNew();
      String lastError = null;
      String error = null;
      int tooManyFilesCount = 0;
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 44b29b84a3,2a6d5d9287..c0584b139c
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@@ -37,7 -47,10 +47,11 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+ import org.apache.accumulo.core.client.TimedOutException;
 +import org.apache.accumulo.core.client.admin.servers.ServerId;
+ import org.apache.accumulo.core.clientImpl.ClientContext;
+ import org.apache.accumulo.core.clientImpl.ThriftScanner;
+ import org.apache.accumulo.core.conf.ClientProperty;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
@@@ -45,9 -58,13 +59,12 @@@ import org.apache.accumulo.core.data.Ra
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.util.Timer;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
  import org.apache.accumulo.test.CloseScannerIT;
  import org.apache.accumulo.test.util.Wait;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.params.ParameterizedTest;
  import org.junit.jupiter.params.provider.EnumSource;
@@@ -59,6 -78,11 +78,11 @@@ public class ScannerIT extends Configur
      return Duration.ofMinutes(1);
    }
  
+   @Override
+   protected void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
 -    cfg.setNumScanServers(1);
++    cfg.getClusterServerConfiguration().setNumDefaultScanServers(1);
+   }
+ 
    @Test
    public void testScannerReadaheadConfiguration() throws Exception {
      final String table = getUniqueNames(1)[0];
@@@ -227,7 -257,189 +251,185 @@@
        throw new IllegalArgumentException("Unsupported server type " + 
serverType);
      }
  
 -    long count = 0;
 -    for (String server : servers) {
 -      count += c.instanceOperations().getActiveScans(server).stream()
 -          .filter(activeScan -> 
activeScan.getTable().equals(tableName)).count();
 -    }
 -    return count;
 +    return c.instanceOperations().getActiveScans(servers).stream()
 +        .filter(activeScan -> 
activeScan.getTable().equals(tableName)).count();
    }
+ 
+   @Test
+   public void testIOExceptionDuringScanIterator() throws Exception {
+ 
+     getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+     var random = new SecureRandom();
+ 
+     Properties props = getClientProperties();
+     // configure scan server not to fall back to tablet servers
+     String profiles = "[{'isDefault':true,'maxBusyTimeout':'1s', 
'busyTimeoutMultiplier':8,"
+         + "'timeToWaitForScanServers':10h, "
+         + "'attemptPlans':[{'servers':'3', 'busyTimeout':'100ms'}]}]";
+     props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + 
"profiles", profiles);
+ 
+     final String table = getUniqueNames(1)[0];
+     try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+       client.tableOperations().create(table);
+ 
+       try (var writer = client.createBatchWriter(table)) {
+         for (int i = 0; i < 10; i++) {
+           Mutation m = new Mutation("row" + i);
+           m.put("", "", "");
+           writer.addMutation(m);
+         }
+       }
+ 
+       // need to flush data to disk so it's visible to the scan server
+       client.tableOperations().flush(table, null, null, true);
+ 
+       IteratorSetting iteratorSetting = new IteratorSetting(1000, 
ErrorThrowingIterator.class);
+       iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "3");
+       // Set a single row to fail so that after splitting some tablets fail 
and some do not fail.
+       iteratorSetting.addOption(ErrorThrowingIterator.ROW, "row5");
+ 
+       // The batch scanner sends multiple extents in a single RPC. Need to 
try a mixture of failing
+       // and non failing extents for this RPC, so test w/ single tablet and 
three tablets.
+       for (List<String> splitsToAdd : List.of(List.<String>of(), 
List.of("row3", "row7"))) {
+         if (!splitsToAdd.isEmpty()) {
+           TreeSet<Text> splits =
+               
splitsToAdd.stream().map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+           client.tableOperations().addSplits(table, splits);
+           // The scan server would not see these splits as it caches tablet 
info for a bit
+           getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
+           getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+         }
+         // try tablet and scan server to ensure both have same behavior
+         for (var cl : ConsistencyLevel.values()) {
+           log.debug("Starting scan {} {}", cl, splitsToAdd);
+           try (var scanner = client.createScanner(table)) {
+             iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+             scanner.addScanIterator(iteratorSetting);
+             scanner.setConsistencyLevel(cl);
+             assertEquals(10, scanner.stream().count());
+           }
+ 
+           log.debug("Starting batch scan {} {}", cl, splitsToAdd);
+           iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+           try (var scanner = client.createBatchScanner(table)) {
+             scanner.setRanges(List.of(new Range()));
+             scanner.addScanIterator(iteratorSetting);
+             scanner.setConsistencyLevel(cl);
+             assertEquals(10, scanner.stream().count());
+           }
+         }
+       }
+ 
+       // ensure a repeating IOException in an iterator times out eventually
+       iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "1000000");
+       var executor = Executors.newCachedThreadPool();
+       try {
+         List<Future<?>> futures = new ArrayList<>();
+         for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) {
+           iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+           futures.add(executor
+               .submit(() -> expectScanTimeout(client, table, 
consistencyLevel, iteratorSetting)));
+           iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+           futures.add(executor.submit(
+               () -> expectBatchScanTimeout(client, table, consistencyLevel, 
iteratorSetting)));
+         }
+ 
+         for (var future : futures) {
+           future.get();
+         }
+       } finally {
+         executor.shutdownNow();
+       }
+ 
+     }
+   }
+ 
+   @Test
+   public void testIOExceptionDuringScanFileOpen() throws Exception {
+ 
+     getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+ 
+     final String table = getUniqueNames(1)[0];
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+       client.tableOperations().create(table);
+ 
+       try (var writer = client.createBatchWriter(table)) {
+         for (int i = 0; i < 10; i++) {
+           Mutation m = new Mutation("row" + i);
+           m.put("", "", "");
+           writer.addMutation(m);
+         }
+       }
+ 
+       client.tableOperations().flush(table, null, null, true);
+ 
+       var ctx = (ClientContext) client;
+       var tableId = ctx.getTableId(table);
+ 
+       // Delete the tablet's file to cause an IOException during opening the 
file. By default
+       // scanners will retry indefinitely when an IOException happens. Test 
setting a timeout on the
+       // scans for this case.
+       try (var tablets = 
ctx.getAmple().readTablets().forTable(tableId).fetch(FILES).build()) {
+         var tabletList = tablets.stream().collect(Collectors.toList());
+         assertEquals(1, tabletList.size());
+         for (var tablet : tabletList) {
+           var file = 
tablet.getFiles().stream().collect(MoreCollectors.onlyElement());
+           assertTrue(getCluster().getFileSystem().delete(file.getPath(), 
false));
+         }
+       }
+ 
+       // Run scans all concurrently to avoid waiting on each one to timeout 
sequentially.
+       var executor = Executors.newCachedThreadPool();
+       try {
+         List<Future<?>> futures = new ArrayList<>();
+         for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) {
+           futures.add(executor.submit(() -> expectScanTimeout(client, table, 
consistencyLevel)));
+           futures
+               .add(executor.submit(() -> expectBatchScanTimeout(client, 
table, consistencyLevel)));
+         }
+ 
+         for (var future : futures) {
+           future.get();
+         }
+       } finally {
+         executor.shutdownNow();
+       }
+     }
+   }
+ 
+   private static void expectBatchScanTimeout(AccumuloClient client, String 
table,
+       ConsistencyLevel consistencyLevel, IteratorSetting... iters) {
+     try (var scanner = client.createBatchScanner(table)) {
+       scanner.setRanges(List.of(new Range()));
+       scanner.setTimeout(5, TimeUnit.SECONDS);
+       scanner.setConsistencyLevel(consistencyLevel);
+       for (var iter : iters) {
+         scanner.addScanIterator(iter);
+       }
+       Timer timer = Timer.startNew();
+       assertThrows(TimedOutException.class, () -> scanner.stream().count());
+       long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+       assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed);
+     } catch (Exception e) {
+       throw new IllegalStateException(e);
+     }
+   }
+ 
+   private static void expectScanTimeout(AccumuloClient client, String table,
+       ConsistencyLevel consistencyLevel, IteratorSetting... iters) {
+     try (var scanner = client.createScanner(table)) {
+       scanner.setTimeout(5, TimeUnit.SECONDS);
+       scanner.setConsistencyLevel(consistencyLevel);
+       for (var iter : iters) {
+         scanner.addScanIterator(iter);
+       }
+       Timer timer = Timer.startNew();
+       var exception = assertThrows(RuntimeException.class, () -> 
scanner.stream().count());
+       assertEquals(ThriftScanner.ScanTimedOutException.class, 
exception.getCause().getClass());
+       long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+       assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed);
+     } catch (Exception e) {
+       throw new IllegalStateException(e);
+     }
+   }
  }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
index b1b17a6b54,1152943247..53b5607bb4
--- a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
@@@ -18,7 -18,8 +18,8 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertThrows;
  
  import java.time.Duration;
@@@ -91,9 -96,9 +95,9 @@@ public class TimeoutIT extends Accumulo
        bs.setRanges(Collections.singletonList(new Range()));
  
        // should not timeout
 -      bs.setTimeout(5, TimeUnit.SECONDS);
++      bs.setTimeout(5, SECONDS);
        bs.forEach((k, v) -> {});
  
-       bs.setTimeout(5, SECONDS);
        IteratorSetting iterSetting = new IteratorSetting(100, 
SlowIterator.class);
        iterSetting.addOption("sleepTime", 2000 + "");
        bs.addScanIterator(iterSetting);
@@@ -103,4 -108,32 +107,32 @@@
      }
    }
  
+   public void testScannerTimeout(AccumuloClient client, String tableName) 
throws Exception {
+     client.tableOperations().create(tableName);
+ 
+     try (BatchWriter bw = client.createBatchWriter(tableName)) {
+       Mutation m = new Mutation("r1");
+       m.put("cf1", "cq1", "v1");
+       m.put("cf1", "cq2", "v2");
+       m.put("cf1", "cq3", "v3");
+       m.put("cf1", "cq4", "v4");
+       bw.addMutation(m);
+     }
+ 
+     try (Scanner scanner = client.createScanner(tableName)) {
+       scanner.setRange(new Range());
+ 
+       // should not timeout
 -      scanner.setTimeout(5, TimeUnit.SECONDS);
++      scanner.setTimeout(5, SECONDS);
+       scanner.forEach((k, v) -> {});
+ 
+       IteratorSetting iterSetting = new IteratorSetting(100, 
SlowIterator.class);
+       iterSetting.addOption("sleepTime", 6000 + "");
+       scanner.addScanIterator(iterSetting);
+ 
+       var exception = assertThrows(RuntimeException.class, () -> 
scanner.iterator().next(),
+           "scanner did not time out");
+       assertEquals(ThriftScanner.ScanTimedOutException.class, 
exception.getCause().getClass());
+     }
+   }
  }

Reply via email to