This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit d80b9b4fd2487e2883aa25a4acb7b7982090093c
Merge: 3fc54839b7 cf2f758947
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Sep 13 22:24:02 2024 +0000

    Merge branch '3.1'

 .../core/client/admin/InstanceOperations.java      |   1 +
 .../accumulo/core/clientImpl/ScannerImpl.java      |   6 +-
 .../accumulo/core/clientImpl/ScannerIterator.java  |   5 +-
 .../accumulo/core/clientImpl/ThriftScanner.java    |  30 ++---
 .../file/blockfile/impl/ScanCacheProvider.java     |  19 +--
 .../accumulo/core/logging/LoggingBlockCache.java   | 131 +++++++++++++++++++
 .../accumulo/core/summary/SummaryReader.java       |   6 +-
 .../java/org/apache/accumulo/core/util/Timer.java  |  11 +-
 server/monitor/pom.xml                             |   4 +
 .../java/org/apache/accumulo/monitor/Monitor.java  |  60 ++++++---
 .../rest/compactions/external/ECResource.java      |  17 +--
 .../accumulo/monitor/util/logging/RecentLogs.java  |  48 ++++---
 .../org/apache/accumulo/tserver/TabletServer.java  |  37 +++---
 .../accumulo/tserver/log/RecoveryLogsIterator.java |  85 ++++++-------
 .../accumulo/tserver/log/ResolvedSortedLog.java    | 125 ++++++++++++++++++
 .../accumulo/tserver/log/SortedLogRecovery.java    |  66 ++++++----
 .../accumulo/tserver/log/TabletServerLogger.java   |  57 ++++++++-
 .../apache/accumulo/tserver/logger/LogReader.java  |   5 +-
 .../tserver/memory/LargestFirstMemoryManager.java  |  20 ++-
 .../accumulo/tserver/session/SessionManager.java   | 109 +++++++---------
 .../accumulo/tserver/tablet/SnapshotTablet.java    |  15 +--
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   9 --
 .../apache/accumulo/tserver/tablet/TabletBase.java |   9 ++
 .../tserver/log/RecoveryLogsIteratorTest.java      |  41 +++---
 .../tserver/log/SortedLogRecoveryTest.java         |  30 ++++-
 src/build/ci/find-unapproved-chars.sh              |   2 +-
 .../org/apache/accumulo/test/CloseScannerIT.java   |   9 +-
 .../org/apache/accumulo/test/ZombieScanIT.java     |  54 +++++---
 .../apache/accumulo/test/functional/ScanIdIT.java  | 139 +++++++++++++++------
 .../test/functional/ScanSessionTimeOutIT.java      |  11 +-
 .../apache/accumulo/test/functional/ScannerIT.java |  64 +++++++---
 31 files changed, 837 insertions(+), 388 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 90e271c706,abe644b81b..e70b0637d8
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@@ -378,261 -319,11 +376,259 @@@ public class ThriftScanner 
      return (long) (Math.min(millis * 2, maxSleep) * (.9 + 
RANDOM.get().nextDouble() / 5));
    }
  
 +  private static Optional<ScanAddress> getScanServerAddress(ClientContext 
context,
-       ScanState scanState, CachedTablet loc, long timeOut, long startTime) {
++      ScanState scanState, CachedTablet loc, Duration timeOut, Timer 
scanTimer) {
 +    Preconditions.checkArgument(scanState.runOnScanServer);
 +
 +    ScanAddress addr = null;
 +
 +    if (scanState.scanID != null && scanState.prevLoc != null
 +        && scanState.prevLoc.serverType == ServerType.SSERVER
 +        && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
 +      // this is the case of continuing a scan on a scan server for the same 
tablet, so lets not
 +      // call the scan server selector and just go back to the previous scan 
server
 +      addr = scanState.prevLoc;
 +      log.trace(
 +          "For tablet {} continuing scan on scan server {} without consulting 
scan server selector, using busyTimeout {}",
 +          loc.getExtent(), addr.serverAddress, scanState.busyTimeout);
 +    } else {
 +      var tabletId = new TabletIdImpl(loc.getExtent());
 +      // obtain a snapshot once and only expose this snapshot to the plugin 
for consistency
 +      var attempts = scanState.scanAttempts.snapshot();
 +
-       Duration timeoutLeft = Duration.ofSeconds(timeOut)
-           .minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
++      Duration timeoutLeft = timeOut.minus(scanTimer.elapsed());
 +
 +      var params = new ScanServerSelector.SelectorParameters() {
 +
 +        @Override
 +        public List<TabletId> getTablets() {
 +          return List.of(tabletId);
 +        }
 +
 +        @Override
 +        public Collection<? extends ScanServerAttempt> getAttempts(TabletId 
tabletId) {
 +          return attempts.getOrDefault(tabletId, Set.of());
 +        }
 +
 +        @Override
 +        public Map<String,String> getHints() {
 +          if (scanState.executionHints == null) {
 +            return Map.of();
 +          }
 +          return scanState.executionHints;
 +        }
 +
 +        @Override
 +        public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, 
Duration maxWaitTime,
 +            String description) {
 +          return ThriftScanner.waitUntil(condition, maxWaitTime, description, 
timeoutLeft, context,
 +              loc.getExtent().tableId(), log);
 +        }
 +      };
 +
 +      ScanServerSelections actions = 
context.getScanServerSelector().selectServers(params);
 +
 +      Duration delay = null;
 +
 +      String scanServer = actions.getScanServer(tabletId);
 +      if (scanServer != null) {
 +        addr = new ScanAddress(scanServer, ServerType.SSERVER, loc);
 +        delay = actions.getDelay();
 +        scanState.busyTimeout = actions.getBusyTimeout();
 +        log.trace("For tablet {} scan server selector chose scan_server:{} 
delay:{} busyTimeout:{}",
 +            loc.getExtent(), scanServer, delay, scanState.busyTimeout);
 +      } else {
 +        Optional<String> tserverLoc = loc.getTserverLocation();
 +
 +        if (tserverLoc.isPresent()) {
 +          addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), 
ServerType.TSERVER, loc);
 +          delay = actions.getDelay();
 +          scanState.busyTimeout = Duration.ZERO;
 +          log.trace("For tablet {} scan server selector chose tablet_server: 
{}", loc.getExtent(),
 +              addr);
 +        } else {
 +          log.trace(
 +              "For tablet {} scan server selector chose tablet_server, but 
the tablet is not currently hosted",
 +              loc.getExtent());
 +          return Optional.empty();
 +        }
 +      }
 +
 +      if (!delay.isZero()) {
 +        try {
 +          Thread.sleep(delay.toMillis());
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +
 +    return Optional.of(addr);
 +  }
 +
 +  /**
 +   * @see ClientTabletCache#findTablet(ClientContext, Text, boolean, 
ClientTabletCache.LocationNeed,
 +   *      int, Range)
 +   */
 +  private static int computeMinimumHostAhead(ScanState scanState,
 +      ClientTabletCache.LocationNeed hostingNeed) {
 +    int minimumHostAhead = 0;
 +
 +    if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) {
 +      long currTime = System.nanoTime();
 +
 +      double timeRatio = 0;
 +
 +      long totalTime = currTime - scanState.startTimeNanos;
 +      if (totalTime > 0) {
 +        // The following computes (total time spent in this method)/(total 
time scanning and time
 +        // spent in this method)
 +        timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double) 
(totalTime);
 +      }
 +
 +      if (timeRatio < 0 || timeRatio > 1) {
 +        log.warn("Computed ratio of time spent in getNextScanAddress has 
unexpected value : {} ",
 +            timeRatio);
 +        timeRatio = Math.max(0.0, Math.min(1.0, timeRatio));
 +      }
 +
 +      //
 +      // Do not want to host all tablets in the scan range in case not all 
data in the range is
 +      // read. Need to determine how many tablets to host ahead of time. The 
following information
 +      // is used to determine how many tablets to host ahead of time.
 +      //
 +      // 1. The number of tablets this scan has already read. The more 
tablets that this scan has
 +      // read, the more likely that it will read many more tablets.
 +      //
 +      // 2. The timeRatio computed above. As timeRatio approaches 1.0 it 
means we are spending
 +      // most of our time waiting on the next tablet to have an address. When 
are spending most of
 +      // our time waiting for a tablet to have an address we want to increase 
the number of tablets
 +      // we request to host ahead of time so we can hopefully spend less time 
waiting on that.
 +      //
 +
 +      if (timeRatio > .9) {
 +        minimumHostAhead = 16;
 +      } else if (timeRatio > .75) {
 +        minimumHostAhead = 8;
 +      } else if (timeRatio > .5) {
 +        minimumHostAhead = 4;
 +      } else if (timeRatio > .25) {
 +        minimumHostAhead = 2;
 +      } else {
 +        minimumHostAhead = 1;
 +      }
 +
 +      minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead);
 +    }
 +    return minimumHostAhead;
 +  }
 +
-   static ScanAddress getNextScanAddress(ClientContext context, ScanState 
scanState, long timeOut,
-       long startTime, long maxSleepTime)
++  static ScanAddress getNextScanAddress(ClientContext context, ScanState 
scanState,
++      Duration timeOut, Timer scanTimer, long maxSleepTime)
 +      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloServerException,
 +      InterruptedException, ScanTimedOutException, 
InvalidTabletHostingRequestException {
 +
 +    String lastError = null;
 +    String error = null;
 +    long sleepMillis = 100;
 +
 +    ScanAddress addr = null;
 +
 +    var hostingNeed = scanState.runOnScanServer ? 
ClientTabletCache.LocationNeed.NOT_REQUIRED
 +        : ClientTabletCache.LocationNeed.REQUIRED;
 +
 +    int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed);
 +
 +    while (addr == null) {
-       long currentTime = System.currentTimeMillis();
-       if ((currentTime - startTime) / 1000.0 > timeOut) {
++      if (scanTimer.hasElapsed(timeOut)) {
 +        throw new ScanTimedOutException("Failed to locate next server to scan 
before timeout");
 +      }
 +
 +      CachedTablet loc = null;
 +
 +      Span child1 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::locateTablet");
 +      try (Scope locateSpan = child1.makeCurrent()) {
 +
 +        loc = ClientTabletCache.getInstance(context, 
scanState.tableId).findTablet(context,
 +            scanState.startRow, scanState.skipStartRow, hostingNeed, 
minimumHostAhead,
 +            scanState.range);
 +
 +        if (loc == null) {
 +          context.requireNotDeleted(scanState.tableId);
 +          context.requireNotOffline(scanState.tableId, null);
 +
 +          error = "Failed to locate tablet for table : " + scanState.tableId 
+ " row : "
 +              + scanState.startRow;
 +          if (!error.equals(lastError)) {
 +            log.debug("{}", error);
 +          } else if (log.isTraceEnabled()) {
 +            log.trace("{}", error);
 +          }
 +          lastError = error;
 +          sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
 +        } else {
 +          scanState.incrementTabletsScanned(loc.getExtent());
 +
 +          // when a tablet splits we do want to continue scanning the low 
child
 +          // of the split if we are already passed it
 +          Range dataRange = loc.getExtent().toDataRange();
 +
 +          if (scanState.range.getStartKey() != null
 +              && dataRange.afterEndKey(scanState.range.getStartKey())) {
 +            // go to the next tablet
 +            scanState.startRow = loc.getExtent().endRow();
 +            scanState.skipStartRow = true;
 +            // force another lookup
 +            loc = null;
 +          } else if (scanState.range.getEndKey() != null
 +              && dataRange.beforeStartKey(scanState.range.getEndKey())) {
 +            // should not happen
 +            throw new IllegalStateException("Unexpected tablet, extent : " + 
loc.getExtent()
 +                + "  range : " + scanState.range + " startRow : " + 
scanState.startRow);
 +          }
 +        }
 +      } catch (AccumuloServerException e) {
 +        TraceUtil.setException(child1, e, true);
 +        log.debug("Scan failed, server side exception : {}", e.getMessage());
 +        throw e;
 +      } catch (AccumuloException e) {
 +        error = "exception from tablet loc " + e.getMessage();
 +        if (!error.equals(lastError)) {
 +          log.debug("{}", error);
 +        } else if (log.isTraceEnabled()) {
 +          log.trace("{}", error);
 +        }
 +
 +        TraceUtil.setException(child1, e, false);
 +
 +        lastError = error;
 +        sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
 +      } finally {
 +        child1.end();
 +      }
 +
 +      if (loc != null) {
 +        if (scanState.runOnScanServer) {
-           addr = getScanServerAddress(context, scanState, loc, timeOut, 
startTime).orElse(null);
++          addr = getScanServerAddress(context, scanState, loc, timeOut, 
scanTimer).orElse(null);
 +          if (addr == null && loc.getTserverLocation().isEmpty()) {
 +            // wanted to fall back to tserver but tablet was not hosted so 
make another loop
 +            hostingNeed = ClientTabletCache.LocationNeed.REQUIRED;
 +          }
 +        } else {
 +          addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), 
ServerType.TSERVER, loc);
 +        }
 +      }
 +    }
 +
 +    return addr;
 +  }
 +
-   public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, long timeOut)
+   public static List<KeyValue> scan(ClientContext context, ScanState 
scanState, Duration timeOut)
        throws ScanTimedOutException, AccumuloException, 
AccumuloSecurityException,
        TableNotFoundException {
 -    TabletLocation loc = null;
 +
-     long startTime = System.currentTimeMillis();
+     Timer scanTimer = Timer.startNew();
      String lastError = null;
      String error = null;
      int tooManyFilesCount = 0;
@@@ -648,25 -339,70 +644,25 @@@
          if (Thread.currentThread().isInterrupted()) {
            throw new AccumuloException("Thread interrupted");
          }
 +
-         if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut) {
+         if (scanTimer.hasElapsed(timeOut)) {
 -          throw new ScanTimedOutException();
 +          throw new ScanTimedOutException(
 +              "Failed to retrieve next batch of key values before timeout");
          }
  
 -        while (loc == null) {
 -          if (scanTimer.hasElapsed(timeOut)) {
 -            throw new ScanTimedOutException();
 +        ScanAddress addr;
 +        long beginTime = System.nanoTime();
 +        try {
-           addr = getNextScanAddress(context, scanState, timeOut, startTime, 
maxSleepTime);
++          addr = getNextScanAddress(context, scanState, timeOut, scanTimer, 
maxSleepTime);
 +        } finally {
 +          // track the initial time that we started tracking the time for 
getting the next scan
 +          // address
 +          if (scanState.startTimeNanos == 0) {
 +            scanState.startTimeNanos = beginTime;
            }
  
 -          Span child1 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::locateTablet");
 -          try (Scope locateSpan = child1.makeCurrent()) {
 -            loc = TabletLocator.getLocator(context, 
scanState.tableId).locateTablet(context,
 -                scanState.startRow, scanState.skipStartRow, false);
 -
 -            if (loc == null) {
 -              context.requireNotDeleted(scanState.tableId);
 -              context.requireNotOffline(scanState.tableId, null);
 -
 -              error = "Failed to locate tablet for table : " + 
scanState.tableId + " row : "
 -                  + scanState.startRow;
 -              if (!error.equals(lastError)) {
 -                log.debug("{}", error);
 -              } else if (log.isTraceEnabled()) {
 -                log.trace("{}", error);
 -              }
 -              lastError = error;
 -              sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
 -            } else {
 -              // when a tablet splits we do want to continue scanning the low 
child
 -              // of the split if we are already passed it
 -              Range dataRange = loc.getExtent().toDataRange();
 -
 -              if (scanState.range.getStartKey() != null
 -                  && dataRange.afterEndKey(scanState.range.getStartKey())) {
 -                // go to the next tablet
 -                scanState.startRow = loc.getExtent().endRow();
 -                scanState.skipStartRow = true;
 -                loc = null;
 -              } else if (scanState.range.getEndKey() != null
 -                  && dataRange.beforeStartKey(scanState.range.getEndKey())) {
 -                // should not happen
 -                throw new IllegalStateException("Unexpected tablet, extent : 
" + loc.getExtent()
 -                    + "  range : " + scanState.range + " startRow : " + 
scanState.startRow);
 -              }
 -            }
 -          } catch (AccumuloServerException e) {
 -            TraceUtil.setException(child1, e, true);
 -            log.debug("Scan failed, server side exception : {}", 
e.getMessage());
 -            throw e;
 -          } catch (AccumuloException e) {
 -            error = "exception from tablet loc " + e.getMessage();
 -            if (!error.equals(lastError)) {
 -              log.debug("{}", error);
 -            } else if (log.isTraceEnabled()) {
 -              log.trace("{}", error);
 -            }
 -
 -            TraceUtil.setException(child1, e, false);
 -
 -            lastError = error;
 -            sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
 -          } finally {
 -            child1.end();
 -          }
 +          // track the total amount of time spent getting the next scan 
address
 +          scanState.getNextScanAddressTimeNanos += System.nanoTime() - 
beginTime;
          }
  
          Span child2 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::location",
diff --cc core/src/main/java/org/apache/accumulo/core/util/Timer.java
index 3884a234bd,e0ace84c4a..88cfbeee76
--- a/core/src/main/java/org/apache/accumulo/core/util/Timer.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Timer.java
@@@ -88,12 -88,13 +88,21 @@@ public final class Timer 
      return unit.convert(getElapsedNanos(), TimeUnit.NANOSECONDS);
    }
  
 +  /**
 +   * @return true if this timer was started/reset after the other timer was 
started/reset, false
 +   *         otherwise
 +   */
 +  public boolean startedAfter(Timer otherTimer) {
 +    return (startNanos - otherTimer.startNanos) > 0;
 +  }
 +
+   private static long toNanos(Duration duration) {
+     try {
+       // This can overflow when very large, such as when the
+       // duration is created using Long.MAX_VALUE millis
+       return duration.toNanos();
+     } catch (ArithmeticException e) {
+       return Long.MAX_VALUE;
+     }
+   }
  }
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index cb8f211fe2,eae6637702..8733ffdf42
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -25,10 -29,11 +25,11 @@@ import static org.apache.accumulo.core.
  import static 
org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask;
  
  import java.io.IOException;
+ import java.io.UncheckedIOException;
  import java.lang.management.ManagementFactory;
 +import java.lang.reflect.InvocationTargetException;
  import java.net.UnknownHostException;
  import java.time.Duration;
 -import java.time.Instant;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;
@@@ -899,24 -1079,24 +898,24 @@@ public class TabletServer extends Abstr
      logger.minorCompactionStarted(tablet, lastUpdateSequence, 
newDataFileLocation, durability);
    }
  
+   public boolean needsRecovery(TabletMetadata tabletMetadata) {
+ 
+     var logEntries = tabletMetadata.getLogs();
+ 
+     if (logEntries.isEmpty()) {
+       return false;
+     }
+ 
+     try {
+       return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), 
logEntries);
+     } catch (IOException e) {
+       throw new UncheckedIOException(e);
+     }
+   }
+ 
 -  public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> 
logEntries,
 +  public void recover(VolumeManager fs, KeyExtent extent, 
Collection<LogEntry> logEntries,
        Set<String> tabletFiles, MutationReceiver mutationReceiver) throws 
IOException {
-     List<Path> recoveryDirs = new ArrayList<>();
-     for (LogEntry entry : logEntries) {
-       Path recovery = null;
-       Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getPath()));
-       finished = SortedLogState.getFinishedMarkerPath(finished);
-       TabletServer.log.debug("Looking for " + finished);
-       if (fs.exists(finished)) {
-         recovery = finished.getParent();
-       }
-       if (recovery == null) {
-         throw new IOException(
-             "Unable to find recovery files for extent " + extent + " 
logEntry: " + entry);
-       }
-       recoveryDirs.add(recovery);
-     }
-     logger.recover(getContext(), extent, recoveryDirs, tabletFiles, 
mutationReceiver);
+     logger.recover(getContext(), extent, logEntries, tabletFiles, 
mutationReceiver);
    }
  
    public int createLogId() {
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 544f53acaa,a5f6963da3..cbc4f20e80
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -510,11 -523,49 +523,49 @@@ public class TabletServerLogger 
      return seq;
    }
  
-   public void recover(ServerContext context, KeyExtent extent, List<Path> 
recoveryDirs,
+   private List<ResolvedSortedLog> resolve(Collection<LogEntry> walogs) {
+     List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size());
+     for (var logEntry : walogs) {
+       var sortedLog = sortedLogCache.get(logEntry, le1 -> {
+         try {
+           return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager());
+         } catch (IOException e) {
+           throw new UncheckedIOException(e);
+         }
+       });
+ 
+       sortedLogs.add(sortedLog);
+     }
+     return sortedLogs;
+   }
+ 
+   private CacheProvider createCacheProvider(TabletServerResourceManager 
resourceMgr) {
+     return new BasicCacheProvider(
+         LoggingBlockCache.wrap(CacheType.INDEX, resourceMgr.getIndexCache()),
+         LoggingBlockCache.wrap(CacheType.DATA, resourceMgr.getDataCache()));
+   }
+ 
+   public boolean needsRecovery(ServerContext context, KeyExtent extent, 
Collection<LogEntry> walogs)
+       throws IOException {
+     try {
+       var resourceMgr = tserver.getResourceManager();
+       var cacheProvider = createCacheProvider(resourceMgr);
+       SortedLogRecovery recovery =
+           new SortedLogRecovery(context, resourceMgr.getFileLenCache(), 
cacheProvider);
+       return recovery.needsRecovery(extent, resolve(walogs));
+     } catch (Exception e) {
+       throw new IOException(e);
+     }
+   }
+ 
 -  public void recover(ServerContext context, KeyExtent extent, List<LogEntry> 
walogs,
++  public void recover(ServerContext context, KeyExtent extent, 
Collection<LogEntry> walogs,
        Set<String> tabletFiles, MutationReceiver mr) throws IOException {
      try {
-       SortedLogRecovery recovery = new SortedLogRecovery(context);
-       recovery.recover(extent, recoveryDirs, tabletFiles, mr);
+       var resourceMgr = tserver.getResourceManager();
+       var cacheProvider = createCacheProvider(resourceMgr);
+       SortedLogRecovery recovery =
+           new SortedLogRecovery(context, resourceMgr.getFileLenCache(), 
cacheProvider);
+       recovery.recover(extent, resolve(walogs), tabletFiles, mr);
      } catch (Exception e) {
        throw new IOException(e);
      }
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index a9b571078e,287bee16f3..1ce57c87e4
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -998,18 -1111,11 +998,9 @@@ public class Tablet extends TabletBase 
      // close data files
      getTabletResources().close();
  
 -    if (completeClose) {
 -      closeState = CloseState.COMPLETE;
 -    }
 +    closeState = CloseState.COMPLETE;
    }
  
-   private boolean disallowNewReservations(ScanParameters scanParameters) {
-     var scanSessId = scanParameters.getScanSessionId();
-     if (scanSessId != null) {
-       return 
getTabletServer().getSessionManager().disallowNewReservations(scanSessId);
-     } else {
-       return true;
-     }
-   }
- 
    private void closeConsistencyCheck() {
  
      long num = tabletMemory.getMemTable().getNumEntries();
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index 3203134499,58fc563bba..d79d8408ec
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@@ -113,9 -114,18 +113,18 @@@ public abstract class TabletBase 
      }
    }
  
+   protected boolean disallowNewReservations(ScanParameters scanParameters) {
+     var scanSessId = scanParameters.getScanSessionId();
+     if (scanSessId != null) {
+       return server.getSessionManager().disallowNewReservations(scanSessId);
+     } else {
+       return true;
+     }
+   }
+ 
    public abstract boolean isClosed();
  
 -  public abstract SortedMap<StoredTabletFile,DataFileValue> getDatafiles();
 +  public abstract Map<StoredTabletFile,DataFileValue> getDatafiles();
  
    public abstract void addToYieldMetric(int i);
  
diff --cc test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
index d5c7c3e46b,0461ba53ec..399a208d0a
--- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
@@@ -252,8 -259,18 +259,16 @@@ public class ZombieScanIT extends Confi
  
      String table = getUniqueNames(1)[0];
  
+     final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER : 
SCAN_SERVER;
+ 
      try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
  
+       if (serverType == SCAN_SERVER) {
 -        getCluster().getConfig().setNumScanServers(1);
 -        getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+         // Scans will fall back to tablet servers when no scan servers are 
present. So wait for scan
+         // servers to show up in zookeeper. Can remove this in 3.1.
+         Wait.waitFor(() -> 
!c.instanceOperations().getScanServers().isEmpty());
+       }
+ 
        c.tableOperations().create(table);
  
        var executor = Executors.newCachedThreadPool();
@@@ -314,11 -331,15 +329,10 @@@
  
        Wait.waitFor(() -> getZombieScansMetric() == 6);
  
-       assertEquals(6, countActiveScans(c, table));
+       assertEquals(6, countActiveScans(c, serverType, table));
  
        executor.shutdownNow();
 -    } finally {
 -      if (serverType == SCAN_SERVER) {
 -        getCluster().getConfig().setNumScanServers(0);
 -        getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
 -      }
      }
- 
    }
  
    private static long countLocations(String table, AccumuloClient client) 
throws Exception {
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index c5069b5323,fae0aa42e7..ef5089c961
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@@ -121,10 -129,20 +129,18 @@@ public class ScannerIT extends Configur
    /**
     * {@link CloseScannerIT#testManyScans()} is a similar test.
     */
-   @Test
-   public void testSessionCleanup() throws Exception {
+   @ParameterizedTest
+   @EnumSource
+   public void testSessionCleanup(ConsistencyLevel consistency) throws 
Exception {
      final String tableName = getUniqueNames(1)[0];
-     try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProps()).build()) {
+     final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER : 
SCAN_SERVER;
+     try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProperties()).build()) {
+ 
+       if (serverType == SCAN_SERVER) {
 -        getCluster().getConfig().setNumScanServers(1);
 -        getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+         // Scans will fall back to tablet servers when no scan servers are 
present. So wait for scan
+         // servers to show up in zookeeper. Can remove this in 3.1.
+         Wait.waitFor(() -> 
!accumuloClient.instanceOperations().getScanServers().isEmpty());
+       }
  
        accumuloClient.tableOperations().create(tableName);
  
@@@ -183,9 -209,14 +207,9 @@@
            scanner.setRanges(List.of(new Range()));
            assertEquals(100000, scanner.stream().count());
            assertEquals(100000, scanner.stream().count());
-           assertEquals(0, countActiveScans(accumuloClient, tableName));
+           assertEquals(0, countActiveScans(accumuloClient, serverType, 
tableName));
          }
        }
 -    } finally {
 -      if (serverType == SCAN_SERVER) {
 -        getCluster().getConfig().setNumScanServers(0);
 -        getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
 -      }
      }
    }
  

Reply via email to