This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new cf2f758947 improves tablet load times for tablet w/ walogs and no data (#4873) cf2f758947 is described below commit cf2f7589477054d6f685f559109dbfb5ca9d7aba Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Sep 13 15:26:32 2024 -0400 improves tablet load times for tablet w/ walogs and no data (#4873) This commit makes two major changes. First, it changes log recovery to use block caches. Second, it checks if a tablet has any data in walogs before acquiring the recovery lock. These two changes together really speed up loading tablets that have no data in walogs. These changes introduce an extra opening of the walogs to see if the recovery lock needs to be acquired. Using the block caches for this extra opening should avoid any extra cost. The block caches also help in the case where many tablets with the same walogs are assigned to a tablet server. In some simple tests, we saw an 8x speedup in tablet load times. Anytime a tablet has an unclean shutdown, it will have the walogs of the dead tserver assigned to it even if it had no data in those walogs. These changes make loading tablets in that situation much faster. 
--- .../org/apache/accumulo/tserver/TabletServer.java | 18 ++++++- .../accumulo/tserver/log/RecoveryLogsIterator.java | 43 ++++++++++++---- .../accumulo/tserver/log/SortedLogRecovery.java | 29 +++++++++-- .../accumulo/tserver/log/TabletServerLogger.java | 59 +++++++++++++++++----- .../tserver/log/SortedLogRecoveryTest.java | 17 ++++++- test/src/main/resources/log4j2-test.properties | 2 +- 6 files changed, 137 insertions(+), 31 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1039c09804..eae6637702 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -29,6 +29,7 @@ import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalSch import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask; import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.time.Duration; @@ -518,7 +519,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private static final AutoCloseable NOOP_CLOSEABLE = () -> {}; AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) { - if (tabletMetadata.getExtent().isMeta() || tabletMetadata.getLogs().isEmpty()) { + if (tabletMetadata.getExtent().isMeta() || !needsRecovery(tabletMetadata)) { return NOOP_CLOSEABLE; } else { recoveryLock.lock(); @@ -1078,6 +1079,21 @@ public class TabletServer extends AbstractServer implements TabletHostingServer logger.minorCompactionStarted(tablet, lastUpdateSequence, newDataFileLocation, durability); } + public boolean needsRecovery(TabletMetadata tabletMetadata) { + + var logEntries = tabletMetadata.getLogs(); + + if (logEntries.isEmpty()) { + return false; + } + + try { + 
return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), logEntries); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException { logger.recover(getContext(), extent, logEntries, tabletFiles, mutationReceiver); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index f3d92b05ce..b2a1d8ba1c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.iterators.IteratorAdapter; import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; @@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.Iterators; /** @@ -59,11 +61,17 @@ public class RecoveryLogsIterator private final Iterator<Entry<Key,Value>> iter; private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY); + public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> recoveryLogDirs, + LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException { + this(context, recoveryLogDirs, start, end, checkFirstKey, null, null); + } + /** * Scans the files in each recoveryLogDir over 
the range [start,end]. */ public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> recoveryLogDirs, - LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException { + LogFileKey start, LogFileKey end, boolean checkFirstKey, Cache<String,Long> fileLenCache, + CacheProvider cacheProvider) throws IOException { List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size()); fileIters = new ArrayList<>(); @@ -80,13 +88,13 @@ public class RecoveryLogsIterator // only check the first key once to prevent extra iterator creation and seeking if (checkFirstKey && !logFiles.isEmpty()) { - validateFirstKey(context, cryptoService, fs, logDir); + validateFirstKey(context, cryptoService, fs, logDir, fileLenCache, cacheProvider); } for (UnreferencedTabletFile log : logFiles) { - FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(log, fs, fs.getConf(), cryptoService) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build(); + FileSKVIterator fileIter = + openLogFile(context, log, cryptoService, fs, fileLenCache, cacheProvider); + if (range != null) { fileIter.seek(range, Collections.emptySet(), false); } @@ -131,14 +139,31 @@ public class RecoveryLogsIterator } } + FileSKVIterator openLogFile(ServerContext context, UnreferencedTabletFile logFile, + CryptoService cs, FileSystem fs, Cache<String,Long> fileLenCache, CacheProvider cacheProvider) + throws IOException { + var builder = FileOperations.getInstance().newReaderBuilder() + .forFile(logFile, fs, fs.getConf(), cs).withTableConfiguration(context.getConfiguration()); + + if (fileLenCache != null) { + builder = builder.withFileLenCache(fileLenCache); + } + + if (cacheProvider != null) { + builder = builder.withCacheProvider(cacheProvider); + } + + return builder.seekToBeginning().build(); + } + /** * Check that the first entry in the WAL is OPEN. Only need to do this once. 
*/ private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs, - ResolvedSortedLog sortedLogs) throws IOException { - try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(sortedLogs.getChildren().first(), fs, fs.getConf(), cs) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { + ResolvedSortedLog sortedLogs, Cache<String,Long> fileLenCache, CacheProvider cacheProvider) + throws IOException { + try (FileSKVIterator fileIter = openLogFile(context, sortedLogs.getChildren().first(), cs, fs, + fileLenCache, cacheProvider)) { Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter); if (iterator.hasNext()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 8696f70c23..cf6649d0ac 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -42,6 +42,7 @@ import java.util.Set; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.tserver.logger.LogEvents; @@ -51,6 +52,7 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.Collections2; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -65,8 +67,15 @@ public class SortedLogRecovery { private final ServerContext context; - public SortedLogRecovery(ServerContext context) { + private final CacheProvider cacheProvider; + + private final Cache<String,Long> 
fileLenCache; + + public SortedLogRecovery(ServerContext context, Cache<String,Long> fileLenCache, + CacheProvider cacheProvider) { this.context = context; + this.cacheProvider = cacheProvider; + this.fileLenCache = fileLenCache; } static LogFileKey maxKey(LogEvents event) { @@ -105,7 +114,7 @@ public class SortedLogRecovery { int tabletId = -1; try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET), - maxKey(DEFINE_TABLET), true)) { + maxKey(DEFINE_TABLET), true, fileLenCache, cacheProvider)) { KeyExtent alternative = extent; if (extent.isRootTablet()) { @@ -206,8 +215,9 @@ public class SortedLogRecovery { long lastFinish = 0; long recoverySeq = 0; - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs, - minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, recoveryLogs, minKey(COMPACTION_START, tabletId), + maxKey(COMPACTION_START, tabletId), false, fileLenCache, cacheProvider)) { DeduplicatingIterator ddi = new DeduplicatingIterator(rli); @@ -262,7 +272,8 @@ public class SortedLogRecovery { LogFileKey end = maxKey(MUTATION, tabletId); - try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) { + try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false, fileLenCache, + cacheProvider)) { while (rli.hasNext()) { Entry<LogFileKey,LogFileValue> entry = rli.next(); LogFileKey logFileKey = entry.getKey(); @@ -287,6 +298,14 @@ public class SortedLogRecovery { return Collections2.transform(recoveryLogs, rsl -> rsl.getDir().getName()); } + public boolean needsRecovery(KeyExtent extent, List<ResolvedSortedLog> recoveryDirs) + throws IOException { + Entry<Integer,List<ResolvedSortedLog>> maxEntry = + findLogsThatDefineTablet(extent, recoveryDirs); + int tabletId = maxEntry.getKey(); + return tabletId != -1; + } + public void recover(KeyExtent extent, 
List<ResolvedSortedLog> recoveryDirs, Set<String> tabletFiles, MutationReceiver mr) throws IOException { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 3caf5e08e4..a5f6963da3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -41,6 +41,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; +import org.apache.accumulo.core.logging.LoggingBlockCache; +import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; @@ -50,6 +54,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; import org.apache.accumulo.tserver.tablet.CommitSession; import org.apache.hadoop.fs.Path; @@ -518,23 +523,49 @@ public class TabletServerLogger { return seq; } + private List<ResolvedSortedLog> resolve(Collection<LogEntry> walogs) { + List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size()); + for (var logEntry : walogs) { + var sortedLog = sortedLogCache.get(logEntry, le1 -> { + try { + return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager()); + } catch (IOException e) { + throw new 
UncheckedIOException(e); + } + }); + + sortedLogs.add(sortedLog); + } + return sortedLogs; + } + + private CacheProvider createCacheProvider(TabletServerResourceManager resourceMgr) { + return new BasicCacheProvider( + LoggingBlockCache.wrap(CacheType.INDEX, resourceMgr.getIndexCache()), + LoggingBlockCache.wrap(CacheType.DATA, resourceMgr.getDataCache())); + } + + public boolean needsRecovery(ServerContext context, KeyExtent extent, Collection<LogEntry> walogs) + throws IOException { + try { + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + return recovery.needsRecovery(extent, resolve(walogs)); + } catch (Exception e) { + throw new IOException(e); + } + } + public void recover(ServerContext context, KeyExtent extent, List<LogEntry> walogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException { try { - SortedLogRecovery recovery = new SortedLogRecovery(context); - List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size()); - for (var logEntry : walogs) { - var sortedLog = sortedLogCache.get(logEntry, le1 -> { - try { - return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - - sortedLogs.add(sortedLog); - } - recovery.recover(extent, sortedLogs, tabletFiles, mr); + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + recovery.recover(extent, resolve(walogs), tabletFiles, mr); } catch (Exception e) { throw new IOException(e); } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 
b5c30f634e..5645dea90f 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -52,10 +52,15 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCache; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.file.rfile.bcfile.Compression; import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm; import org.apache.accumulo.core.file.rfile.bcfile.Utils; import org.apache.accumulo.core.file.streams.SeekableDataInputStream; +import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory; import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -78,6 +83,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input") @@ -165,6 +173,13 @@ public class SortedLogRecoveryTest extends WithTestNames { return recover(logs, new HashSet<>(), extent, bufferSize); } + private CacheProvider cacheProvider = new BasicCacheProvider(new TinyLfuBlockCache( + BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), CacheType.INDEX), + new TinyLfuBlockCache( + 
BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), + CacheType.DATA)); + private Cache<String,Long> fileLenCache = Caffeine.newBuilder().build(); + private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent, int bufferSize) throws IOException { @@ -205,7 +220,7 @@ public class SortedLogRecoveryTest extends WithTestNames { dirs.add(ResolvedSortedLog.resolve(LogEntry.fromPath(origPath), fs)); } // Recover - SortedLogRecovery recovery = new SortedLogRecovery(context); + SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider); CaptureMutations capture = new CaptureMutations(); recovery.recover(extent, dirs, files, capture); verify(context); diff --git a/test/src/main/resources/log4j2-test.properties b/test/src/main/resources/log4j2-test.properties index 0c77a3871b..23da7cd1e8 100644 --- a/test/src/main/resources/log4j2-test.properties +++ b/test/src/main/resources/log4j2-test.properties @@ -25,7 +25,7 @@ appender.console.type = Console appender.console.name = STDOUT appender.console.target = SYSTEM_OUT appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %d{ISO8601} [%c{2}] %-5p: %m%n +appender.console.layout.pattern = %d{ISO8601} %T [%c{2}] %-5p: %m%n logger.01.name = org.apache.accumulo.core logger.01.level = debug