This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c4b2cf3fe9 Fixes multiple problems with scan timeout (#6307)
c4b2cf3fe9 is described below
commit c4b2cf3fe9a27cce59ed110634fc61292c70a8bf
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 16 07:04:29 2026 -0700
Fixes multiple problems with scan timeout (#6307)
The following four scanner timeout problems are fixed in this commit.
The batch scanner would time out when servers repeatedly returned no
data, but the scanner would not time out in this case. After this
change, both the scanner and the batch scanner will time out in this
case. This fixes #6107. Added tests for this.
The scanner would retry on a server-side IOException (such as a
transient problem reading a file), but the batch scanner would not
retry in this case. Although the scanner would retry, it would never
time out while doing so. After this change, both will retry in this case
and both will properly time out. Added tests for this.
The batch scanner code was not considering partial empty scans when
deciding whether an RPC to a server made progress for the purpose of
timeout tracking. A partial empty scan is the case where an RPC returned
no data but indicated that it had completed scanning part of a tablet.
Updated the timeout tracking code to consider this case.
The batch scanner code had a timeout tracking object that could be used
by multiple scan tasks running in a thread pool. It did not handle the
case of the same server having multiple RPC tasks running concurrently
against different extents. Slightly refactored the timeout tracking
object to handle this case.
---
.../apache/accumulo/core/client/ScannerBase.java | 4 +-
.../accumulo/core/clientImpl/ScannerIterator.java | 4 +-
.../TabletServerBatchReaderIterator.java | 92 +++++++---
.../accumulo/core/clientImpl/ThriftScanner.java | 5 +-
.../apache/accumulo/tserver/scan/LookupTask.java | 6 +-
.../test/functional/ErrorThrowingIterator.java | 58 ++++--
.../apache/accumulo/test/functional/ScannerIT.java | 203 +++++++++++++++++++++
.../apache/accumulo/test/functional/TimeoutIT.java | 36 +++-
8 files changed, 357 insertions(+), 51 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index a9ae3e561e..6df879a265 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -202,8 +202,8 @@ public interface ScannerBase extends
Iterable<Entry<Key,Value>>, AutoCloseable {
Iterator<Entry<Key,Value>> iterator();
/**
- * This setting determines how long a scanner will automatically retry when
a failure occurs. By
- * default, a scanner will retry forever.
+ * This setting determines how long a scanner will automatically retry when
a failure occurs or
+ * when no data is being returned by servers. By default, a scanner will
retry forever.
*
* <p>
* Setting the timeout to zero (with any time unit) or {@link
Long#MAX_VALUE} (with
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index d1eca1b198..e6e6f0b68f 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
@@ -150,11 +151,12 @@ public class ScannerIterator implements
Iterator<Entry<Key,Value>> {
List<KeyValue> batch;
+ Timer scanTimer = Timer.startNew();
do {
synchronized (scanState) {
// this is synchronized so its mutually exclusive with closing
Preconditions.checkState(!closed.get(), "Scanner was closed");
- batch = ThriftScanner.scan(scanState.context, scanState, timeOut);
+ batch = ThriftScanner.scan(scanState.context, scanState, timeOut,
scanTimer);
}
} while (batch != null && batch.isEmpty());
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 9c042c961a..53a44dc0bf 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -36,6 +36,7 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -732,42 +733,60 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
private static class TimeoutTracker {
- String server;
- Set<String> badServers;
- long timeOut;
- long activityTime;
+ final String server;
+ final Set<String> badServers;
+ final long timeOut;
+
+ // When failures happen, rpc task to scan a server may be requeued in a
thread pool. These two
+ // variables track failures across task running in those thread pools.
Long firstErrorTime = null;
+ Long firstAllFailureTime = null;
TimeoutTracker(String server, Set<String> badServers, long timeOut) {
- this(timeOut);
- this.server = server;
+ this.timeOut = timeOut;
+ this.server = Objects.requireNonNull(server);
this.badServers = badServers;
}
TimeoutTracker(long timeOut) {
this.timeOut = timeOut;
+ this.badServers = null;
+ this.server = null;
}
- void startingScan() {
- activityTime = System.currentTimeMillis();
- }
+ class Session {
+ long activityTime;
- void check() throws IOException {
- if (System.currentTimeMillis() - activityTime > timeOut) {
- badServers.add(server);
- throw new IOException(
- "Time exceeded " + (System.currentTimeMillis() - activityTime) + "
" + server);
+ void check() throws IOException {
+ if (System.currentTimeMillis() - activityTime > timeOut) {
+ badServers.add(server);
+ throw new IOException(
+ "Time exceeded " + (System.currentTimeMillis() - activityTime) +
" " + server);
+ }
+ }
+
+ void madeProgress() {
+ activityTime = System.currentTimeMillis();
+ synchronized (TimeoutTracker.this) {
+ firstErrorTime = null;
+ firstAllFailureTime = null;
+ }
}
}
- void madeProgress() {
- activityTime = System.currentTimeMillis();
- firstErrorTime = null;
+ /**
+ * Multiple threads can scan different exents on the same server at the
same time. The session
+ * allows each potential rpc thread to have its own activityTime.
+ */
+ Session startingScan() throws IOException {
+ var session = new Session();
+ session.activityTime = System.currentTimeMillis();
+ return session;
}
- void errorOccured() {
+ synchronized void errorOccured(Session session) {
if (firstErrorTime == null) {
- firstErrorTime = activityTime;
+ firstErrorTime = session.activityTime;
} else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
badServers.add(server);
}
@@ -776,6 +795,16 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
public long getTimeOut() {
return timeOut;
}
+
+ synchronized void sawOnlyFailures(Session session) throws IOException {
+ if (firstAllFailureTime == null) {
+ firstAllFailureTime = session.activityTime;
+ } else if (System.currentTimeMillis() - firstAllFailureTime > timeOut) {
+ badServers.add(server);
+ throw new IOException(
+ "Time exceeded " + (System.currentTimeMillis() -
firstAllFailureTime) + " " + server);
+ }
+ }
}
public static void doLookup(ClientContext context, String server,
@@ -806,7 +835,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
unscanned.put(KeyExtent.copyOf(entry.getKey()), ranges);
}
- timeoutTracker.startingScan();
+ var timeoutSession = timeoutTracker.startingScan();
try {
final HostAndPort parsedServer = HostAndPort.fromString(server);
final TabletScanClientService.Client client;
@@ -873,8 +902,14 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
receiver.receive(entries);
}
- if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
- timeoutTracker.madeProgress();
+ if (!entries.isEmpty() || !scanResult.fullScans.isEmpty() ||
scanResult.partScan != null) {
+ // Got some data back, finished scanning a tablet w/o getting data,
or partially scanned a
+ // tablet w/o getting data. Any of these indicate the scan is making
progress.
+ timeoutSession.madeProgress();
+ } else if (!scanResult.failures.isEmpty()) {
+ // Observed no progress and only tablets failed. Want to eventually
timeout if this
+ // situation continues.
+ timeoutTracker.sawOnlyFailures(timeoutSession);
}
trackScanning(failures, unscanned, scanResult);
@@ -883,7 +918,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
while (scanResult.more) {
- timeoutTracker.check();
+ timeoutSession.check();
if (timer != null) {
log.trace("oid={} Continuing multi scan, scanid={}",
nextOpid.get(), imsr.scanID);
@@ -908,8 +943,11 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
receiver.receive(entries);
}
- if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
- timeoutTracker.madeProgress();
+ if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()
+ || scanResult.partScan != null) {
+ timeoutSession.madeProgress();
+ } else if (!scanResult.failures.isEmpty()) {
+ timeoutTracker.sawOnlyFailures(timeoutSession);
}
trackScanning(failures, unscanned, scanResult);
@@ -936,7 +974,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
}
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
- timeoutTracker.errorOccured();
+ timeoutTracker.errorOccured(timeoutSession);
throw new IOException(e);
} catch (ThriftSecurityException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
@@ -961,7 +999,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
throw new SampleNotPresentException(message, e);
} catch (TException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
- timeoutTracker.errorOccured();
+ timeoutTracker.errorOccured(timeoutSession);
throw new IOException(e);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 59871e9917..e418642913 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -324,11 +324,10 @@ public class ThriftScanner {
return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble()
/ 5));
}
- public static List<KeyValue> scan(ClientContext context, ScanState
scanState, Duration timeOut)
- throws ScanTimedOutException, AccumuloException,
AccumuloSecurityException,
+ public static List<KeyValue> scan(ClientContext context, ScanState
scanState, Duration timeOut,
+ Timer scanTimer) throws ScanTimedOutException, AccumuloException,
AccumuloSecurityException,
TableNotFoundException {
TabletLocation loc = null;
- Timer scanTimer = Timer.startNew();
String lastError = null;
String error = null;
int tooManyFilesCount = 0;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 8eb15ea6bb..8b2d2c0139 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -137,8 +137,10 @@ public class LookupTask extends ScanTask<MultiScanResult> {
interruptFlag.set(false);
} catch (IOException e) {
- log.warn("lookup failed for tablet " + extent, e);
- throw new RuntimeException(e);
+ log.warn("lookup failed for tablet {} client will retry", extent, e);
+ // add extent to failure set and the client will retry it
+ failures.put(extent, ranges);
+ continue;
}
bytesAdded += lookupResult.bytesAdded;
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
index d9cb2c0e8d..32dcdbf9ec 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.test.functional;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.data.ByteSequence;
@@ -30,6 +31,8 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
@@ -39,67 +42,94 @@ import com.google.common.base.Preconditions;
*/
public class ErrorThrowingIterator extends WrappingIterator {
+ private static final Logger log =
LoggerFactory.getLogger(ErrorThrowingIterator.class);
+
public static final String TIMES = "error.throwing.iterator.times";
+ public static final String NAME = "error.throwing.iterator.name";
+ public static final String ROW = "error.throwing.iterator.row";
private static final String MESSAGE = "Exception thrown from
ErrorThrowingIterator";
private static final RuntimeException ERROR = new RuntimeException(MESSAGE);
- private static final AtomicInteger TIMES_THROWN = new AtomicInteger(0);
+ private static final Map<String,AtomicInteger> TIMES_THROWN = new
ConcurrentHashMap<>();
private int threshold = 0;
+ private String name;
+ private String row;
+
+ private static AtomicInteger getCounter(String name) {
+ return TIMES_THROWN.computeIfAbsent(name, n -> new AtomicInteger());
+ }
@Override
public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
IteratorEnvironment env) throws IOException {
super.init(source, options, env);
threshold = Integer.parseInt(options.get(TIMES));
- Preconditions.checkState(TIMES_THROWN.get() <= threshold,
- "This iterator does not"
- + " support reuse within the same VM. If using in an IT, then be
sure to use"
- + " a different MAC instance between tests.");
+ name = options.getOrDefault(NAME, "");
+ row = options.getOrDefault(ROW, null);
+
+ Preconditions.checkState(getCounter(name).get() <= threshold,
+ "This iterator does not support reuse within the same VM (name='" +
name
+ + "'). If using in an IT, then be sure to use"
+ + " a different MAC instance between tests or set a different
name.");
}
private void incrementAndThrow(RuntimeException t) {
- if (TIMES_THROWN.get() < threshold) {
- TIMES_THROWN.incrementAndGet();
+ var counter = getCounter(name);
+ if (counter.get() < threshold) {
+ counter.incrementAndGet();
+ log.info("Throwing {}", t.getClass().getName());
throw t;
}
}
private void incrementAndThrowIOE() throws IOException {
- if (TIMES_THROWN.get() < threshold) {
- TIMES_THROWN.incrementAndGet();
+ var counter = getCounter(name);
+ if (counter.get() < threshold) {
+ counter.incrementAndGet();
+ log.info("Throwing IOException");
throw new IOException(MESSAGE);
}
}
@Override
public Key getTopKey() {
- incrementAndThrow(ERROR);
+ if (row == null) {
+ incrementAndThrow(ERROR);
+ }
return super.getTopKey();
}
@Override
public Value getTopValue() {
- incrementAndThrow(ERROR);
+ if (row == null) {
+ incrementAndThrow(ERROR);
+ }
return super.getTopValue();
}
@Override
public boolean hasTop() {
- incrementAndThrow(ERROR);
+ if (row == null) {
+ incrementAndThrow(ERROR);
+ }
return super.hasTop();
}
@Override
public void next() throws IOException {
- incrementAndThrowIOE();
+ if (row == null || super.getTopKey().getRowData().toString().equals(row)) {
+ incrementAndThrowIOE();
+ }
super.next();
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
throws IOException {
- incrementAndThrowIOE();
+ if (row == null) {
+ incrementAndThrowIOE();
+ }
super.seek(range, columnFamilies, inclusive);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 0b71ec83c0..2a6d5d9287 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -20,16 +20,26 @@ package org.apache.accumulo.test.functional;
import static
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
import static
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.security.SecureRandom;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,20 +47,30 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ThriftScanner;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.CloseScannerIT;
import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import com.google.common.collect.MoreCollectors;
+
public class ScannerIT extends ConfigurableMacBase {
@Override
@@ -58,6 +78,11 @@ public class ScannerIT extends ConfigurableMacBase {
return Duration.ofMinutes(1);
}
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setNumScanServers(1);
+ }
+
@Test
public void testScannerReadaheadConfiguration() throws Exception {
final String table = getUniqueNames(1)[0];
@@ -239,4 +264,182 @@ public class ScannerIT extends ConfigurableMacBase {
}
return count;
}
+
+ @Test
+ public void testIOExceptionDuringScanIterator() throws Exception {
+
+ getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+ var random = new SecureRandom();
+
+ Properties props = getClientProperties();
+ // configure scan server not to fallback to tablet servers
+ String profiles = "[{'isDefault':true,'maxBusyTimeout':'1s',
'busyTimeoutMultiplier':8,"
+ + "'timeToWaitForScanServers':10h, "
+ + "'attemptPlans':[{'servers':'3', 'busyTimeout':'100ms'}]}]";
+ props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() +
"profiles", profiles);
+
+ final String table = getUniqueNames(1)[0];
+ try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+ client.tableOperations().create(table);
+
+ try (var writer = client.createBatchWriter(table)) {
+ for (int i = 0; i < 10; i++) {
+ Mutation m = new Mutation("row" + i);
+ m.put("", "", "");
+ writer.addMutation(m);
+ }
+ }
+
+ // need to flush data to disk so its visible to scan server
+ client.tableOperations().flush(table, null, null, true);
+
+ IteratorSetting iteratorSetting = new IteratorSetting(1000,
ErrorThrowingIterator.class);
+ iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "3");
+ // Set a single row to fail so that after splitting some tablets fail
and some do not fail.
+ iteratorSetting.addOption(ErrorThrowingIterator.ROW, "row5");
+
+ // The batch scanner sends multiple extents in a single RPC. Need to try
a mixture of failing
+ // and non failing extents for this RPC, so test w/ single tablet and
three tablets.
+ for (List<String> splitsToAdd : List.of(List.<String>of(),
List.of("row3", "row7"))) {
+ if (!splitsToAdd.isEmpty()) {
+ TreeSet<Text> splits =
+
splitsToAdd.stream().map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+ client.tableOperations().addSplits(table, splits);
+ // The scan server would not see these splits as it caches tablet
info for a bit
+ getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
+ getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+ }
+ // try tablet and scan server to ensure both have same behavior
+ for (var cl : ConsistencyLevel.values()) {
+ log.debug("Starting scan {} {}", cl, splitsToAdd);
+ try (var scanner = client.createScanner(table)) {
+ iteratorSetting.addOption(ErrorThrowingIterator.NAME,
random.nextLong() + "");
+ scanner.addScanIterator(iteratorSetting);
+ scanner.setConsistencyLevel(cl);
+ assertEquals(10, scanner.stream().count());
+ }
+
+ log.debug("Starting batch scan {} {}", cl, splitsToAdd);
+ iteratorSetting.addOption(ErrorThrowingIterator.NAME,
random.nextLong() + "");
+ try (var scanner = client.createBatchScanner(table)) {
+ scanner.setRanges(List.of(new Range()));
+ scanner.addScanIterator(iteratorSetting);
+ scanner.setConsistencyLevel(cl);
+ assertEquals(10, scanner.stream().count());
+ }
+ }
+ }
+
+ // ensure a repeating IOException in an iterator times out eventually
+ iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "1000000");
+ var executor = Executors.newCachedThreadPool();
+ try {
+ List<Future<?>> futures = new ArrayList<>();
+ for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) {
+ iteratorSetting.addOption(ErrorThrowingIterator.NAME,
random.nextLong() + "");
+ futures.add(executor
+ .submit(() -> expectScanTimeout(client, table, consistencyLevel,
iteratorSetting)));
+ iteratorSetting.addOption(ErrorThrowingIterator.NAME,
random.nextLong() + "");
+ futures.add(executor.submit(
+ () -> expectBatchScanTimeout(client, table, consistencyLevel,
iteratorSetting)));
+ }
+
+ for (var future : futures) {
+ future.get();
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+
+ }
+ }
+
+ @Test
+ public void testIOExceptionDuringScanFileOpen() throws Exception {
+
+ getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+
+ final String table = getUniqueNames(1)[0];
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
+ client.tableOperations().create(table);
+
+ try (var writer = client.createBatchWriter(table)) {
+ for (int i = 0; i < 10; i++) {
+ Mutation m = new Mutation("row" + i);
+ m.put("", "", "");
+ writer.addMutation(m);
+ }
+ }
+
+ client.tableOperations().flush(table, null, null, true);
+
+ var ctx = (ClientContext) client;
+ var tableId = ctx.getTableId(table);
+
+ // Delete the tablets file to cause an IOException during opening the
file. By default
+ // scanners will retry indefinitely when an IOException happens. Test
setting a timeout on the
+ // scans for this case.
+ try (var tablets =
ctx.getAmple().readTablets().forTable(tableId).fetch(FILES).build()) {
+ var tabletList = tablets.stream().collect(Collectors.toList());
+ assertEquals(1, tabletList.size());
+ for (var tablet : tabletList) {
+ var file =
tablet.getFiles().stream().collect(MoreCollectors.onlyElement());
+ assertTrue(getCluster().getFileSystem().delete(file.getPath(),
false));
+ }
+ }
+
+ // Run scans all concurrently to avoid waiting on each one to timeout
sequentially.
+ var executor = Executors.newCachedThreadPool();
+ try {
+ List<Future<?>> futures = new ArrayList<>();
+ for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) {
+ futures.add(executor.submit(() -> expectScanTimeout(client, table,
consistencyLevel)));
+ futures
+ .add(executor.submit(() -> expectBatchScanTimeout(client, table,
consistencyLevel)));
+ }
+
+ for (var future : futures) {
+ future.get();
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ private static void expectBatchScanTimeout(AccumuloClient client, String
table,
+ ConsistencyLevel consistencyLevel, IteratorSetting... iters) {
+ try (var scanner = client.createBatchScanner(table)) {
+ scanner.setRanges(List.of(new Range()));
+ scanner.setTimeout(5, TimeUnit.SECONDS);
+ scanner.setConsistencyLevel(consistencyLevel);
+ for (var iter : iters) {
+ scanner.addScanIterator(iter);
+ }
+ Timer timer = Timer.startNew();
+ assertThrows(TimedOutException.class, () -> scanner.stream().count());
+ long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+ assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private static void expectScanTimeout(AccumuloClient client, String table,
+ ConsistencyLevel consistencyLevel, IteratorSetting... iters) {
+ try (var scanner = client.createScanner(table)) {
+ scanner.setTimeout(5, TimeUnit.SECONDS);
+ scanner.setConsistencyLevel(consistencyLevel);
+ for (var iter : iters) {
+ scanner.addScanIterator(iter);
+ }
+ Timer timer = Timer.startNew();
+ var exception = assertThrows(RuntimeException.class, () ->
scanner.stream().count());
+ assertEquals(ThriftScanner.ScanTimedOutException.class,
exception.getCause().getClass());
+ long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+ assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
index fdbfcd820d..1152943247 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test.functional;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Duration;
@@ -32,7 +33,9 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.clientImpl.ThriftScanner;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -48,9 +51,10 @@ public class TimeoutIT extends AccumuloClusterHarness {
@Test
public void run() throws Exception {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
- String[] tableNames = getUniqueNames(2);
+ String[] tableNames = getUniqueNames(3);
testBatchWriterTimeout(client, tableNames[0]);
testBatchScannerTimeout(client, tableNames[1]);
+ testScannerTimeout(client, tableNames[2]);
}
}
@@ -92,9 +96,9 @@ public class TimeoutIT extends AccumuloClusterHarness {
bs.setRanges(Collections.singletonList(new Range()));
// should not timeout
+ bs.setTimeout(5, TimeUnit.SECONDS);
bs.forEach((k, v) -> {});
- bs.setTimeout(5, TimeUnit.SECONDS);
IteratorSetting iterSetting = new IteratorSetting(100,
SlowIterator.class);
iterSetting.addOption("sleepTime", 2000 + "");
bs.addScanIterator(iterSetting);
@@ -104,4 +108,32 @@ public class TimeoutIT extends AccumuloClusterHarness {
}
}
+ public void testScannerTimeout(AccumuloClient client, String tableName)
throws Exception {
+ client.tableOperations().create(tableName);
+
+ try (BatchWriter bw = client.createBatchWriter(tableName)) {
+ Mutation m = new Mutation("r1");
+ m.put("cf1", "cq1", "v1");
+ m.put("cf1", "cq2", "v2");
+ m.put("cf1", "cq3", "v3");
+ m.put("cf1", "cq4", "v4");
+ bw.addMutation(m);
+ }
+
+ try (Scanner scanner = client.createScanner(tableName)) {
+ scanner.setRange(new Range());
+
+ // should not timeout
+ scanner.setTimeout(5, TimeUnit.SECONDS);
+ scanner.forEach((k, v) -> {});
+
+ IteratorSetting iterSetting = new IteratorSetting(100,
SlowIterator.class);
+ iterSetting.addOption("sleepTime", 6000 + "");
+ scanner.addScanIterator(iterSetting);
+
+ var exception = assertThrows(RuntimeException.class, () ->
scanner.iterator().next(),
+ "scanner did not time out");
+ assertEquals(ThriftScanner.ScanTimedOutException.class,
exception.getCause().getClass());
+ }
+ }
}