This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit d80b9b4fd2487e2883aa25a4acb7b7982090093c Merge: 3fc54839b7 cf2f758947 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Sep 13 22:24:02 2024 +0000 Merge branch '3.1' .../core/client/admin/InstanceOperations.java | 1 + .../accumulo/core/clientImpl/ScannerImpl.java | 6 +- .../accumulo/core/clientImpl/ScannerIterator.java | 5 +- .../accumulo/core/clientImpl/ThriftScanner.java | 30 ++--- .../file/blockfile/impl/ScanCacheProvider.java | 19 +-- .../accumulo/core/logging/LoggingBlockCache.java | 131 +++++++++++++++++++ .../accumulo/core/summary/SummaryReader.java | 6 +- .../java/org/apache/accumulo/core/util/Timer.java | 11 +- server/monitor/pom.xml | 4 + .../java/org/apache/accumulo/monitor/Monitor.java | 60 ++++++--- .../rest/compactions/external/ECResource.java | 17 +-- .../accumulo/monitor/util/logging/RecentLogs.java | 48 ++++--- .../org/apache/accumulo/tserver/TabletServer.java | 37 +++--- .../accumulo/tserver/log/RecoveryLogsIterator.java | 85 ++++++------- .../accumulo/tserver/log/ResolvedSortedLog.java | 125 ++++++++++++++++++ .../accumulo/tserver/log/SortedLogRecovery.java | 66 ++++++---- .../accumulo/tserver/log/TabletServerLogger.java | 57 ++++++++- .../apache/accumulo/tserver/logger/LogReader.java | 5 +- .../tserver/memory/LargestFirstMemoryManager.java | 20 ++- .../accumulo/tserver/session/SessionManager.java | 109 +++++++--------- .../accumulo/tserver/tablet/SnapshotTablet.java | 15 +-- .../org/apache/accumulo/tserver/tablet/Tablet.java | 9 -- .../apache/accumulo/tserver/tablet/TabletBase.java | 9 ++ .../tserver/log/RecoveryLogsIteratorTest.java | 41 +++--- .../tserver/log/SortedLogRecoveryTest.java | 30 ++++- src/build/ci/find-unapproved-chars.sh | 2 +- .../org/apache/accumulo/test/CloseScannerIT.java | 9 +- .../org/apache/accumulo/test/ZombieScanIT.java | 54 +++++--- .../apache/accumulo/test/functional/ScanIdIT.java | 139 +++++++++++++++------ .../test/functional/ScanSessionTimeOutIT.java | 11 +- .../apache/accumulo/test/functional/ScannerIT.java | 64 +++++++--- 31 files changed, 837 insertions(+), 388 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 90e271c706,abe644b81b..e70b0637d8 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@@ -378,261 -319,11 +376,259 @@@ public class ThriftScanner return (long) (Math.min(millis * 2, maxSleep) * (.9 + RANDOM.get().nextDouble() / 5)); } + private static Optional<ScanAddress> getScanServerAddress(ClientContext context, - ScanState scanState, CachedTablet loc, long timeOut, long startTime) { ++ ScanState scanState, CachedTablet loc, Duration timeOut, Timer scanTimer) { + Preconditions.checkArgument(scanState.runOnScanServer); + + ScanAddress addr = null; + + if (scanState.scanID != null && scanState.prevLoc != null + && scanState.prevLoc.serverType == ServerType.SSERVER + && scanState.prevLoc.getExtent().equals(loc.getExtent())) { + // this is the case of continuing a scan on a scan server for the same tablet, so lets not + // call the scan server selector and just go back to the previous scan server + addr = scanState.prevLoc; + log.trace( + "For tablet {} continuing scan on scan server {} without consulting scan server selector, using busyTimeout {}", + loc.getExtent(), addr.serverAddress, scanState.busyTimeout); + } else { + var tabletId = new TabletIdImpl(loc.getExtent()); + // obtain a snapshot once and only expose this snapshot to the plugin for consistency + var attempts = scanState.scanAttempts.snapshot(); + - Duration timeoutLeft = Duration.ofSeconds(timeOut) - .minus(Duration.ofMillis(System.currentTimeMillis() - startTime)); ++ Duration timeoutLeft = timeOut.minus(scanTimer.elapsed()); + + var params = new ScanServerSelector.SelectorParameters() { + + @Override + public List<TabletId> getTablets() { + return List.of(tabletId); + } + + @Override + public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) { + return attempts.getOrDefault(tabletId, Set.of()); + } + + @Override + public Map<String,String> getHints() { + if (scanState.executionHints == null) { + return Map.of(); + } + return scanState.executionHints; + } + + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, context, + loc.getExtent().tableId(), log); + } + }; + + ScanServerSelections actions = context.getScanServerSelector().selectServers(params); + + Duration delay = null; + + String scanServer = actions.getScanServer(tabletId); + if (scanServer != null) { + addr = new ScanAddress(scanServer, ServerType.SSERVER, loc); + delay = actions.getDelay(); + scanState.busyTimeout = actions.getBusyTimeout(); + log.trace("For tablet {} scan server selector chose scan_server:{} delay:{} busyTimeout:{}", + loc.getExtent(), scanServer, delay, scanState.busyTimeout); + } else { + Optional<String> tserverLoc = loc.getTserverLocation(); + + if (tserverLoc.isPresent()) { + addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), ServerType.TSERVER, loc); + delay = actions.getDelay(); + scanState.busyTimeout = Duration.ZERO; + log.trace("For tablet {} scan server selector chose tablet_server: {}", loc.getExtent(), + addr); + } else { + log.trace( + "For tablet {} scan server selector chose tablet_server, but the tablet is not currently hosted", + loc.getExtent()); + return Optional.empty(); + } + } + + if (!delay.isZero()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + return Optional.of(addr); + } + + /** + * @see ClientTabletCache#findTablet(ClientContext, Text, boolean, ClientTabletCache.LocationNeed, + * int, Range) + */ + private static int computeMinimumHostAhead(ScanState scanState, + ClientTabletCache.LocationNeed hostingNeed) { + int minimumHostAhead = 0; + + if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) { + long currTime = System.nanoTime(); + + double timeRatio = 0; + + long totalTime = currTime - scanState.startTimeNanos; + if (totalTime > 0) { + // The following computes (total time spent in this method)/(total time scanning and time + // spent in this method) + timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double) (totalTime); + } + + if (timeRatio < 0 || timeRatio > 1) { + log.warn("Computed ratio of time spent in getNextScanAddress has unexpected value : {} ", + timeRatio); + timeRatio = Math.max(0.0, Math.min(1.0, timeRatio)); + } + + // + // Do not want to host all tablets in the scan range in case not all data in the range is + // read. Need to determine how many tablets to host ahead of time. The following information + // is used to determine how many tablets to host ahead of time. + // + // 1. The number of tablets this scan has already read. The more tablets that this scan has + // read, the more likely that it will read many more tablets. + // + // 2. The timeRatio computed above. As timeRatio approaches 1.0 it means we are spending + // most of our time waiting on the next tablet to have an address. When are spending most of + // our time waiting for a tablet to have an address we want to increase the number of tablets + // we request to host ahead of time so we can hopefully spend less time waiting on that. + // + + if (timeRatio > .9) { + minimumHostAhead = 16; + } else if (timeRatio > .75) { + minimumHostAhead = 8; + } else if (timeRatio > .5) { + minimumHostAhead = 4; + } else if (timeRatio > .25) { + minimumHostAhead = 2; + } else { + minimumHostAhead = 1; + } + + minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead); + } + return minimumHostAhead; + } + - static ScanAddress getNextScanAddress(ClientContext context, ScanState scanState, long timeOut, - long startTime, long maxSleepTime) ++ static ScanAddress getNextScanAddress(ClientContext context, ScanState scanState, ++ Duration timeOut, Timer scanTimer, long maxSleepTime) + throws TableNotFoundException, AccumuloSecurityException, AccumuloServerException, + InterruptedException, ScanTimedOutException, InvalidTabletHostingRequestException { + + String lastError = null; + String error = null; + long sleepMillis = 100; + + ScanAddress addr = null; + + var hostingNeed = scanState.runOnScanServer ? ClientTabletCache.LocationNeed.NOT_REQUIRED + : ClientTabletCache.LocationNeed.REQUIRED; + + int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed); + + while (addr == null) { - long currentTime = System.currentTimeMillis(); - if ((currentTime - startTime) / 1000.0 > timeOut) { ++ if (scanTimer.hasElapsed(timeOut)) { + throw new ScanTimedOutException("Failed to locate next server to scan before timeout"); + } + + CachedTablet loc = null; + + Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet"); + try (Scope locateSpan = child1.makeCurrent()) { + + loc = ClientTabletCache.getInstance(context, scanState.tableId).findTablet(context, + scanState.startRow, scanState.skipStartRow, hostingNeed, minimumHostAhead, + scanState.range); + + if (loc == null) { + context.requireNotDeleted(scanState.tableId); + context.requireNotOffline(scanState.tableId, null); + + error = "Failed to locate tablet for table : " + scanState.tableId + " row : " + + scanState.startRow; + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } else { + scanState.incrementTabletsScanned(loc.getExtent()); + + // when a tablet splits we do want to continue scanning the low child + // of the split if we are already passed it + Range dataRange = loc.getExtent().toDataRange(); + + if (scanState.range.getStartKey() != null + && dataRange.afterEndKey(scanState.range.getStartKey())) { + // go to the next tablet + scanState.startRow = loc.getExtent().endRow(); + scanState.skipStartRow = true; + // force another lookup + loc = null; + } else if (scanState.range.getEndKey() != null + && dataRange.beforeStartKey(scanState.range.getEndKey())) { + // should not happen + throw new IllegalStateException("Unexpected tablet, extent : " + loc.getExtent() + + " range : " + scanState.range + " startRow : " + scanState.startRow); + } + } + } catch (AccumuloServerException e) { + TraceUtil.setException(child1, e, true); + log.debug("Scan failed, server side exception : {}", e.getMessage()); + throw e; + } catch (AccumuloException e) { + error = "exception from tablet loc " + e.getMessage(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + + TraceUtil.setException(child1, e, false); + + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } finally { + child1.end(); + } + + if (loc != null) { + if (scanState.runOnScanServer) { - addr = getScanServerAddress(context, scanState, loc, timeOut, startTime).orElse(null); ++ addr = getScanServerAddress(context, scanState, loc, timeOut, scanTimer).orElse(null); + if (addr == null && loc.getTserverLocation().isEmpty()) { + // wanted to fall back to tserver but tablet was not hosted so make another loop + hostingNeed = ClientTabletCache.LocationNeed.REQUIRED; + } + } else { + addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), ServerType.TSERVER, loc); + } + } + } + + return addr; + } + - public static List<KeyValue> scan(ClientContext context, ScanState scanState, long timeOut) + public static List<KeyValue> scan(ClientContext context, ScanState scanState, Duration timeOut) throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, TableNotFoundException { - TabletLocation loc = null; + - long startTime = System.currentTimeMillis(); + Timer scanTimer = Timer.startNew(); String lastError = null; String error = null; int tooManyFilesCount = 0; @@@ -648,25 -339,70 +644,25 @@@ if (Thread.currentThread().isInterrupted()) { throw new AccumuloException("Thread interrupted"); } + - if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut) { + if (scanTimer.hasElapsed(timeOut)) { - throw new ScanTimedOutException(); + throw new ScanTimedOutException( + "Failed to retrieve next batch of key values before timeout"); } - while (loc == null) { - if (scanTimer.hasElapsed(timeOut)) { - throw new ScanTimedOutException(); + ScanAddress addr; + long beginTime = System.nanoTime(); + try { - addr = getNextScanAddress(context, scanState, timeOut, startTime, maxSleepTime); ++ addr = getNextScanAddress(context, scanState, timeOut, scanTimer, maxSleepTime); + } finally { + // track the initial time that we started tracking the time for getting the next scan + // address + if (scanState.startTimeNanos == 0) { + scanState.startTimeNanos = beginTime; } - Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet"); - try (Scope locateSpan = child1.makeCurrent()) { - loc = TabletLocator.getLocator(context, scanState.tableId).locateTablet(context, - scanState.startRow, scanState.skipStartRow, false); - - if (loc == null) { - context.requireNotDeleted(scanState.tableId); - context.requireNotOffline(scanState.tableId, null); - - error = "Failed to locate tablet for table : " + scanState.tableId + " row : " - + scanState.startRow; - if (!error.equals(lastError)) { - log.debug("{}", error); - } else if (log.isTraceEnabled()) { - log.trace("{}", error); - } - lastError = error; - sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); - } else { - // when a tablet splits we do want to continue scanning the low child - // of the split if we are already passed it - Range dataRange = loc.getExtent().toDataRange(); - - if (scanState.range.getStartKey() != null - && dataRange.afterEndKey(scanState.range.getStartKey())) { - // go to the next tablet - scanState.startRow = loc.getExtent().endRow(); - scanState.skipStartRow = true; - loc = null; - } else if (scanState.range.getEndKey() != null - && dataRange.beforeStartKey(scanState.range.getEndKey())) { - // should not happen - throw new IllegalStateException("Unexpected tablet, extent : " + loc.getExtent() - + " range : " + scanState.range + " startRow : " + scanState.startRow); - } - } - } catch (AccumuloServerException e) { - TraceUtil.setException(child1, e, true); - log.debug("Scan failed, server side exception : {}", e.getMessage()); - throw e; - } catch (AccumuloException e) { - error = "exception from tablet loc " + e.getMessage(); - if (!error.equals(lastError)) { - log.debug("{}", error); - } else if (log.isTraceEnabled()) { - log.trace("{}", error); - } - - TraceUtil.setException(child1, e, false); - - lastError = error; - sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); - } finally { - child1.end(); - } + // track the total amount of time spent getting the next scan address + scanState.getNextScanAddressTimeNanos += System.nanoTime() - beginTime; } Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location", diff --cc core/src/main/java/org/apache/accumulo/core/util/Timer.java index 3884a234bd,e0ace84c4a..88cfbeee76 --- a/core/src/main/java/org/apache/accumulo/core/util/Timer.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Timer.java @@@ -88,12 -88,13 +88,21 @@@ public final class Timer return unit.convert(getElapsedNanos(), TimeUnit.NANOSECONDS); } + /** + * @return true if this timer was started/reset after the other timer was started/reset, false + * otherwise + */ + public boolean startedAfter(Timer otherTimer) { + return (startNanos - otherTimer.startNanos) > 0; + } + + private static long toNanos(Duration duration) { + try { + // This can overflow when very large, such as when the + // duration is created using Long.MAX_VALUE millis + return duration.toNanos(); + } catch (ArithmeticException e) { + return Long.MAX_VALUE; + } + } } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index cb8f211fe2,eae6637702..8733ffdf42 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -25,10 -29,11 +25,11 @@@ import static org.apache.accumulo.core. import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask; import java.io.IOException; + import java.io.UncheckedIOException; import java.lang.management.ManagementFactory; +import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@@ -899,24 -1079,24 +898,24 @@@ public class TabletServer extends Abstr logger.minorCompactionStarted(tablet, lastUpdateSequence, newDataFileLocation, durability); } + public boolean needsRecovery(TabletMetadata tabletMetadata) { + + var logEntries = tabletMetadata.getLogs(); + + if (logEntries.isEmpty()) { + return false; + } + + try { + return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), logEntries); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + - public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries, + public void recover(VolumeManager fs, KeyExtent extent, Collection<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException { - List<Path> recoveryDirs = new ArrayList<>(); - for (LogEntry entry : logEntries) { - Path recovery = null; - Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getPath())); - finished = SortedLogState.getFinishedMarkerPath(finished); - TabletServer.log.debug("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); - } - if (recovery == null) { - throw new IOException( - "Unable to find recovery files for extent " + extent + " logEntry: " + entry); - } - recoveryDirs.add(recovery); - } - logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver); + logger.recover(getContext(), extent, logEntries, tabletFiles, mutationReceiver); } public int createLogId() { diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 544f53acaa,a5f6963da3..cbc4f20e80 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@@ -510,11 -523,49 +523,49 @@@ public class TabletServerLogger return seq; } - public void recover(ServerContext context, KeyExtent extent, List<Path> recoveryDirs, + private List<ResolvedSortedLog> resolve(Collection<LogEntry> walogs) { + List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size()); + for (var logEntry : walogs) { + var sortedLog = sortedLogCache.get(logEntry, le1 -> { + try { + return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + sortedLogs.add(sortedLog); + } + return sortedLogs; + } + + private CacheProvider createCacheProvider(TabletServerResourceManager resourceMgr) { + return new BasicCacheProvider( + LoggingBlockCache.wrap(CacheType.INDEX, resourceMgr.getIndexCache()), + LoggingBlockCache.wrap(CacheType.DATA, resourceMgr.getDataCache())); + } + + public boolean needsRecovery(ServerContext context, KeyExtent extent, Collection<LogEntry> walogs) + throws IOException { + try { + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + return recovery.needsRecovery(extent, resolve(walogs)); + } catch (Exception e) { + throw new IOException(e); + } + } + - public void recover(ServerContext context, KeyExtent extent, List<LogEntry> walogs, ++ public void recover(ServerContext context, KeyExtent extent, Collection<LogEntry> walogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException { try { - SortedLogRecovery recovery = new SortedLogRecovery(context); - recovery.recover(extent, recoveryDirs, tabletFiles, mr); + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + recovery.recover(extent, resolve(walogs), tabletFiles, mr); } catch (Exception e) { throw new IOException(e); } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index a9b571078e,287bee16f3..1ce57c87e4 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -998,18 -1111,11 +998,9 @@@ public class Tablet extends TabletBase // close data files getTabletResources().close(); - if (completeClose) { - closeState = CloseState.COMPLETE; - } + closeState = CloseState.COMPLETE; } - private boolean disallowNewReservations(ScanParameters scanParameters) { - var scanSessId = scanParameters.getScanSessionId(); - if (scanSessId != null) { - return getTabletServer().getSessionManager().disallowNewReservations(scanSessId); - } else { - return true; - } - } - private void closeConsistencyCheck() { long num = tabletMemory.getMemTable().getNumEntries(); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 3203134499,58fc563bba..d79d8408ec --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@@ -113,9 -114,18 +113,18 @@@ public abstract class TabletBase } } + protected boolean disallowNewReservations(ScanParameters scanParameters) { + var scanSessId = scanParameters.getScanSessionId(); + if (scanSessId != null) { + return server.getSessionManager().disallowNewReservations(scanSessId); + } else { + return true; + } + } + public abstract boolean isClosed(); - public abstract SortedMap<StoredTabletFile,DataFileValue> getDatafiles(); + public abstract Map<StoredTabletFile,DataFileValue> getDatafiles(); public abstract void addToYieldMetric(int i); diff --cc test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index d5c7c3e46b,0461ba53ec..399a208d0a --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@@ -252,8 -259,18 +259,16 @@@ public class ZombieScanIT extends Confi String table = getUniqueNames(1)[0]; + final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER : SCAN_SERVER; + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + if (serverType == SCAN_SERVER) { - getCluster().getConfig().setNumScanServers(1); - getCluster().getClusterControl().startAllServers(SCAN_SERVER); + // Scans will fall back to tablet servers when no scan servers are present. So wait for scan + // servers to show up in zookeeper. Can remove this in 3.1. + Wait.waitFor(() -> !c.instanceOperations().getScanServers().isEmpty()); + } + c.tableOperations().create(table); var executor = Executors.newCachedThreadPool(); @@@ -314,11 -331,15 +329,10 @@@ Wait.waitFor(() -> getZombieScansMetric() == 6); - assertEquals(6, countActiveScans(c, table)); + assertEquals(6, countActiveScans(c, serverType, table)); executor.shutdownNow(); - } finally { - if (serverType == SCAN_SERVER) { - getCluster().getConfig().setNumScanServers(0); - getCluster().getClusterControl().stopAllServers(SCAN_SERVER); - } } - } private static long countLocations(String table, AccumuloClient client) throws Exception { diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index c5069b5323,fae0aa42e7..ef5089c961 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@@ -121,10 -129,20 +129,18 @@@ public class ScannerIT extends Configur /** * {@link CloseScannerIT#testManyScans()} is a similar test. */ - @Test - public void testSessionCleanup() throws Exception { + @ParameterizedTest + @EnumSource + public void testSessionCleanup(ConsistencyLevel consistency) throws Exception { final String tableName = getUniqueNames(1)[0]; - try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) { + final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER : SCAN_SERVER; + try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProperties()).build()) { + + if (serverType == SCAN_SERVER) { - getCluster().getConfig().setNumScanServers(1); - getCluster().getClusterControl().startAllServers(SCAN_SERVER); + // Scans will fall back to tablet servers when no scan servers are present. So wait for scan + // servers to show up in zookeeper. Can remove this in 3.1. + Wait.waitFor(() -> !accumuloClient.instanceOperations().getScanServers().isEmpty()); + } accumuloClient.tableOperations().create(tableName); @@@ -183,9 -209,14 +207,9 @@@ scanner.setRanges(List.of(new Range())); assertEquals(100000, scanner.stream().count()); assertEquals(100000, scanner.stream().count()); - assertEquals(0, countActiveScans(accumuloClient, tableName)); + assertEquals(0, countActiveScans(accumuloClient, serverType, tableName)); } } - } finally { - if (serverType == SCAN_SERVER) { - getCluster().getConfig().setNumScanServers(0); - getCluster().getClusterControl().stopAllServers(SCAN_SERVER); - } } }