This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new 0c66ddacb8 Avoids listing the sorted logs dir multiple times during log recovery. (#4874) 0c66ddacb8 is described below commit 0c66ddacb806e9427602b3411a8f822df2233290 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Sep 13 14:00:32 2024 -0400 Avoids listing the sorted logs dir multiple times during log recovery. (#4874) The log recovery code would list the sorted walog files multiple times during recovery. These changes modify the code to only list the files once. Also, the listing is cached for a short period of time to improve the case of multiple tablets referencing the same walogs. This along with #4873 should result in much less traffic to the namenode when an entire accumulo cluster shuts down and needs to recover. --- .../org/apache/accumulo/tserver/TabletServer.java | 19 +--- .../accumulo/tserver/log/RecoveryLogsIterator.java | 60 ++-------- .../accumulo/tserver/log/ResolvedSortedLog.java | 125 +++++++++++++++++++++ .../accumulo/tserver/log/SortedLogRecovery.java | 37 +++--- .../accumulo/tserver/log/TabletServerLogger.java | 24 +++- .../apache/accumulo/tserver/logger/LogReader.java | 5 +- .../tserver/log/RecoveryLogsIteratorTest.java | 41 +++---- .../tserver/log/SortedLogRecoveryTest.java | 13 ++- 8 files changed, 209 insertions(+), 115 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 41c6409063..1039c09804 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -115,10 +115,8 @@ import org.apache.accumulo.server.compaction.PausedCompactionMetrics; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeManager; -import 
org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; -import org.apache.accumulo.server.manager.recovery.RecoveryPath; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -1082,22 +1080,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException { - List<Path> recoveryDirs = new ArrayList<>(); - for (LogEntry entry : logEntries) { - Path recovery = null; - Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getPath())); - finished = SortedLogState.getFinishedMarkerPath(finished); - TabletServer.log.debug("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); - } - if (recovery == null) { - throw new IOException( - "Unable to find recovery files for extent " + extent + " logEntry: " + entry); - } - recoveryDirs.add(recovery); - } - logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver); + logger.recover(getContext(), extent, logEntries, tabletFiles, mutationReceiver); } public int createLogId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index f5b97361d0..f3d92b05ce 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -22,12 +22,10 @@ import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import 
java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.SortedSet; -import java.util.TreeSet; import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl; import org.apache.accumulo.core.data.Key; @@ -40,14 +38,10 @@ import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.tserver.logger.LogEvents; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,8 +62,8 @@ public class RecoveryLogsIterator /** * Scans the files in each recoveryLogDir over the range [start,end]. 
*/ - public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start, - LogFileKey end, boolean checkFirstKey) throws IOException { + public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> recoveryLogDirs, + LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException { List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size()); fileIters = new ArrayList<>(); @@ -79,14 +73,14 @@ public class RecoveryLogsIterator final CryptoService cryptoService = context.getCryptoFactory().getService(env, context.getConfiguration().getAllCryptoProperties()); - for (Path logDir : recoveryLogDirs) { - LOG.debug("Opening recovery log dir {}", logDir.getName()); - SortedSet<UnreferencedTabletFile> logFiles = getFiles(vm, logDir); - var fs = vm.getFileSystemByPath(logDir); + for (ResolvedSortedLog logDir : recoveryLogDirs) { + LOG.debug("Opening recovery log dir {}", logDir); + SortedSet<UnreferencedTabletFile> logFiles = logDir.getChildren(); + var fs = vm.getFileSystemByPath(logDir.getDir()); // only check the first key once to prevent extra iterator creation and seeking if (checkFirstKey && !logFiles.isEmpty()) { - validateFirstKey(context, cryptoService, fs, logFiles, logDir); + validateFirstKey(context, cryptoService, fs, logDir); } for (UnreferencedTabletFile log : logFiles) { @@ -137,47 +131,13 @@ public class RecoveryLogsIterator } } - /** - * Check for sorting signal files (finished/failed) and get the logs in the provided directory. - */ - private SortedSet<UnreferencedTabletFile> getFiles(VolumeManager fs, Path directory) - throws IOException { - boolean foundFinish = false; - // Path::getName compares the last component of each Path value. In this case, the last - // component should - // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values. 
- SortedSet<UnreferencedTabletFile> logFiles = - new TreeSet<>(Comparator.comparing(tf -> tf.getPath().getName())); - for (FileStatus child : fs.listStatus(directory)) { - if (child.getPath().getName().startsWith("_")) { - continue; - } - if (SortedLogState.isFinished(child.getPath().getName())) { - foundFinish = true; - continue; - } - if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { - continue; - } - FileSystem ns = fs.getFileSystemByPath(child.getPath()); - UnreferencedTabletFile fullLogPath = - UnreferencedTabletFile.of(ns, ns.makeQualified(child.getPath())); - logFiles.add(fullLogPath); - } - if (!foundFinish) { - throw new IOException( - "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory); - } - return logFiles; - } - /** * Check that the first entry in the WAL is OPEN. Only need to do this once. */ private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs, - SortedSet<UnreferencedTabletFile> logFiles, Path fullLogPath) throws IOException { + ResolvedSortedLog sortedLogs) throws IOException { try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(logFiles.first(), fs, fs.getConf(), cs) + .forFile(sortedLogs.getChildren().first(), fs, fs.getConf(), cs) .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter); @@ -185,7 +145,7 @@ public class RecoveryLogsIterator Key firstKey = iterator.next().getKey(); LogFileKey key = LogFileKey.fromKey(firstKey); if (key.event != LogEvents.OPEN) { - throw new IllegalStateException("First log entry is not OPEN " + fullLogPath); + throw new IllegalStateException("First log entry is not OPEN " + sortedLogs); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java new file mode 100644 
index 0000000000..184b628ad2 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.log; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.metadata.UnreferencedTabletFile; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.log.SortedLogState; +import org.apache.accumulo.server.manager.recovery.RecoveryPath; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Write ahead logs have two paths in DFS. There is the path of the original unsorted walog and the + * path of the sorted walog. The purpose of this class is to convert the unsorted wal path to a + * sorted wal path and validate the sorted dir exists and is finished. 
+ */ +public class ResolvedSortedLog { + + private static final Logger log = LoggerFactory.getLogger(ResolvedSortedLog.class); + + private final SortedSet<UnreferencedTabletFile> children; + private final LogEntry origin; + private final Path sortedLogDir; + + private ResolvedSortedLog(LogEntry origin, Path sortedLogDir, + SortedSet<UnreferencedTabletFile> children) { + this.origin = origin; + this.sortedLogDir = sortedLogDir; + this.children = Collections.unmodifiableSortedSet(children); + } + + /** + * @return the unsorted walog path from which this was created. + */ + public LogEntry getOrigin() { + return origin; + } + + /** + * @return the path of the directory in which sorted logs are stored + */ + public Path getDir() { + return sortedLogDir; + } + + /** + * @return When an unsorted walog is sorted the sorted data is stored in one os more rfiles, this + * returns the paths of those rfiles. + */ + public SortedSet<UnreferencedTabletFile> getChildren() { + return children; + } + + @Override + public String toString() { + return sortedLogDir.toString(); + } + + /** + * For a given path of an unsorted walog check to see if the corresponding sorted log dir exists + * and is finished. If it is return an immutable object containing information about the sorted + * walogs. + */ + public static ResolvedSortedLog resolve(LogEntry logEntry, VolumeManager fs) throws IOException { + + // convert the path of on unsorted logs to the expected path for the corresponding sorted log + // dir + Path sortedLogPath = RecoveryPath.getRecoveryPath(new Path(logEntry.getPath())); + + boolean foundFinish = false; + // Path::getName compares the last component of each Path value. In this case, the last + // component should + // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values. 
+ SortedSet<UnreferencedTabletFile> logFiles = + new TreeSet<>(Comparator.comparing(tf -> tf.getPath().getName())); + for (FileStatus child : fs.listStatus(sortedLogPath)) { + if (child.getPath().getName().startsWith("_")) { + continue; + } + if (SortedLogState.isFinished(child.getPath().getName())) { + foundFinish = true; + continue; + } + if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { + continue; + } + FileSystem ns = fs.getFileSystemByPath(child.getPath()); + UnreferencedTabletFile fullLogPath = + UnreferencedTabletFile.of(ns, ns.makeQualified(child.getPath())); + logFiles.add(fullLogPath); + } + if (!foundFinish) { + throw new IOException("Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + + sortedLogPath + " for walog " + logEntry.getPath()); + } + + return new ResolvedSortedLog(logEntry, sortedLogPath, logFiles); + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index dd69e7e184..8696f70c23 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -100,7 +100,8 @@ public class SortedLogRecovery { return key; } - private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) throws IOException { + private int findMaxTabletId(KeyExtent extent, List<ResolvedSortedLog> recoveryLogDirs) + throws IOException { int tabletId = -1; try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET), @@ -139,18 +140,17 @@ public class SortedLogRecovery { * @return The maximum tablet ID observed AND the list of logs that contained the maximum tablet * ID. 
*/ - private Entry<Integer,List<Path>> findLogsThatDefineTablet(KeyExtent extent, - List<Path> recoveryDirs) throws IOException { - Map<Integer,List<Path>> logsThatDefineTablet = new HashMap<>(); + private Entry<Integer,List<ResolvedSortedLog>> findLogsThatDefineTablet(KeyExtent extent, + List<ResolvedSortedLog> recoveryDirs) throws IOException { + Map<Integer,List<ResolvedSortedLog>> logsThatDefineTablet = new HashMap<>(); - for (Path walDir : recoveryDirs) { + for (ResolvedSortedLog walDir : recoveryDirs) { int tabletId = findMaxTabletId(extent, Collections.singletonList(walDir)); if (tabletId == -1) { - log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getName()); + log.debug("Did not find tablet {} in recovery log {}", extent, walDir); } else { logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(walDir); - log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, - walDir.getName()); + log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, walDir); } } @@ -195,8 +195,8 @@ public class SortedLogRecovery { } - private long findRecoverySeq(List<Path> recoveryLogs, Set<String> tabletFiles, int tabletId) - throws IOException { + private long findRecoverySeq(List<ResolvedSortedLog> recoveryLogs, Set<String> tabletFiles, + int tabletId) throws IOException { HashSet<String> suffixes = new HashSet<>(); for (String path : tabletFiles) { suffixes.add(getPathSuffix(path)); @@ -255,8 +255,8 @@ public class SortedLogRecovery { return recoverySeq; } - private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, int tabletId, - long recoverySeq) throws IOException { + private void playbackMutations(List<ResolvedSortedLog> recoveryLogs, MutationReceiver mr, + int tabletId, long recoverySeq) throws IOException { LogFileKey start = minKey(MUTATION, tabletId); start.seq = recoverySeq; @@ -283,20 +283,21 @@ public class SortedLogRecovery { } } - Collection<String> asNames(List<Path> 
recoveryLogs) { - return Collections2.transform(recoveryLogs, Path::getName); + Collection<String> asNames(List<ResolvedSortedLog> recoveryLogs) { + return Collections2.transform(recoveryLogs, rsl -> rsl.getDir().getName()); } - public void recover(KeyExtent extent, List<Path> recoveryDirs, Set<String> tabletFiles, - MutationReceiver mr) throws IOException { + public void recover(KeyExtent extent, List<ResolvedSortedLog> recoveryDirs, + Set<String> tabletFiles, MutationReceiver mr) throws IOException { - Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs); + Entry<Integer,List<ResolvedSortedLog>> maxEntry = + findLogsThatDefineTablet(extent, recoveryDirs); // A tablet may leave a tserver and then come back, in which case it would have a different and // higher tablet id. Only want to consider events in the log related to the last time the tablet // was loaded. int tabletId = maxEntry.getKey(); - List<Path> logsThatDefineTablet = maxEntry.getValue(); + List<ResolvedSortedLog> logsThatDefineTablet = maxEntry.getValue(); if (tabletId == -1) { log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryDirs)); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 544f53acaa..3caf5e08e4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -22,7 +22,9 @@ import static java.util.Collections.singletonList; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_CREATOR_POOL; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -55,6 +57,9 @@ import 
org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + /** * Central logging facility for the TServerInfo. * @@ -101,6 +106,8 @@ public class TabletServerLogger { private final RetryFactory writeRetryFactory; + private final Cache<LogEntry,ResolvedSortedLog> sortedLogCache; + private abstract static class TestCallWithWriteLock { abstract boolean test(); @@ -154,6 +161,7 @@ public class TabletServerLogger { this.createRetry = null; this.writeRetryFactory = writeRetryFactory; this.maxAge = maxAge; + this.sortedLogCache = Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.SECONDS).build(); } private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { @@ -510,11 +518,23 @@ public class TabletServerLogger { return seq; } - public void recover(ServerContext context, KeyExtent extent, List<Path> recoveryDirs, + public void recover(ServerContext context, KeyExtent extent, List<LogEntry> walogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException { try { SortedLogRecovery recovery = new SortedLogRecovery(context); - recovery.recover(extent, recoveryDirs, tabletFiles, mr); + List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size()); + for (var logEntry : walogs) { + var sortedLog = sortedLogCache.get(logEntry, le1 -> { + try { + return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + sortedLogs.add(sortedLog); + } + recovery.recover(extent, sortedLogs, tabletFiles, mr); } catch (Exception e) { throw new IOException(e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index 34cc101136..f4c009eeb0 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@ -42,12 +42,14 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.NoFileEncrypter; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException; import org.apache.accumulo.tserver.log.RecoveryLogsIterator; +import org.apache.accumulo.tserver.log.ResolvedSortedLog; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -173,7 +175,8 @@ public class LogReader implements KeywordExecutable { } else { // read the log entries in a sorted RFile. This has to be a directory that contains the // finished file. 
- try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), null, + var rsl = ResolvedSortedLog.resolve(LogEntry.fromPath(path.toString()), fs); + try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(rsl), null, null, false)) { while (rli.hasNext()) { Entry<LogFileKey,LogFileValue> entry = rli.next(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java index 6d14f8c927..dd9b689229 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java @@ -26,21 +26,23 @@ import static org.easymock.EasyMock.replay; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.TreeMap; +import java.util.UUID; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; @@ -128,7 +130,7 @@ public class RecoveryLogsIteratorTest extends WithTestNames { Map<String,KeyValue[]> logs = new TreeMap<>(); logs.put("keyValues", keyValues); - ArrayList<Path> dirs = new ArrayList<>(); + 
ArrayList<ResolvedSortedLog> dirs = new ArrayList<>(); createRecoveryDir(logs, dirs, true); @@ -154,24 +156,11 @@ public class RecoveryLogsIteratorTest extends WithTestNames { Map<String,KeyValue[]> logs = new TreeMap<>(); logs.put("keyValues", keyValues); - ArrayList<Path> dirs = new ArrayList<>(); + ArrayList<ResolvedSortedLog> dirs = new ArrayList<>(); - createRecoveryDir(logs, dirs, false); - - assertThrows(IOException.class, - () -> new RecoveryLogsIterator(context, dirs, null, null, false), + var exception = assertThrows(IOException.class, () -> createRecoveryDir(logs, dirs, false), "Finish marker should not be found"); - } - - @Test - public void testSingleFile() throws IOException { - String destPath = workDir + "/test.rf"; - fs.create(new Path(destPath)); - - assertThrows( - IOException.class, () -> new RecoveryLogsIterator(context, - Collections.singletonList(new Path(destPath)), null, null, false), - "Finish marker should not be found for a single file."); + assertTrue(exception.getMessage().contains("'finished' flag not found")); } @Test @@ -187,7 +176,7 @@ public class RecoveryLogsIteratorTest extends WithTestNames { Map<String,KeyValue[]> logs = new TreeMap<>(); logs.put("keyValues", keyValues); - ArrayList<Path> dirs = new ArrayList<>(); + ArrayList<ResolvedSortedLog> dirs = new ArrayList<>(); createRecoveryDir(logs, dirs, true); @@ -215,7 +204,7 @@ public class RecoveryLogsIteratorTest extends WithTestNames { Map<String,KeyValue[]> logs = new TreeMap<>(); logs.put("keyValues", keyValues); - ArrayList<Path> dirs = new ArrayList<>(); + ArrayList<ResolvedSortedLog> dirs = new ArrayList<>(); createRecoveryDir(logs, dirs, true); @@ -227,11 +216,16 @@ public class RecoveryLogsIteratorTest extends WithTestNames { } } - private void createRecoveryDir(Map<String,KeyValue[]> logs, ArrayList<Path> dirs, + private void createRecoveryDir(Map<String,KeyValue[]> logs, ArrayList<ResolvedSortedLog> dirs, boolean FinishMarker) throws IOException { for 
(Entry<String,KeyValue[]> entry : logs.entrySet()) { - String destPath = workDir + "/dir"; + var uuid = UUID.randomUUID(); + String origPath = "file://" + workDir + "/" + entry.getKey() + "/" + + VolumeManager.FileType.WAL.getDirectory() + "/localhost+9997/" + uuid; + String destPath = "file://" + workDir + "/" + entry.getKey() + "/" + + VolumeManager.FileType.RECOVERY.getDirectory() + "/" + uuid; + FileSystem ns = fs.getFileSystemByPath(new Path(destPath)); // convert test object to Pairs for LogSorter. @@ -245,7 +239,8 @@ public class RecoveryLogsIteratorTest extends WithTestNames { ns.create(SortedLogState.getFinishedMarkerPath(destPath)); } - dirs.add(new Path(destPath)); + var rsl = ResolvedSortedLog.resolve(LogEntry.fromPath(origPath), fs); + dirs.add(rsl); } } } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index e19b27452d..b5c30f634e 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -43,6 +43,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; @@ -57,9 +58,11 @@ import org.apache.accumulo.core.file.rfile.bcfile.Utils; import org.apache.accumulo.core.file.streams.SeekableDataInputStream; import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory; import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.data.ServerMutation; +import org.apache.accumulo.server.fs.VolumeManager; 
import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.tserver.WithTestNames; @@ -178,9 +181,13 @@ public class SortedLogRecoveryTest extends WithTestNames { final Path workdirPath = new Path("file://" + workdir); fs.deleteRecursively(workdirPath); - ArrayList<Path> dirs = new ArrayList<>(); + ArrayList<ResolvedSortedLog> dirs = new ArrayList<>(); for (Entry<String,KeyValue[]> entry : logs.entrySet()) { - String destPath = workdir + "/" + entry.getKey(); + var uuid = UUID.randomUUID(); + String origPath = "file://" + workdir + "/" + entry.getKey() + "/" + + VolumeManager.FileType.WAL.getDirectory() + "/localhost+9997/" + uuid; + String destPath = "file://" + workdir + "/" + entry.getKey() + "/" + + VolumeManager.FileType.RECOVERY.getDirectory() + "/" + uuid; FileSystem ns = fs.getFileSystemByPath(new Path(destPath)); // convert test object to Pairs for LogSorter, flushing based on bufferSize List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>(); @@ -195,7 +202,7 @@ public class SortedLogRecoveryTest extends WithTestNames { logSorter.writeBuffer(destPath, buffer, parts); ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); - dirs.add(new Path(destPath)); + dirs.add(ResolvedSortedLog.resolve(LogEntry.fromPath(origPath), fs)); } // Recover SortedLogRecovery recovery = new SortedLogRecovery(context);