This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 0c66ddacb8 Avoids listing the sorted logs dir multiple times during 
log recovery. (#4874)
0c66ddacb8 is described below

commit 0c66ddacb806e9427602b3411a8f822df2233290
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Sep 13 14:00:32 2024 -0400

    Avoids listing the sorted logs dir multiple times during log recovery. 
(#4874)
    
    The log recovery code would list the sorted walog files multiple times
    during recovery.  These changes modify the code to only list the files
    once.  Also the listing is cached for a short period of time to improve
    the case of multiple tablets referencing the same walogs.  This along
    with #4873 should result in much less traffic to the namenode when an
    entire accumulo cluster shuts down and needs to recover.
---
 .../org/apache/accumulo/tserver/TabletServer.java  |  19 +---
 .../accumulo/tserver/log/RecoveryLogsIterator.java |  60 ++--------
 .../accumulo/tserver/log/ResolvedSortedLog.java    | 125 +++++++++++++++++++++
 .../accumulo/tserver/log/SortedLogRecovery.java    |  37 +++---
 .../accumulo/tserver/log/TabletServerLogger.java   |  24 +++-
 .../apache/accumulo/tserver/logger/LogReader.java  |   5 +-
 .../tserver/log/RecoveryLogsIteratorTest.java      |  41 +++----
 .../tserver/log/SortedLogRecoveryTest.java         |  13 ++-
 8 files changed, 209 insertions(+), 115 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 41c6409063..1039c09804 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -115,10 +115,8 @@ import 
org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
-import org.apache.accumulo.server.manager.recovery.RecoveryPath;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
@@ -1082,22 +1080,7 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
 
   public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> 
logEntries,
       Set<String> tabletFiles, MutationReceiver mutationReceiver) throws 
IOException {
-    List<Path> recoveryDirs = new ArrayList<>();
-    for (LogEntry entry : logEntries) {
-      Path recovery = null;
-      Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getPath()));
-      finished = SortedLogState.getFinishedMarkerPath(finished);
-      TabletServer.log.debug("Looking for " + finished);
-      if (fs.exists(finished)) {
-        recovery = finished.getParent();
-      }
-      if (recovery == null) {
-        throw new IOException(
-            "Unable to find recovery files for extent " + extent + " logEntry: 
" + entry);
-      }
-      recoveryDirs.add(recovery);
-    }
-    logger.recover(getContext(), extent, recoveryDirs, tabletFiles, 
mutationReceiver);
+    logger.recover(getContext(), extent, logEntries, tabletFiles, 
mutationReceiver);
   }
 
   public int createLogId() {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index f5b97361d0..f3d92b05ce 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -22,12 +22,10 @@ import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedSet;
-import java.util.TreeSet;
 
 import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
 import org.apache.accumulo.core.data.Key;
@@ -40,14 +38,10 @@ import 
org.apache.accumulo.core.metadata.UnreferencedTabletFile;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,8 +62,8 @@ public class RecoveryLogsIterator
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  public RecoveryLogsIterator(ServerContext context, List<Path> 
recoveryLogDirs, LogFileKey start,
-      LogFileKey end, boolean checkFirstKey) throws IOException {
+  public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> 
recoveryLogDirs,
+      LogFileKey start, LogFileKey end, boolean checkFirstKey) throws 
IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new 
ArrayList<>(recoveryLogDirs.size());
     fileIters = new ArrayList<>();
@@ -79,14 +73,14 @@ public class RecoveryLogsIterator
     final CryptoService cryptoService = 
context.getCryptoFactory().getService(env,
         context.getConfiguration().getAllCryptoProperties());
 
-    for (Path logDir : recoveryLogDirs) {
-      LOG.debug("Opening recovery log dir {}", logDir.getName());
-      SortedSet<UnreferencedTabletFile> logFiles = getFiles(vm, logDir);
-      var fs = vm.getFileSystemByPath(logDir);
+    for (ResolvedSortedLog logDir : recoveryLogDirs) {
+      LOG.debug("Opening recovery log dir {}", logDir);
+      SortedSet<UnreferencedTabletFile> logFiles = logDir.getChildren();
+      var fs = vm.getFileSystemByPath(logDir.getDir());
 
       // only check the first key once to prevent extra iterator creation and 
seeking
       if (checkFirstKey && !logFiles.isEmpty()) {
-        validateFirstKey(context, cryptoService, fs, logFiles, logDir);
+        validateFirstKey(context, cryptoService, fs, logDir);
       }
 
       for (UnreferencedTabletFile log : logFiles) {
@@ -137,47 +131,13 @@ public class RecoveryLogsIterator
     }
   }
 
-  /**
-   * Check for sorting signal files (finished/failed) and get the logs in the 
provided directory.
-   */
-  private SortedSet<UnreferencedTabletFile> getFiles(VolumeManager fs, Path 
directory)
-      throws IOException {
-    boolean foundFinish = false;
-    // Path::getName compares the last component of each Path value. In this 
case, the last
-    // component should
-    // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values.
-    SortedSet<UnreferencedTabletFile> logFiles =
-        new TreeSet<>(Comparator.comparing(tf -> tf.getPath().getName()));
-    for (FileStatus child : fs.listStatus(directory)) {
-      if (child.getPath().getName().startsWith("_")) {
-        continue;
-      }
-      if (SortedLogState.isFinished(child.getPath().getName())) {
-        foundFinish = true;
-        continue;
-      }
-      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) 
{
-        continue;
-      }
-      FileSystem ns = fs.getFileSystemByPath(child.getPath());
-      UnreferencedTabletFile fullLogPath =
-          UnreferencedTabletFile.of(ns, ns.makeQualified(child.getPath()));
-      logFiles.add(fullLogPath);
-    }
-    if (!foundFinish) {
-      throw new IOException(
-          "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found 
in " + directory);
-    }
-    return logFiles;
-  }
-
   /**
    * Check that the first entry in the WAL is OPEN. Only need to do this once.
    */
   private void validateFirstKey(ServerContext context, CryptoService cs, 
FileSystem fs,
-      SortedSet<UnreferencedTabletFile> logFiles, Path fullLogPath) throws 
IOException {
+      ResolvedSortedLog sortedLogs) throws IOException {
     try (FileSKVIterator fileIter = 
FileOperations.getInstance().newReaderBuilder()
-        .forFile(logFiles.first(), fs, fs.getConf(), cs)
+        .forFile(sortedLogs.getChildren().first(), fs, fs.getConf(), cs)
         
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
       Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter);
 
@@ -185,7 +145,7 @@ public class RecoveryLogsIterator
         Key firstKey = iterator.next().getKey();
         LogFileKey key = LogFileKey.fromKey(firstKey);
         if (key.event != LogEvents.OPEN) {
-          throw new IllegalStateException("First log entry is not OPEN " + 
fullLogPath);
+          throw new IllegalStateException("First log entry is not OPEN " + 
sortedLogs);
         }
       }
     }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java
new file mode 100644
index 0000000000..184b628ad2
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.server.manager.recovery.RecoveryPath;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Write ahead logs have two paths in DFS. There is the path of the original 
unsorted walog and the
+ * path of the sorted walog. The purpose of this class is to convert the 
unsorted wal path to a
+ * sorted wal path and validate the sorted dir exists and is finished.
+ */
+public class ResolvedSortedLog {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ResolvedSortedLog.class);
+
+  private final SortedSet<UnreferencedTabletFile> children;
+  private final LogEntry origin;
+  private final Path sortedLogDir;
+
+  private ResolvedSortedLog(LogEntry origin, Path sortedLogDir,
+      SortedSet<UnreferencedTabletFile> children) {
+    this.origin = origin;
+    this.sortedLogDir = sortedLogDir;
+    this.children = Collections.unmodifiableSortedSet(children);
+  }
+
+  /**
+   * @return the unsorted walog path from which this was created.
+   */
+  public LogEntry getOrigin() {
+    return origin;
+  }
+
+  /**
+   * @return the path of the directory in which sorted logs are stored
+   */
+  public Path getDir() {
+    return sortedLogDir;
+  }
+
+  /**
+   * @return When an unsorted walog is sorted the sorted data is stored in one 
or more rfiles, this
+   *         returns the paths of those rfiles.
+   */
+  public SortedSet<UnreferencedTabletFile> getChildren() {
+    return children;
+  }
+
+  @Override
+  public String toString() {
+    return sortedLogDir.toString();
+  }
+
+  /**
+   * For a given path of an unsorted walog check to see if the corresponding 
sorted log dir exists
+   * and is finished. If it is, return an immutable object containing 
information about the sorted
+   * walogs.
+   */
+  public static ResolvedSortedLog resolve(LogEntry logEntry, VolumeManager fs) 
throws IOException {
+
+    // convert the path of an unsorted log to the expected path for the 
corresponding sorted log
+    // dir
+    Path sortedLogPath = RecoveryPath.getRecoveryPath(new 
Path(logEntry.getPath()));
+
+    boolean foundFinish = false;
+    // Path::getName compares the last component of each Path value. In this 
case, the last
+    // component should
+    // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values.
+    SortedSet<UnreferencedTabletFile> logFiles =
+        new TreeSet<>(Comparator.comparing(tf -> tf.getPath().getName()));
+    for (FileStatus child : fs.listStatus(sortedLogPath)) {
+      if (child.getPath().getName().startsWith("_")) {
+        continue;
+      }
+      if (SortedLogState.isFinished(child.getPath().getName())) {
+        foundFinish = true;
+        continue;
+      }
+      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) 
{
+        continue;
+      }
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());
+      UnreferencedTabletFile fullLogPath =
+          UnreferencedTabletFile.of(ns, ns.makeQualified(child.getPath()));
+      logFiles.add(fullLogPath);
+    }
+    if (!foundFinish) {
+      throw new IOException("Sort '" + SortedLogState.FINISHED.getMarker() + 
"' flag not found in "
+          + sortedLogPath + " for walog " + logEntry.getPath());
+    }
+
+    return new ResolvedSortedLog(logEntry, sortedLogPath, logFiles);
+  }
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index dd69e7e184..8696f70c23 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -100,7 +100,8 @@ public class SortedLogRecovery {
     return key;
   }
 
-  private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) 
throws IOException {
+  private int findMaxTabletId(KeyExtent extent, List<ResolvedSortedLog> 
recoveryLogDirs)
+      throws IOException {
     int tabletId = -1;
 
     try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, 
minKey(DEFINE_TABLET),
@@ -139,18 +140,17 @@ public class SortedLogRecovery {
    * @return The maximum tablet ID observed AND the list of logs that 
contained the maximum tablet
    *         ID.
    */
-  private Entry<Integer,List<Path>> findLogsThatDefineTablet(KeyExtent extent,
-      List<Path> recoveryDirs) throws IOException {
-    Map<Integer,List<Path>> logsThatDefineTablet = new HashMap<>();
+  private Entry<Integer,List<ResolvedSortedLog>> 
findLogsThatDefineTablet(KeyExtent extent,
+      List<ResolvedSortedLog> recoveryDirs) throws IOException {
+    Map<Integer,List<ResolvedSortedLog>> logsThatDefineTablet = new 
HashMap<>();
 
-    for (Path walDir : recoveryDirs) {
+    for (ResolvedSortedLog walDir : recoveryDirs) {
       int tabletId = findMaxTabletId(extent, 
Collections.singletonList(walDir));
       if (tabletId == -1) {
-        log.debug("Did not find tablet {} in recovery log {}", extent, 
walDir.getName());
+        log.debug("Did not find tablet {} in recovery log {}", extent, walDir);
       } else {
         logsThatDefineTablet.computeIfAbsent(tabletId, k -> new 
ArrayList<>()).add(walDir);
-        log.debug("Found tablet {} with id {} in recovery log {}", extent, 
tabletId,
-            walDir.getName());
+        log.debug("Found tablet {} with id {} in recovery log {}", extent, 
tabletId, walDir);
       }
     }
 
@@ -195,8 +195,8 @@ public class SortedLogRecovery {
 
   }
 
-  private long findRecoverySeq(List<Path> recoveryLogs, Set<String> 
tabletFiles, int tabletId)
-      throws IOException {
+  private long findRecoverySeq(List<ResolvedSortedLog> recoveryLogs, 
Set<String> tabletFiles,
+      int tabletId) throws IOException {
     HashSet<String> suffixes = new HashSet<>();
     for (String path : tabletFiles) {
       suffixes.add(getPathSuffix(path));
@@ -255,8 +255,8 @@ public class SortedLogRecovery {
     return recoverySeq;
   }
 
-  private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, 
int tabletId,
-      long recoverySeq) throws IOException {
+  private void playbackMutations(List<ResolvedSortedLog> recoveryLogs, 
MutationReceiver mr,
+      int tabletId, long recoverySeq) throws IOException {
     LogFileKey start = minKey(MUTATION, tabletId);
     start.seq = recoverySeq;
 
@@ -283,20 +283,21 @@ public class SortedLogRecovery {
     }
   }
 
-  Collection<String> asNames(List<Path> recoveryLogs) {
-    return Collections2.transform(recoveryLogs, Path::getName);
+  Collection<String> asNames(List<ResolvedSortedLog> recoveryLogs) {
+    return Collections2.transform(recoveryLogs, rsl -> rsl.getDir().getName());
   }
 
-  public void recover(KeyExtent extent, List<Path> recoveryDirs, Set<String> 
tabletFiles,
-      MutationReceiver mr) throws IOException {
+  public void recover(KeyExtent extent, List<ResolvedSortedLog> recoveryDirs,
+      Set<String> tabletFiles, MutationReceiver mr) throws IOException {
 
-    Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, 
recoveryDirs);
+    Entry<Integer,List<ResolvedSortedLog>> maxEntry =
+        findLogsThatDefineTablet(extent, recoveryDirs);
 
     // A tablet may leave a tserver and then come back, in which case it would 
have a different and
     // higher tablet id. Only want to consider events in the log related to 
the last time the tablet
     // was loaded.
     int tabletId = maxEntry.getKey();
-    List<Path> logsThatDefineTablet = maxEntry.getValue();
+    List<ResolvedSortedLog> logsThatDefineTablet = maxEntry.getValue();
 
     if (tabletId == -1) {
       log.info("Tablet {} is not defined in recovery logs {} ", extent, 
asNames(recoveryDirs));
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 544f53acaa..3caf5e08e4 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -22,7 +22,9 @@ import static java.util.Collections.singletonList;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_CREATOR_POOL;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +57,9 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
 /**
  * Central logging facility for the TServerInfo.
  *
@@ -101,6 +106,8 @@ public class TabletServerLogger {
 
   private final RetryFactory writeRetryFactory;
 
+  private final Cache<LogEntry,ResolvedSortedLog> sortedLogCache;
+
   private abstract static class TestCallWithWriteLock {
     abstract boolean test();
 
@@ -154,6 +161,7 @@ public class TabletServerLogger {
     this.createRetry = null;
     this.writeRetryFactory = writeRetryFactory;
     this.maxAge = maxAge;
+    this.sortedLogCache = Caffeine.newBuilder().expireAfterWrite(3, 
TimeUnit.SECONDS).build();
   }
 
   private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws 
IOException {
@@ -510,11 +518,23 @@ public class TabletServerLogger {
     return seq;
   }
 
-  public void recover(ServerContext context, KeyExtent extent, List<Path> 
recoveryDirs,
+  public void recover(ServerContext context, KeyExtent extent, List<LogEntry> 
walogs,
       Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     try {
       SortedLogRecovery recovery = new SortedLogRecovery(context);
-      recovery.recover(extent, recoveryDirs, tabletFiles, mr);
+      List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size());
+      for (var logEntry : walogs) {
+        var sortedLog = sortedLogCache.get(logEntry, le1 -> {
+          try {
+            return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager());
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+
+        sortedLogs.add(sortedLog);
+      }
+      recovery.recover(extent, sortedLogs, tabletFiles, mr);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 34cc101136..f4c009eeb0 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -42,12 +42,14 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
 import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.RecoveryLogsIterator;
+import org.apache.accumulo.tserver.log.ResolvedSortedLog;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -173,7 +175,8 @@ public class LogReader implements KeywordExecutable {
         } else {
           // read the log entries in a sorted RFile. This has to be a 
directory that contains the
           // finished file.
-          try (var rli = new RecoveryLogsIterator(context, 
Collections.singletonList(path), null,
+          var rsl = 
ResolvedSortedLog.resolve(LogEntry.fromPath(path.toString()), fs);
+          try (var rli = new RecoveryLogsIterator(context, 
Collections.singletonList(rsl), null,
               null, false)) {
             while (rli.hasNext()) {
               Entry<LogFileKey,LogFileValue> entry = rli.next();
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
index 6d14f8c927..dd9b689229 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
@@ -26,21 +26,23 @@ import static org.easymock.EasyMock.replay;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -128,7 +130,7 @@ public class RecoveryLogsIteratorTest extends WithTestNames 
{
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("keyValues", keyValues);
 
-    ArrayList<Path> dirs = new ArrayList<>();
+    ArrayList<ResolvedSortedLog> dirs = new ArrayList<>();
 
     createRecoveryDir(logs, dirs, true);
 
@@ -154,24 +156,11 @@ public class RecoveryLogsIteratorTest extends 
WithTestNames {
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("keyValues", keyValues);
 
-    ArrayList<Path> dirs = new ArrayList<>();
+    ArrayList<ResolvedSortedLog> dirs = new ArrayList<>();
 
-    createRecoveryDir(logs, dirs, false);
-
-    assertThrows(IOException.class,
-        () -> new RecoveryLogsIterator(context, dirs, null, null, false),
+    var exception = assertThrows(IOException.class, () -> 
createRecoveryDir(logs, dirs, false),
         "Finish marker should not be found");
-  }
-
-  @Test
-  public void testSingleFile() throws IOException {
-    String destPath = workDir + "/test.rf";
-    fs.create(new Path(destPath));
-
-    assertThrows(
-        IOException.class, () -> new RecoveryLogsIterator(context,
-            Collections.singletonList(new Path(destPath)), null, null, false),
-        "Finish marker should not be found for a single file.");
+    assertTrue(exception.getMessage().contains("'finished' flag not found"));
   }
 
   @Test
@@ -187,7 +176,7 @@ public class RecoveryLogsIteratorTest extends WithTestNames 
{
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("keyValues", keyValues);
 
-    ArrayList<Path> dirs = new ArrayList<>();
+    ArrayList<ResolvedSortedLog> dirs = new ArrayList<>();
 
     createRecoveryDir(logs, dirs, true);
 
@@ -215,7 +204,7 @@ public class RecoveryLogsIteratorTest extends WithTestNames 
{
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("keyValues", keyValues);
 
-    ArrayList<Path> dirs = new ArrayList<>();
+    ArrayList<ResolvedSortedLog> dirs = new ArrayList<>();
 
     createRecoveryDir(logs, dirs, true);
 
@@ -227,11 +216,16 @@ public class RecoveryLogsIteratorTest extends 
WithTestNames {
     }
   }
 
-  private void createRecoveryDir(Map<String,KeyValue[]> logs, ArrayList<Path> 
dirs,
+  private void createRecoveryDir(Map<String,KeyValue[]> logs, 
ArrayList<ResolvedSortedLog> dirs,
       boolean FinishMarker) throws IOException {
 
     for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
-      String destPath = workDir + "/dir";
+      var uuid = UUID.randomUUID();
+      String origPath = "file://" + workDir + "/" + entry.getKey() + "/"
+          + VolumeManager.FileType.WAL.getDirectory() + "/localhost+9997/" + 
uuid;
+      String destPath = "file://" + workDir + "/" + entry.getKey() + "/"
+          + VolumeManager.FileType.RECOVERY.getDirectory() + "/" + uuid;
+
       FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
 
       // convert test object to Pairs for LogSorter.
@@ -245,7 +239,8 @@ public class RecoveryLogsIteratorTest extends WithTestNames 
{
         ns.create(SortedLogState.getFinishedMarkerPath(destPath));
       }
 
-      dirs.add(new Path(destPath));
+      var rsl = ResolvedSortedLog.resolve(LogEntry.fromPath(origPath), fs);
+      dirs.add(rsl);
     }
   }
 }
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index e19b27452d..b5c30f634e 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -43,6 +43,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -57,9 +58,11 @@ import org.apache.accumulo.core.file.rfile.bcfile.Utils;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.tserver.WithTestNames;
@@ -178,9 +181,13 @@ public class SortedLogRecoveryTest extends WithTestNames {
       final Path workdirPath = new Path("file://" + workdir);
       fs.deleteRecursively(workdirPath);
 
-      ArrayList<Path> dirs = new ArrayList<>();
+      ArrayList<ResolvedSortedLog> dirs = new ArrayList<>();
       for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
-        String destPath = workdir + "/" + entry.getKey();
+        var uuid = UUID.randomUUID();
+        String origPath = "file://" + workdir + "/" + entry.getKey() + "/"
+            + VolumeManager.FileType.WAL.getDirectory() + "/localhost+9997/" + 
uuid;
+        String destPath = "file://" + workdir + "/" + entry.getKey() + "/"
+            + VolumeManager.FileType.RECOVERY.getDirectory() + "/" + uuid;
         FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
         // convert test object to Pairs for LogSorter, flushing based on 
bufferSize
         List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
@@ -195,7 +202,7 @@ public class SortedLogRecoveryTest extends WithTestNames {
         logSorter.writeBuffer(destPath, buffer, parts);
 
         ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-        dirs.add(new Path(destPath));
+        dirs.add(ResolvedSortedLog.resolve(LogEntry.fromPath(origPath), fs));
       }
       // Recover
       SortedLogRecovery recovery = new SortedLogRecovery(context);

Reply via email to