This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c4b2cf3fe9 Fixes multiple problems with scan timeout (#6307)
c4b2cf3fe9 is described below

commit c4b2cf3fe9a27cce59ed110634fc61292c70a8bf
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 16 07:04:29 2026 -0700

    Fixes multiple problems with scan timeout (#6307)
    
    The following four scanner timeout problems are fixed in this commit.
    
    The batch scanner would time out when servers repeatedly returned no
    data, but the scanner would not time out for this case. After this change
    both scanner and batch scanner will time out for this case. This
    fixes #6107. Added tests for this.
    
    The scanner would retry on server side IOException (like a transient
    problem reading a file). The batch scanner would not retry for this
    case. Although the scanner would retry for this case, it would not
    time out for this case. After this change both will retry for this case
    and both will properly time out for this case. Added tests for this.
    
    The batch scanner code was not considering partial empty scans when
    deciding if an RPC to a server made progress or not for the purpose of
    timeout tracking. A partial empty scan is the case where no data was
    returned by an RPC but the RPC indicated it had completed scanning part
    of a tablet. Updated the timeout tracking code to consider this case.
    
    The batch scanner code had a timeout tracking object that could be used
    by multiple scan tasks running in a thread pool. It was not handling the
    case of the same server having multiple RPC tasks running concurrently
    against different extents. Slightly refactored the timeout tracking
    object to handle this case.
---
 .../apache/accumulo/core/client/ScannerBase.java   |   4 +-
 .../accumulo/core/clientImpl/ScannerIterator.java  |   4 +-
 .../TabletServerBatchReaderIterator.java           |  92 +++++++---
 .../accumulo/core/clientImpl/ThriftScanner.java    |   5 +-
 .../apache/accumulo/tserver/scan/LookupTask.java   |   6 +-
 .../test/functional/ErrorThrowingIterator.java     |  58 ++++--
 .../apache/accumulo/test/functional/ScannerIT.java | 203 +++++++++++++++++++++
 .../apache/accumulo/test/functional/TimeoutIT.java |  36 +++-
 8 files changed, 357 insertions(+), 51 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java 
b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index a9ae3e561e..6df879a265 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -202,8 +202,8 @@ public interface ScannerBase extends 
Iterable<Entry<Key,Value>>, AutoCloseable {
   Iterator<Entry<Key,Value>> iterator();
 
   /**
-   * This setting determines how long a scanner will automatically retry when 
a failure occurs. By
-   * default, a scanner will retry forever.
+   * This setting determines how long a scanner will automatically retry when 
a failure occurs or
+   * when no data is being returned by servers. By default, a scanner will 
retry forever.
    *
    * <p>
    * Setting the timeout to zero (with any time unit) or {@link 
Long#MAX_VALUE} (with
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index d1eca1b198..e6e6f0b68f 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
@@ -150,11 +151,12 @@ public class ScannerIterator implements 
Iterator<Entry<Key,Value>> {
 
     List<KeyValue> batch;
 
+    Timer scanTimer = Timer.startNew();
     do {
       synchronized (scanState) {
         // this is synchronized so its mutually exclusive with closing
         Preconditions.checkState(!closed.get(), "Scanner was closed");
-        batch = ThriftScanner.scan(scanState.context, scanState, timeOut);
+        batch = ThriftScanner.scan(scanState.context, scanState, timeOut, 
scanTimer);
       }
     } while (batch != null && batch.isEmpty());
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 9c042c961a..53a44dc0bf 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -36,6 +36,7 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -732,42 +733,60 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
 
   private static class TimeoutTracker {
 
-    String server;
-    Set<String> badServers;
-    long timeOut;
-    long activityTime;
+    final String server;
+    final Set<String> badServers;
+    final long timeOut;
+
+    // When failures happen, rpc task to scan a server may be requeued in a 
thread pool. These two
+    // variables track failures across task running in those thread pools.
     Long firstErrorTime = null;
+    Long firstAllFailureTime = null;
 
     TimeoutTracker(String server, Set<String> badServers, long timeOut) {
-      this(timeOut);
-      this.server = server;
+      this.timeOut = timeOut;
+      this.server = Objects.requireNonNull(server);
       this.badServers = badServers;
     }
 
     TimeoutTracker(long timeOut) {
       this.timeOut = timeOut;
+      this.badServers = null;
+      this.server = null;
     }
 
-    void startingScan() {
-      activityTime = System.currentTimeMillis();
-    }
+    class Session {
+      long activityTime;
 
-    void check() throws IOException {
-      if (System.currentTimeMillis() - activityTime > timeOut) {
-        badServers.add(server);
-        throw new IOException(
-            "Time exceeded " + (System.currentTimeMillis() - activityTime) + " 
" + server);
+      void check() throws IOException {
+        if (System.currentTimeMillis() - activityTime > timeOut) {
+          badServers.add(server);
+          throw new IOException(
+              "Time exceeded " + (System.currentTimeMillis() - activityTime) + 
" " + server);
+        }
+      }
+
+      void madeProgress() {
+        activityTime = System.currentTimeMillis();
+        synchronized (TimeoutTracker.this) {
+          firstErrorTime = null;
+          firstAllFailureTime = null;
+        }
       }
     }
 
-    void madeProgress() {
-      activityTime = System.currentTimeMillis();
-      firstErrorTime = null;
+    /**
+     * Multiple threads can scan different extents on the same server at the 
same time. The session
+     * allows each potential rpc thread to have its own activityTime.
+     */
+    Session startingScan() throws IOException {
+      var session = new Session();
+      session.activityTime = System.currentTimeMillis();
+      return session;
     }
 
-    void errorOccured() {
+    synchronized void errorOccured(Session session) {
       if (firstErrorTime == null) {
-        firstErrorTime = activityTime;
+        firstErrorTime = session.activityTime;
       } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
         badServers.add(server);
       }
@@ -776,6 +795,16 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
     public long getTimeOut() {
       return timeOut;
     }
+
+    synchronized void sawOnlyFailures(Session session) throws IOException {
+      if (firstAllFailureTime == null) {
+        firstAllFailureTime = session.activityTime;
+      } else if (System.currentTimeMillis() - firstAllFailureTime > timeOut) {
+        badServers.add(server);
+        throw new IOException(
+            "Time exceeded " + (System.currentTimeMillis() - 
firstAllFailureTime) + " " + server);
+      }
+    }
   }
 
   public static void doLookup(ClientContext context, String server,
@@ -806,7 +835,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
       unscanned.put(KeyExtent.copyOf(entry.getKey()), ranges);
     }
 
-    timeoutTracker.startingScan();
+    var timeoutSession = timeoutTracker.startingScan();
     try {
       final HostAndPort parsedServer = HostAndPort.fromString(server);
       final TabletScanClientService.Client client;
@@ -873,8 +902,14 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
           receiver.receive(entries);
         }
 
-        if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
-          timeoutTracker.madeProgress();
+        if (!entries.isEmpty() || !scanResult.fullScans.isEmpty() || 
scanResult.partScan != null) {
+          // Got some data back, finished scanning a tablet w/o getting data, 
or partially scanned a
+          // tablet w/o getting data. Any of these indicate the scan is making 
progress.
+          timeoutSession.madeProgress();
+        } else if (!scanResult.failures.isEmpty()) {
+          // Observed no progress and only tablets failed. Want to eventually 
timeout if this
+          // situation continues.
+          timeoutTracker.sawOnlyFailures(timeoutSession);
         }
 
         trackScanning(failures, unscanned, scanResult);
@@ -883,7 +918,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
 
         while (scanResult.more) {
 
-          timeoutTracker.check();
+          timeoutSession.check();
 
           if (timer != null) {
             log.trace("oid={} Continuing multi scan, scanid={}", 
nextOpid.get(), imsr.scanID);
@@ -908,8 +943,11 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
             receiver.receive(entries);
           }
 
-          if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
-            timeoutTracker.madeProgress();
+          if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()
+              || scanResult.partScan != null) {
+            timeoutSession.madeProgress();
+          } else if (!scanResult.failures.isEmpty()) {
+            timeoutTracker.sawOnlyFailures(timeoutSession);
           }
 
           trackScanning(failures, unscanned, scanResult);
@@ -936,7 +974,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
       }
     } catch (TTransportException e) {
       log.debug("Server : {} msg : {}", server, e.getMessage());
-      timeoutTracker.errorOccured();
+      timeoutTracker.errorOccured(timeoutSession);
       throw new IOException(e);
     } catch (ThriftSecurityException e) {
       log.debug("Server : {} msg : {}", server, e.getMessage(), e);
@@ -961,7 +999,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
       throw new SampleNotPresentException(message, e);
     } catch (TException e) {
       log.debug("Server : {} msg : {}", server, e.getMessage(), e);
-      timeoutTracker.errorOccured();
+      timeoutTracker.errorOccured(timeoutSession);
       throw new IOException(e);
     }
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 59871e9917..e418642913 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -324,11 +324,10 @@ public class ThriftScanner {
     return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() 
/ 5));
   }
 
-  public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, Duration timeOut)
-      throws ScanTimedOutException, AccumuloException, 
AccumuloSecurityException,
+  public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, Duration timeOut,
+      Timer scanTimer) throws ScanTimedOutException, AccumuloException, 
AccumuloSecurityException,
       TableNotFoundException {
     TabletLocation loc = null;
-    Timer scanTimer = Timer.startNew();
     String lastError = null;
     String error = null;
     int tooManyFilesCount = 0;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 8eb15ea6bb..8b2d2c0139 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -137,8 +137,10 @@ public class LookupTask extends ScanTask<MultiScanResult> {
           interruptFlag.set(false);
 
         } catch (IOException e) {
-          log.warn("lookup failed for tablet " + extent, e);
-          throw new RuntimeException(e);
+          log.warn("lookup failed for tablet {} client will retry", extent, e);
+          // add extent to failure set and the client will retry it
+          failures.put(extent, ranges);
+          continue;
         }
 
         bytesAdded += lookupResult.bytesAdded;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
index d9cb2c0e8d..32dcdbf9ec 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.test.functional;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.data.ByteSequence;
@@ -30,6 +31,8 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -39,67 +42,94 @@ import com.google.common.base.Preconditions;
  */
 public class ErrorThrowingIterator extends WrappingIterator {
 
+  private static final Logger log = 
LoggerFactory.getLogger(ErrorThrowingIterator.class);
+
   public static final String TIMES = "error.throwing.iterator.times";
+  public static final String NAME = "error.throwing.iterator.name";
+  public static final String ROW = "error.throwing.iterator.row";
 
   private static final String MESSAGE = "Exception thrown from 
ErrorThrowingIterator";
   private static final RuntimeException ERROR = new RuntimeException(MESSAGE);
-  private static final AtomicInteger TIMES_THROWN = new AtomicInteger(0);
+  private static final Map<String,AtomicInteger> TIMES_THROWN = new 
ConcurrentHashMap<>();
 
   private int threshold = 0;
+  private String name;
+  private String row;
+
+  private static AtomicInteger getCounter(String name) {
+    return TIMES_THROWN.computeIfAbsent(name, n -> new AtomicInteger());
+  }
 
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
       IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
     threshold = Integer.parseInt(options.get(TIMES));
-    Preconditions.checkState(TIMES_THROWN.get() <= threshold,
-        "This iterator does not"
-            + " support reuse within the same VM. If using in an IT, then be 
sure to use"
-            + " a different MAC instance between tests.");
+    name = options.getOrDefault(NAME, "");
+    row = options.getOrDefault(ROW, null);
+
+    Preconditions.checkState(getCounter(name).get() <= threshold,
+        "This iterator does not support reuse within the same VM (name='" + 
name
+            + "'). If using in an IT, then be sure to use"
+            + " a different MAC instance between tests or set a different 
name.");
   }
 
   private void incrementAndThrow(RuntimeException t) {
-    if (TIMES_THROWN.get() < threshold) {
-      TIMES_THROWN.incrementAndGet();
+    var counter = getCounter(name);
+    if (counter.get() < threshold) {
+      counter.incrementAndGet();
+      log.info("Throwing {}", t.getClass().getName());
       throw t;
     }
   }
 
   private void incrementAndThrowIOE() throws IOException {
-    if (TIMES_THROWN.get() < threshold) {
-      TIMES_THROWN.incrementAndGet();
+    var counter = getCounter(name);
+    if (counter.get() < threshold) {
+      counter.incrementAndGet();
+      log.info("Throwing IOException");
       throw new IOException(MESSAGE);
     }
   }
 
   @Override
   public Key getTopKey() {
-    incrementAndThrow(ERROR);
+    if (row == null) {
+      incrementAndThrow(ERROR);
+    }
     return super.getTopKey();
   }
 
   @Override
   public Value getTopValue() {
-    incrementAndThrow(ERROR);
+    if (row == null) {
+      incrementAndThrow(ERROR);
+    }
     return super.getTopValue();
   }
 
   @Override
   public boolean hasTop() {
-    incrementAndThrow(ERROR);
+    if (row == null) {
+      incrementAndThrow(ERROR);
+    }
     return super.hasTop();
   }
 
   @Override
   public void next() throws IOException {
-    incrementAndThrowIOE();
+    if (row == null || super.getTopKey().getRowData().toString().equals(row)) {
+      incrementAndThrowIOE();
+    }
     super.next();
   }
 
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
       throws IOException {
-    incrementAndThrowIOE();
+    if (row == null) {
+      incrementAndThrowIOE();
+    }
     super.seek(range, columnFamilies, inclusive);
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 0b71ec83c0..2a6d5d9287 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -20,16 +20,26 @@ package org.apache.accumulo.test.functional;
 
 import static 
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
 import static 
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
 import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.security.SecureRandom;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,20 +47,30 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ThriftScanner;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.CloseScannerIT;
 import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
+import com.google.common.collect.MoreCollectors;
+
 public class ScannerIT extends ConfigurableMacBase {
 
   @Override
@@ -58,6 +78,11 @@ public class ScannerIT extends ConfigurableMacBase {
     return Duration.ofMinutes(1);
   }
 
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setNumScanServers(1);
+  }
+
   @Test
   public void testScannerReadaheadConfiguration() throws Exception {
     final String table = getUniqueNames(1)[0];
@@ -239,4 +264,182 @@ public class ScannerIT extends ConfigurableMacBase {
     }
     return count;
   }
+
+  @Test
+  public void testIOExceptionDuringScanIterator() throws Exception {
+
+    getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+    var random = new SecureRandom();
+
+    Properties props = getClientProperties();
+    // configure scan server not to fallback to tablet servers
+    String profiles = "[{'isDefault':true,'maxBusyTimeout':'1s', 
'busyTimeoutMultiplier':8,"
+        + "'timeToWaitForScanServers':10h, "
+        + "'attemptPlans':[{'servers':'3', 'busyTimeout':'100ms'}]}]";
+    props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + 
"profiles", profiles);
+
+    final String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+      client.tableOperations().create(table);
+
+      try (var writer = client.createBatchWriter(table)) {
+        for (int i = 0; i < 10; i++) {
+          Mutation m = new Mutation("row" + i);
+          m.put("", "", "");
+          writer.addMutation(m);
+        }
+      }
+
+      // need to flush data to disk so its visible to scan server
+      client.tableOperations().flush(table, null, null, true);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(1000, 
ErrorThrowingIterator.class);
+      iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "3");
+      // Set a single row to fail so that after splitting some tablets fail 
and some do not fail.
+      iteratorSetting.addOption(ErrorThrowingIterator.ROW, "row5");
+
+      // The batch scanner sends multiple extents in a single RPC. Need to try 
a mixture of failing
+      // and non failing extents for this RPC, so test w/ single tablet and 
three tablets.
+      for (List<String> splitsToAdd : List.of(List.<String>of(), 
List.of("row3", "row7"))) {
+        if (!splitsToAdd.isEmpty()) {
+          TreeSet<Text> splits =
+              
splitsToAdd.stream().map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+          client.tableOperations().addSplits(table, splits);
+          // The scan server would not see these splits as it caches tablet 
info for a bit
+          getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
+          getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+        }
+        // try tablet and scan server to ensure both have same behavior
+        for (var cl : ConsistencyLevel.values()) {
+          log.debug("Starting scan {} {}", cl, splitsToAdd);
+          try (var scanner = client.createScanner(table)) {
+            iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+            scanner.addScanIterator(iteratorSetting);
+            scanner.setConsistencyLevel(cl);
+            assertEquals(10, scanner.stream().count());
+          }
+
+          log.debug("Starting batch scan {} {}", cl, splitsToAdd);
+          iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+          try (var scanner = client.createBatchScanner(table)) {
+            scanner.setRanges(List.of(new Range()));
+            scanner.addScanIterator(iteratorSetting);
+            scanner.setConsistencyLevel(cl);
+            assertEquals(10, scanner.stream().count());
+          }
+        }
+      }
+
+      // ensure a repeating IOException in an iterator times out eventually
+      iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "1000000");
+      var executor = Executors.newCachedThreadPool();
+      try {
+        List<Future<?>> futures = new ArrayList<>();
+        for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) {
+          iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+          futures.add(executor
+              .submit(() -> expectScanTimeout(client, table, consistencyLevel, 
iteratorSetting)));
+          iteratorSetting.addOption(ErrorThrowingIterator.NAME, 
random.nextLong() + "");
+          futures.add(executor.submit(
+              () -> expectBatchScanTimeout(client, table, consistencyLevel, 
iteratorSetting)));
+        }
+
+        for (var future : futures) {
+          future.get();
+        }
+      } finally {
+        executor.shutdownNow();
+      }
+
+    }
+  }
+
+  @Test
+  public void testIOExceptionDuringScanFileOpen() throws Exception {
+
+    getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+
+    final String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+      client.tableOperations().create(table);
+
+      try (var writer = client.createBatchWriter(table)) {
+        for (int i = 0; i < 10; i++) {
+          Mutation m = new Mutation("row" + i);
+          m.put("", "", "");
+          writer.addMutation(m);
+        }
+      }
+
+      client.tableOperations().flush(table, null, null, true);
+
+      var ctx = (ClientContext) client;
+      var tableId = ctx.getTableId(table);
+
+      // Delete the tablets file to cause an IOException during opening the 
file. By default
+      // scanners will retry indefinitely when an IOException happens. Test 
setting a timeout on the
+      // scans for this case.
+      try (var tablets = 
ctx.getAmple().readTablets().forTable(tableId).fetch(FILES).build()) {
+        var tabletList = tablets.stream().collect(Collectors.toList());
+        assertEquals(1, tabletList.size());
+        for (var tablet : tabletList) {
+          var file = 
tablet.getFiles().stream().collect(MoreCollectors.onlyElement());
+          assertTrue(getCluster().getFileSystem().delete(file.getPath(), 
false));
+        }
+      }
+
+      // Run scans all concurrently to avoid waiting on each one to timeout 
sequentially.
+      var executor = Executors.newCachedThreadPool();
+      try {
+        List<Future<?>> futures = new ArrayList<>();
+        for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) {
+          futures.add(executor.submit(() -> expectScanTimeout(client, table, 
consistencyLevel)));
+          futures
+              .add(executor.submit(() -> expectBatchScanTimeout(client, table, 
consistencyLevel)));
+        }
+
+        for (var future : futures) {
+          future.get();
+        }
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  private static void expectBatchScanTimeout(AccumuloClient client, String 
table,
+      ConsistencyLevel consistencyLevel, IteratorSetting... iters) {
+    try (var scanner = client.createBatchScanner(table)) {
+      scanner.setRanges(List.of(new Range()));
+      scanner.setTimeout(5, TimeUnit.SECONDS);
+      scanner.setConsistencyLevel(consistencyLevel);
+      for (var iter : iters) {
+        scanner.addScanIterator(iter);
+      }
+      Timer timer = Timer.startNew();
+      assertThrows(TimedOutException.class, () -> scanner.stream().count());
+      long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+      assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed);
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static void expectScanTimeout(AccumuloClient client, String table,
+      ConsistencyLevel consistencyLevel, IteratorSetting... iters) {
+    try (var scanner = client.createScanner(table)) {
+      scanner.setTimeout(5, TimeUnit.SECONDS);
+      scanner.setConsistencyLevel(consistencyLevel);
+      for (var iter : iters) {
+        scanner.addScanIterator(iter);
+      }
+      Timer timer = Timer.startNew();
+      var exception = assertThrows(RuntimeException.class, () -> 
scanner.stream().count());
+      assertEquals(ThriftScanner.ScanTimedOutException.class, 
exception.getCause().getClass());
+      long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+      assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed);
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
index fdbfcd820d..1152943247 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test.functional;
 
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.time.Duration;
@@ -32,7 +33,9 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.clientImpl.ThriftScanner;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -48,9 +51,10 @@ public class TimeoutIT extends AccumuloClusterHarness {
   @Test
   public void run() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
-      String[] tableNames = getUniqueNames(2);
+      String[] tableNames = getUniqueNames(3);
       testBatchWriterTimeout(client, tableNames[0]);
       testBatchScannerTimeout(client, tableNames[1]);
+      testScannerTimeout(client, tableNames[2]);
     }
   }
 
@@ -92,9 +96,9 @@ public class TimeoutIT extends AccumuloClusterHarness {
       bs.setRanges(Collections.singletonList(new Range()));
 
       // should not timeout
+      bs.setTimeout(5, TimeUnit.SECONDS);
       bs.forEach((k, v) -> {});
 
-      bs.setTimeout(5, TimeUnit.SECONDS);
       IteratorSetting iterSetting = new IteratorSetting(100, 
SlowIterator.class);
       iterSetting.addOption("sleepTime", 2000 + "");
       bs.addScanIterator(iterSetting);
@@ -104,4 +108,32 @@ public class TimeoutIT extends AccumuloClusterHarness {
     }
   }
 
+  public void testScannerTimeout(AccumuloClient client, String tableName) 
throws Exception {
+    client.tableOperations().create(tableName);
+
+    try (BatchWriter bw = client.createBatchWriter(tableName)) {
+      Mutation m = new Mutation("r1");
+      m.put("cf1", "cq1", "v1");
+      m.put("cf1", "cq2", "v2");
+      m.put("cf1", "cq3", "v3");
+      m.put("cf1", "cq4", "v4");
+      bw.addMutation(m);
+    }
+
+    try (Scanner scanner = client.createScanner(tableName)) {
+      scanner.setRange(new Range());
+
+      // should not timeout
+      scanner.setTimeout(5, TimeUnit.SECONDS);
+      scanner.forEach((k, v) -> {});
+
+      IteratorSetting iterSetting = new IteratorSetting(100, 
SlowIterator.class);
+      iterSetting.addOption("sleepTime", 6000 + "");
+      scanner.addScanIterator(iterSetting);
+
+      var exception = assertThrows(RuntimeException.class, () -> 
scanner.iterator().next(),
+          "scanner did not time out");
+      assertEquals(ThriftScanner.ScanTimedOutException.class, 
exception.getCause().getClass());
+    }
+  }
 }

Reply via email to