This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 5f1628245eeafe99ed3c7a19a0b72a739e80fb7b Merge: da4ff8c98d c4b2cf3fe9 Author: Keith Turner <[email protected]> AuthorDate: Thu Apr 16 14:46:18 2026 +0000 Merge branch '2.1' .../apache/accumulo/core/client/ScannerBase.java | 4 +- .../accumulo/core/clientImpl/ScannerIterator.java | 4 +- .../TabletServerBatchReaderIterator.java | 82 ++++++--- .../accumulo/core/clientImpl/ThriftScanner.java | 5 +- .../apache/accumulo/tserver/scan/LookupTask.java | 6 +- .../test/functional/ErrorThrowingIterator.java | 58 ++++-- .../apache/accumulo/test/functional/ScannerIT.java | 202 +++++++++++++++++++++ .../apache/accumulo/test/functional/TimeoutIT.java | 36 +++- 8 files changed, 350 insertions(+), 47 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 372d119f05,53a44dc0bf..267ca5b2db --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@@ -799,43 -733,61 +800,60 @@@ public final class TabletServerBatchRea private static class TimeoutTracker { - final String server; - final Set<String> badServers; + String server; + Set<String> badServers; final long timeOut; - CountDownTimer timeoutCountDownTimer; + + // When failures happen, rpc task to scan a server may be requeued in a thread pool. These two + // variables track failures across task running in those thread pools. - Long firstErrorTime = null; - Long firstAllFailureTime = null; + CountDownTimer errorTimer; ++ CountDownTimer failureTimer; TimeoutTracker(String server, Set<String> badServers, long timeOut) { - this.timeOut = timeOut; + this(timeOut); - this.server = server; + this.server = Objects.requireNonNull(server); this.badServers = badServers; } TimeoutTracker(long timeOut) { this.timeOut = timeOut; - this.timeoutCountDownTimer = CountDownTimer.startNew(timeOut, MILLISECONDS); - this.badServers = null; - this.server = null; } - void startingScan() { - timeoutCountDownTimer.restart(); - } + class Session { - long activityTime; ++ final CountDownTimer timeoutCountDownTimer; + - void check() throws IOException { - if (timeoutCountDownTimer.isExpired()) { - badServers.add(server); - throw new IOException("Time exceeded " + timeOut + " ms for server " + server); ++ Session() { ++ timeoutCountDownTimer = CountDownTimer.startNew(timeOut, MILLISECONDS); ++ } + + void check() throws IOException { - if (System.currentTimeMillis() - activityTime > timeOut) { ++ if (timeoutCountDownTimer.isExpired()) { + badServers.add(server); - throw new IOException( - "Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server); ++ throw new IOException("Time exceeded " + timeOut + " ms for server " + server); + } + } + + void madeProgress() { - activityTime = System.currentTimeMillis(); ++ timeoutCountDownTimer.restart(); + synchronized (TimeoutTracker.this) { - firstErrorTime = null; - firstAllFailureTime = null; ++ errorTimer = null; ++ failureTimer = null; + } } } - void madeProgress() { - timeoutCountDownTimer.restart(); - errorTimer = null; + /** - * Multiple threads can scan different exents on the same server at the same time. The session ++ * Multiple threads can scan different extents on the same server at the same time. The session + * allows each potential rpc thread to have its own activityTime. + */ - Session startingScan() throws IOException { - var session = new Session(); - session.activityTime = System.currentTimeMillis(); - return session; ++ Session startingScan() { ++ return new Session(); } - void errorOccured() { + synchronized void errorOccured(Session session) { - if (firstErrorTime == null) { - firstErrorTime = session.activityTime; - } else if (System.currentTimeMillis() - firstErrorTime > timeOut) { + if (errorTimer == null) { - errorTimer = CountDownTimer.startNew(timeOut, MILLISECONDS); ++ errorTimer = CountDownTimer.startNew(session.timeoutCountDownTimer.timeLeft()); + } else if (errorTimer.isExpired()) { badServers.add(server); } } @@@ -843,8 -795,27 +861,17 @@@ public long getTimeOut() { return timeOut; } + + synchronized void sawOnlyFailures(Session session) throws IOException { - if (firstAllFailureTime == null) { - firstAllFailureTime = session.activityTime; - } else if (System.currentTimeMillis() - firstAllFailureTime > timeOut) { ++ if (failureTimer == null) { ++ failureTimer = CountDownTimer.startNew(session.timeoutCountDownTimer.timeLeft()); ++ } else if (failureTimer.isExpired()) { + badServers.add(server); - throw new IOException( - "Time exceeded " + (System.currentTimeMillis() - firstAllFailureTime) + " " + server); ++ throw new IOException("Time exceeded " + timeOut + " ms for server " + server); + } + } } - public static void doLookup(ClientContext context, String server, - Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, - Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, - ScannerOptions options, Authorizations authorizations) - throws IOException, AccumuloSecurityException, AccumuloServerException { - doLookup(context, server, requested, failures, unscanned, receiver, columns, options, - authorizations, new TimeoutTracker(Long.MAX_VALUE), 0L); - } - static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index b3f660ac2f,e418642913..b7fe95bbd4 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@@ -376,262 -321,13 +376,261 @@@ public final class ThriftScanner Thread.sleep(millis); } // wait 2 * last time, with +-10% random jitter - return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() / 5)); + return (long) (Math.min(millis * 2, maxSleep) * (.9 + RANDOM.get().nextDouble() / 5)); + } + + private static Optional<ScanAddress> getScanServerAddress(ClientContext context, + ScanState scanState, CachedTablet loc, Duration timeOut, Timer scanTimer) { + Preconditions.checkArgument(scanState.runOnScanServer); + + ScanAddress addr = null; + + if (scanState.scanID != null && scanState.prevLoc != null + && scanState.prevLoc.serverType == ServerType.SSERVER + && scanState.prevLoc.getExtent().equals(loc.getExtent())) { + // this is the case of continuing a scan on a scan server for the same tablet, so lets not + // call the scan server selector and just go back to the previous scan server + addr = scanState.prevLoc; + log.trace( + "For tablet {} continuing scan {} on scan server {} without consulting scan server selector, using busyTimeout {}", + loc.getExtent(), scanState.scanID, addr.serverAddress, scanState.busyTimeout); + } else { + var tabletId = new TabletIdImpl(loc.getExtent()); + // obtain a snapshot once and only expose this snapshot to the plugin for consistency + var attempts = scanState.scanAttempts.snapshot(); + + Duration timeoutLeft = timeOut.minus(scanTimer.elapsed()); + + var params = new ScanServerSelector.SelectorParameters() { + + @Override + public List<TabletId> getTablets() { + return List.of(tabletId); + } + + @Override + public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) { + return attempts.getOrDefault(tabletId, Set.of()); + } + + @Override + public Map<String,String> getHints() { + if (scanState.executionHints == null) { + return Map.of(); + } + return scanState.executionHints; + } + + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, context, + loc.getExtent().tableId(), log); + } + }; + + ScanServerSelections actions = context.getScanServerSelector().selectServers(params); + + Duration delay = null; + + String scanServer = actions.getScanServer(tabletId); + if (scanServer != null) { + addr = new ScanAddress(scanServer, ServerType.SSERVER, loc); + delay = actions.getDelay(); + scanState.busyTimeout = actions.getBusyTimeout(); + log.trace("For tablet {} scan server selector chose scan_server:{} delay:{} busyTimeout:{}", + loc.getExtent(), scanServer, delay, scanState.busyTimeout); + } else { + Optional<String> tserverLoc = loc.getTserverLocation(); + + if (tserverLoc.isPresent()) { + addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), ServerType.TSERVER, loc); + delay = actions.getDelay(); + scanState.busyTimeout = Duration.ZERO; + log.trace("For tablet {} scan server selector chose tablet_server: {}", loc.getExtent(), + addr.serverAddress); + } else { + log.trace( + "For tablet {} scan server selector chose tablet_server, but the tablet is not currently hosted", + loc.getExtent()); + return Optional.empty(); + } + } + + if (!delay.isZero()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + return Optional.of(addr); + } + + /** + * @see ClientTabletCache#findTablet(ClientContext, Text, boolean, ClientTabletCache.LocationNeed, + * int, Range) + */ + private static int computeMinimumHostAhead(ScanState scanState, + ClientTabletCache.LocationNeed hostingNeed) { + int minimumHostAhead = 0; + + if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) { + long currTime = System.nanoTime(); + + double timeRatio = 0; + + long totalTime = currTime - scanState.startTimeNanos; + if (totalTime > 0) { + // The following computes (total time spent in this method)/(total time scanning and time + // spent in this method) + timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double) (totalTime); + } + + if (timeRatio < 0 || timeRatio > 1) { + log.warn("Computed ratio of time spent in getNextScanAddress has unexpected value : {} ", + timeRatio); + timeRatio = Math.max(0.0, Math.min(1.0, timeRatio)); + } + + // + // Do not want to host all tablets in the scan range in case not all data in the range is + // read. Need to determine how many tablets to host ahead of time. The following information + // is used to determine how many tablets to host ahead of time. + // + // 1. The number of tablets this scan has already read. The more tablets that this scan has + // read, the more likely that it will read many more tablets. + // + // 2. The timeRatio computed above. As timeRatio approaches 1.0 it means we are spending + // most of our time waiting on the next tablet to have an address. When are spending most of + // our time waiting for a tablet to have an address we want to increase the number of tablets + // we request to host ahead of time so we can hopefully spend less time waiting on that. + // + + if (timeRatio > .9) { + minimumHostAhead = 16; + } else if (timeRatio > .75) { + minimumHostAhead = 8; + } else if (timeRatio > .5) { + minimumHostAhead = 4; + } else if (timeRatio > .25) { + minimumHostAhead = 2; + } else { + minimumHostAhead = 1; + } + + minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead); + } + return minimumHostAhead; + } + + static ScanAddress getNextScanAddress(ClientContext context, ScanState scanState, + Duration timeOut, Timer scanTimer, long maxSleepTime) + throws TableNotFoundException, AccumuloSecurityException, AccumuloServerException, + InterruptedException, ScanTimedOutException, InvalidTabletHostingRequestException { + + String lastError = null; + String error = null; + long sleepMillis = 100; + + ScanAddress addr = null; + + var hostingNeed = scanState.runOnScanServer ? ClientTabletCache.LocationNeed.NOT_REQUIRED + : ClientTabletCache.LocationNeed.REQUIRED; + + int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed); + + while (addr == null) { + if (scanTimer.hasElapsed(timeOut)) { + throw new ScanTimedOutException("Failed to locate next server to scan before timeout"); + } + + CachedTablet loc = null; + + Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet"); + try (Scope locateSpan = child1.makeCurrent()) { + + loc = context.getTabletLocationCache(scanState.tableId).findTablet(context, + scanState.startRow, scanState.skipStartRow, hostingNeed, minimumHostAhead, + scanState.range); + + if (loc == null) { + context.requireNotDeleted(scanState.tableId); + context.requireNotOffline(scanState.tableId, null); + + error = "Failed to locate tablet for table : " + scanState.tableId + " row : " + + scanState.startRow; + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } else { + scanState.incrementTabletsScanned(loc.getExtent()); + + // when a tablet splits we do want to continue scanning the low child + // of the split if we are already passed it + Range dataRange = loc.getExtent().toDataRange(); + + if (scanState.range.getStartKey() != null + && dataRange.afterEndKey(scanState.range.getStartKey())) { + // go to the next tablet + scanState.startRow = loc.getExtent().endRow(); + scanState.skipStartRow = true; + // force another lookup + loc = null; + } else if (scanState.range.getEndKey() != null + && dataRange.beforeStartKey(scanState.range.getEndKey())) { + // should not happen + throw new IllegalStateException("Unexpected tablet, extent : " + loc.getExtent() + + " range : " + scanState.range + " startRow : " + scanState.startRow); + } + } + } catch (AccumuloServerException e) { + TraceUtil.setException(child1, e, true); + log.debug("Scan failed, server side exception : {}", e.getMessage()); + throw e; + } catch (AccumuloException e) { + error = "exception from tablet loc " + e.getMessage(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + + TraceUtil.setException(child1, e, false); + + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } finally { + child1.end(); + } + + if (loc != null) { + if (scanState.runOnScanServer) { + addr = getScanServerAddress(context, scanState, loc, timeOut, scanTimer).orElse(null); + if (addr == null && loc.getTserverLocation().isEmpty()) { + // wanted to fall back to tserver but tablet was not hosted so make another loop + hostingNeed = ClientTabletCache.LocationNeed.REQUIRED; + } + } else { + addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), ServerType.TSERVER, loc); + } + } + } + + return addr; } - public static List<KeyValue> scan(ClientContext context, ScanState scanState, Duration timeOut) - throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, + public static List<KeyValue> scan(ClientContext context, ScanState scanState, Duration timeOut, + Timer scanTimer) throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, TableNotFoundException { - TabletLocation loc = null; + - Timer scanTimer = Timer.startNew(); String lastError = null; String error = null; int tooManyFilesCount = 0; diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index 44b29b84a3,2a6d5d9287..c0584b139c --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@@ -37,7 -47,10 +47,11 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; + import org.apache.accumulo.core.client.TimedOutException; +import org.apache.accumulo.core.client.admin.servers.ServerId; + import org.apache.accumulo.core.clientImpl.ClientContext; + import org.apache.accumulo.core.clientImpl.ThriftScanner; + import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@@ -45,9 -58,13 +59,12 @@@ import org.apache.accumulo.core.data.Ra import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Timer; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.CloseScannerIT; import org.apache.accumulo.test.util.Wait; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@@ -59,6 -78,11 +78,11 @@@ public class ScannerIT extends Configur return Duration.ofMinutes(1); } + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumScanServers(1); ++ cfg.getClusterServerConfiguration().setNumDefaultScanServers(1); + } + @Test public void testScannerReadaheadConfiguration() throws Exception { final String table = getUniqueNames(1)[0]; @@@ -227,7 -257,189 +251,185 @@@ throw new IllegalArgumentException("Unsupported server type " + serverType); } - long count = 0; - for (String server : servers) { - count += c.instanceOperations().getActiveScans(server).stream() - .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); - } - return count; + return c.instanceOperations().getActiveScans(servers).stream() + .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); } + + @Test + public void testIOExceptionDuringScanIterator() throws Exception { + + getCluster().getClusterControl().startAllServers(SCAN_SERVER); + var random = new SecureRandom(); + + Properties props = getClientProperties(); + // configure scan server not to fallback to tablet servers + String profiles = "[{'isDefault':true,'maxBusyTimeout':'1s', 'busyTimeoutMultiplier':8," + + "'timeToWaitForScanServers':10h, " + + "'attemptPlans':[{'servers':'3', 'busyTimeout':'100ms'}]}]"; + props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles", profiles); + + final String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(props).build()) { + client.tableOperations().create(table); + + try (var writer = client.createBatchWriter(table)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation("row" + i); + m.put("", "", ""); + writer.addMutation(m); + } + } + + // need to flush data to disk so its visible to scan server + client.tableOperations().flush(table, null, null, true); + + IteratorSetting iteratorSetting = new IteratorSetting(1000, ErrorThrowingIterator.class); + iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "3"); + // Set a single row to fail so that after splitting some tablets fail and some do not fail. + iteratorSetting.addOption(ErrorThrowingIterator.ROW, "row5"); + + // The batch scanner sends multiple extents in a single RPC. Need to try a mixture of failing + // and non failing extents for this RPC, so test w/ single tablet and three tablets. + for (List<String> splitsToAdd : List.of(List.<String>of(), List.of("row3", "row7"))) { + if (!splitsToAdd.isEmpty()) { + TreeSet<Text> splits = + splitsToAdd.stream().map(Text::new).collect(Collectors.toCollection(TreeSet::new)); + client.tableOperations().addSplits(table, splits); + // The scan server would not see these splits as it caches tablet info for a bit + getCluster().getClusterControl().stopAllServers(SCAN_SERVER); + getCluster().getClusterControl().startAllServers(SCAN_SERVER); + } + // try tablet and scan server to ensure both have same behavior + for (var cl : ConsistencyLevel.values()) { + log.debug("Starting scan {} {}", cl, splitsToAdd); + try (var scanner = client.createScanner(table)) { + iteratorSetting.addOption(ErrorThrowingIterator.NAME, random.nextLong() + ""); + scanner.addScanIterator(iteratorSetting); + scanner.setConsistencyLevel(cl); + assertEquals(10, scanner.stream().count()); + } + + log.debug("Starting batch scan {} {}", cl, splitsToAdd); + iteratorSetting.addOption(ErrorThrowingIterator.NAME, random.nextLong() + ""); + try (var scanner = client.createBatchScanner(table)) { + scanner.setRanges(List.of(new Range())); + scanner.addScanIterator(iteratorSetting); + scanner.setConsistencyLevel(cl); + assertEquals(10, scanner.stream().count()); + } + } + } + + // ensure a repeating IOException in an iterator times out eventually + iteratorSetting.addOption(ErrorThrowingIterator.TIMES, "1000000"); + var executor = Executors.newCachedThreadPool(); + try { + List<Future<?>> futures = new ArrayList<>(); + for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) { + iteratorSetting.addOption(ErrorThrowingIterator.NAME, random.nextLong() + ""); + futures.add(executor + .submit(() -> expectScanTimeout(client, table, consistencyLevel, iteratorSetting))); + iteratorSetting.addOption(ErrorThrowingIterator.NAME, random.nextLong() + ""); + futures.add(executor.submit( + () -> expectBatchScanTimeout(client, table, consistencyLevel, iteratorSetting))); + } + + for (var future : futures) { + future.get(); + } + } finally { + executor.shutdownNow(); + } + + } + } + + @Test + public void testIOExceptionDuringScanFileOpen() throws Exception { + + getCluster().getClusterControl().startAllServers(SCAN_SERVER); + + final String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(table); + + try (var writer = client.createBatchWriter(table)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation("row" + i); + m.put("", "", ""); + writer.addMutation(m); + } + } + + client.tableOperations().flush(table, null, null, true); + + var ctx = (ClientContext) client; + var tableId = ctx.getTableId(table); + + // Delete the tablets file to cause an IOException during opening the file. By default + // scanners will retry indefinitely when an IOException happens. Test setting a timeout on the + // scans for this case. + try (var tablets = ctx.getAmple().readTablets().forTable(tableId).fetch(FILES).build()) { + var tabletList = tablets.stream().collect(Collectors.toList()); + assertEquals(1, tabletList.size()); + for (var tablet : tabletList) { + var file = tablet.getFiles().stream().collect(MoreCollectors.onlyElement()); + assertTrue(getCluster().getFileSystem().delete(file.getPath(), false)); + } + } + + // Run scans all concurrently to avoid waiting on each one to timeout sequentially. + var executor = Executors.newCachedThreadPool(); + try { + List<Future<?>> futures = new ArrayList<>(); + for (var consistencyLevel : List.of(IMMEDIATE, EVENTUAL)) { + futures.add(executor.submit(() -> expectScanTimeout(client, table, consistencyLevel))); + futures + .add(executor.submit(() -> expectBatchScanTimeout(client, table, consistencyLevel))); + } + + for (var future : futures) { + future.get(); + } + } finally { + executor.shutdownNow(); + } + } + } + + private static void expectBatchScanTimeout(AccumuloClient client, String table, + ConsistencyLevel consistencyLevel, IteratorSetting... iters) { + try (var scanner = client.createBatchScanner(table)) { + scanner.setRanges(List.of(new Range())); + scanner.setTimeout(5, TimeUnit.SECONDS); + scanner.setConsistencyLevel(consistencyLevel); + for (var iter : iters) { + scanner.addScanIterator(iter); + } + Timer timer = Timer.startNew(); + assertThrows(TimedOutException.class, () -> scanner.stream().count()); + long elapsed = timer.elapsed(TimeUnit.MILLISECONDS); + assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private static void expectScanTimeout(AccumuloClient client, String table, + ConsistencyLevel consistencyLevel, IteratorSetting... iters) { + try (var scanner = client.createScanner(table)) { + scanner.setTimeout(5, TimeUnit.SECONDS); + scanner.setConsistencyLevel(consistencyLevel); + for (var iter : iters) { + scanner.addScanIterator(iter); + } + Timer timer = Timer.startNew(); + var exception = assertThrows(RuntimeException.class, () -> scanner.stream().count()); + assertEquals(ThriftScanner.ScanTimedOutException.class, exception.getCause().getClass()); + long elapsed = timer.elapsed(TimeUnit.MILLISECONDS); + assertTrue(elapsed >= 5000, () -> "elapsed : " + elapsed); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java index b1b17a6b54,1152943247..53b5607bb4 --- a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java @@@ -18,7 -18,8 +18,8 @@@ */ package org.apache.accumulo.test.functional; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.SECONDS; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import java.time.Duration; @@@ -91,9 -96,9 +95,9 @@@ public class TimeoutIT extends Accumulo bs.setRanges(Collections.singletonList(new Range())); // should not timeout - bs.setTimeout(5, TimeUnit.SECONDS); ++ bs.setTimeout(5, SECONDS); bs.forEach((k, v) -> {}); - bs.setTimeout(5, SECONDS); IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class); iterSetting.addOption("sleepTime", 2000 + ""); bs.addScanIterator(iterSetting); @@@ -103,4 -108,32 +107,32 @@@ } } + public void testScannerTimeout(AccumuloClient client, String tableName) throws Exception { + client.tableOperations().create(tableName); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + Mutation m = new Mutation("r1"); + m.put("cf1", "cq1", "v1"); + m.put("cf1", "cq2", "v2"); + m.put("cf1", "cq3", "v3"); + m.put("cf1", "cq4", "v4"); + bw.addMutation(m); + } + + try (Scanner scanner = client.createScanner(tableName)) { + scanner.setRange(new Range()); + + // should not timeout - scanner.setTimeout(5, TimeUnit.SECONDS); ++ scanner.setTimeout(5, SECONDS); + scanner.forEach((k, v) -> {}); + + IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class); + iterSetting.addOption("sleepTime", 6000 + ""); + scanner.addScanIterator(iterSetting); + + var exception = assertThrows(RuntimeException.class, () -> scanner.iterator().next(), + "scanner did not time out"); + assertEquals(ThriftScanner.ScanTimedOutException.class, exception.getCause().getClass()); + } + } }
