This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new cf2f758947 improves tablet load times for tablet w/ walogs and no data 
(#4873)
cf2f758947 is described below

commit cf2f7589477054d6f685f559109dbfb5ca9d7aba
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Sep 13 15:26:32 2024 -0400

    improves tablet load times for tablet w/ walogs and no data (#4873)
    
    This commit makes two major changes.  First, it changes log recovery to
    use block caches.  Second, it checks if a tablet has any data in walogs
    before acquiring the recovery lock.  Together, these two changes greatly
    speed up loading tablets that have no data in walogs.  These changes
    introduce an extra opening of the walogs to see if the recovery lock
    needs to be acquired.  Using the block caches for this extra opening
    should avoid any extra cost.  The block caches also help in the case
    where many tablets with the same walogs are assigned to a tablet server.
    
    In some simple tests, an 8x speedup in tablet load times was observed.
    
    Any time a tablet has an unclean shutdown, it will have the walogs of the
    dead tserver assigned to it, even if it had no data in those walogs.  These
    changes make loading tablets in that situation much faster.
---
 .../org/apache/accumulo/tserver/TabletServer.java  | 18 ++++++-
 .../accumulo/tserver/log/RecoveryLogsIterator.java | 43 ++++++++++++----
 .../accumulo/tserver/log/SortedLogRecovery.java    | 29 +++++++++--
 .../accumulo/tserver/log/TabletServerLogger.java   | 59 +++++++++++++++++-----
 .../tserver/log/SortedLogRecoveryTest.java         | 17 ++++++-
 test/src/main/resources/log4j2-test.properties     |  2 +-
 6 files changed, 137 insertions(+), 31 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1039c09804..eae6637702 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -29,6 +29,7 @@ import static 
org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalSch
 import static 
org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.time.Duration;
@@ -518,7 +519,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
   private static final AutoCloseable NOOP_CLOSEABLE = () -> {};
 
   AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) {
-    if (tabletMetadata.getExtent().isMeta() || 
tabletMetadata.getLogs().isEmpty()) {
+    if (tabletMetadata.getExtent().isMeta() || !needsRecovery(tabletMetadata)) 
{
       return NOOP_CLOSEABLE;
     } else {
       recoveryLock.lock();
@@ -1078,6 +1079,21 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
     logger.minorCompactionStarted(tablet, lastUpdateSequence, 
newDataFileLocation, durability);
   }
 
+  public boolean needsRecovery(TabletMetadata tabletMetadata) {
+
+    var logEntries = tabletMetadata.getLogs();
+
+    if (logEntries.isEmpty()) {
+      return false;
+    }
+
+    try {
+      return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), 
logEntries);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
   public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> 
logEntries,
       Set<String> tabletFiles, MutationReceiver mutationReceiver) throws 
IOException {
     logger.recover(getContext(), extent, logEntries, tabletFiles, 
mutationReceiver);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index f3d92b05ce..b2a1d8ba1c 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.iterators.IteratorAdapter;
 import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import com.google.common.collect.Iterators;
 
 /**
@@ -59,11 +61,17 @@ public class RecoveryLogsIterator
   private final Iterator<Entry<Key,Value>> iter;
   private final CryptoEnvironment env = new 
CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);
 
+  public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> 
recoveryLogDirs,
+      LogFileKey start, LogFileKey end, boolean checkFirstKey) throws 
IOException {
+    this(context, recoveryLogDirs, start, end, checkFirstKey, null, null);
+  }
+
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
    */
   public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> 
recoveryLogDirs,
-      LogFileKey start, LogFileKey end, boolean checkFirstKey) throws 
IOException {
+      LogFileKey start, LogFileKey end, boolean checkFirstKey, 
Cache<String,Long> fileLenCache,
+      CacheProvider cacheProvider) throws IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new 
ArrayList<>(recoveryLogDirs.size());
     fileIters = new ArrayList<>();
@@ -80,13 +88,13 @@ public class RecoveryLogsIterator
 
       // only check the first key once to prevent extra iterator creation and 
seeking
       if (checkFirstKey && !logFiles.isEmpty()) {
-        validateFirstKey(context, cryptoService, fs, logDir);
+        validateFirstKey(context, cryptoService, fs, logDir, fileLenCache, 
cacheProvider);
       }
 
       for (UnreferencedTabletFile log : logFiles) {
-        FileSKVIterator fileIter = 
FileOperations.getInstance().newReaderBuilder()
-            .forFile(log, fs, fs.getConf(), cryptoService)
-            
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build();
+        FileSKVIterator fileIter =
+            openLogFile(context, log, cryptoService, fs, fileLenCache, 
cacheProvider);
+
         if (range != null) {
           fileIter.seek(range, Collections.emptySet(), false);
         }
@@ -131,14 +139,31 @@ public class RecoveryLogsIterator
     }
   }
 
+  FileSKVIterator openLogFile(ServerContext context, UnreferencedTabletFile 
logFile,
+      CryptoService cs, FileSystem fs, Cache<String,Long> fileLenCache, 
CacheProvider cacheProvider)
+      throws IOException {
+    var builder = FileOperations.getInstance().newReaderBuilder()
+        .forFile(logFile, fs, fs.getConf(), 
cs).withTableConfiguration(context.getConfiguration());
+
+    if (fileLenCache != null) {
+      builder = builder.withFileLenCache(fileLenCache);
+    }
+
+    if (cacheProvider != null) {
+      builder = builder.withCacheProvider(cacheProvider);
+    }
+
+    return builder.seekToBeginning().build();
+  }
+
   /**
    * Check that the first entry in the WAL is OPEN. Only need to do this once.
    */
   private void validateFirstKey(ServerContext context, CryptoService cs, 
FileSystem fs,
-      ResolvedSortedLog sortedLogs) throws IOException {
-    try (FileSKVIterator fileIter = 
FileOperations.getInstance().newReaderBuilder()
-        .forFile(sortedLogs.getChildren().first(), fs, fs.getConf(), cs)
-        
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
+      ResolvedSortedLog sortedLogs, Cache<String,Long> fileLenCache, 
CacheProvider cacheProvider)
+      throws IOException {
+    try (FileSKVIterator fileIter = openLogFile(context, 
sortedLogs.getChildren().first(), cs, fs,
+        fileLenCache, cacheProvider)) {
       Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter);
 
       if (iterator.hasNext()) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 8696f70c23..cf6649d0ac 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -42,6 +42,7 @@ import java.util.Set;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.tserver.logger.LogEvents;
@@ -51,6 +52,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -65,8 +67,15 @@ public class SortedLogRecovery {
 
   private final ServerContext context;
 
-  public SortedLogRecovery(ServerContext context) {
+  private final CacheProvider cacheProvider;
+
+  private final Cache<String,Long> fileLenCache;
+
+  public SortedLogRecovery(ServerContext context, Cache<String,Long> 
fileLenCache,
+      CacheProvider cacheProvider) {
     this.context = context;
+    this.cacheProvider = cacheProvider;
+    this.fileLenCache = fileLenCache;
   }
 
   static LogFileKey maxKey(LogEvents event) {
@@ -105,7 +114,7 @@ public class SortedLogRecovery {
     int tabletId = -1;
 
     try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, 
minKey(DEFINE_TABLET),
-        maxKey(DEFINE_TABLET), true)) {
+        maxKey(DEFINE_TABLET), true, fileLenCache, cacheProvider)) {
 
       KeyExtent alternative = extent;
       if (extent.isRootTablet()) {
@@ -206,8 +215,9 @@ public class SortedLogRecovery {
     long lastFinish = 0;
     long recoverySeq = 0;
 
-    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, 
recoveryLogs,
-        minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, 
tabletId), false)) {
+    try (RecoveryLogsIterator rli =
+        new RecoveryLogsIterator(context, recoveryLogs, 
minKey(COMPACTION_START, tabletId),
+            maxKey(COMPACTION_START, tabletId), false, fileLenCache, 
cacheProvider)) {
 
       DeduplicatingIterator ddi = new DeduplicatingIterator(rli);
 
@@ -262,7 +272,8 @@ public class SortedLogRecovery {
 
     LogFileKey end = maxKey(MUTATION, tabletId);
 
-    try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, 
false)) {
+    try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, 
false, fileLenCache,
+        cacheProvider)) {
       while (rli.hasNext()) {
         Entry<LogFileKey,LogFileValue> entry = rli.next();
         LogFileKey logFileKey = entry.getKey();
@@ -287,6 +298,14 @@ public class SortedLogRecovery {
     return Collections2.transform(recoveryLogs, rsl -> rsl.getDir().getName());
   }
 
+  public boolean needsRecovery(KeyExtent extent, List<ResolvedSortedLog> 
recoveryDirs)
+      throws IOException {
+    Entry<Integer,List<ResolvedSortedLog>> maxEntry =
+        findLogsThatDefineTablet(extent, recoveryDirs);
+    int tabletId = maxEntry.getKey();
+    return tabletId != -1;
+  }
+
   public void recover(KeyExtent extent, List<ResolvedSortedLog> recoveryDirs,
       Set<String> tabletFiles, MutationReceiver mr) throws IOException {
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 3caf5e08e4..a5f6963da3 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -41,6 +41,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
+import org.apache.accumulo.core.logging.LoggingBlockCache;
+import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.Retry;
@@ -50,6 +54,7 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
 import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.hadoop.fs.Path;
@@ -518,23 +523,49 @@ public class TabletServerLogger {
     return seq;
   }
 
+  private List<ResolvedSortedLog> resolve(Collection<LogEntry> walogs) {
+    List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size());
+    for (var logEntry : walogs) {
+      var sortedLog = sortedLogCache.get(logEntry, le1 -> {
+        try {
+          return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager());
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      });
+
+      sortedLogs.add(sortedLog);
+    }
+    return sortedLogs;
+  }
+
+  private CacheProvider createCacheProvider(TabletServerResourceManager 
resourceMgr) {
+    return new BasicCacheProvider(
+        LoggingBlockCache.wrap(CacheType.INDEX, resourceMgr.getIndexCache()),
+        LoggingBlockCache.wrap(CacheType.DATA, resourceMgr.getDataCache()));
+  }
+
+  public boolean needsRecovery(ServerContext context, KeyExtent extent, 
Collection<LogEntry> walogs)
+      throws IOException {
+    try {
+      var resourceMgr = tserver.getResourceManager();
+      var cacheProvider = createCacheProvider(resourceMgr);
+      SortedLogRecovery recovery =
+          new SortedLogRecovery(context, resourceMgr.getFileLenCache(), 
cacheProvider);
+      return recovery.needsRecovery(extent, resolve(walogs));
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
   public void recover(ServerContext context, KeyExtent extent, List<LogEntry> 
walogs,
       Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     try {
-      SortedLogRecovery recovery = new SortedLogRecovery(context);
-      List<ResolvedSortedLog> sortedLogs = new ArrayList<>(walogs.size());
-      for (var logEntry : walogs) {
-        var sortedLog = sortedLogCache.get(logEntry, le1 -> {
-          try {
-            return ResolvedSortedLog.resolve(le1, tserver.getVolumeManager());
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
-
-        sortedLogs.add(sortedLog);
-      }
-      recovery.recover(extent, sortedLogs, tabletFiles, mr);
+      var resourceMgr = tserver.getResourceManager();
+      var cacheProvider = createCacheProvider(resourceMgr);
+      SortedLogRecovery recovery =
+          new SortedLogRecovery(context, resourceMgr.getFileLenCache(), 
cacheProvider);
+      recovery.recover(extent, resolve(walogs), tabletFiles, mr);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index b5c30f634e..5645dea90f 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -52,10 +52,15 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import 
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCache;
+import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.rfile.bcfile.Compression;
 import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
+import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -78,6 +83,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not 
set by user input")
@@ -165,6 +173,13 @@ public class SortedLogRecoveryTest extends WithTestNames {
     return recover(logs, new HashSet<>(), extent, bufferSize);
   }
 
+  private CacheProvider cacheProvider = new BasicCacheProvider(new 
TinyLfuBlockCache(
+      
BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), 
CacheType.INDEX),
+      new TinyLfuBlockCache(
+          
BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()),
+          CacheType.DATA));
+  private Cache<String,Long> fileLenCache = Caffeine.newBuilder().build();
+
   private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> 
files, KeyExtent extent,
       int bufferSize) throws IOException {
 
@@ -205,7 +220,7 @@ public class SortedLogRecoveryTest extends WithTestNames {
         dirs.add(ResolvedSortedLog.resolve(LogEntry.fromPath(origPath), fs));
       }
       // Recover
-      SortedLogRecovery recovery = new SortedLogRecovery(context);
+      SortedLogRecovery recovery = new SortedLogRecovery(context, 
fileLenCache, cacheProvider);
       CaptureMutations capture = new CaptureMutations();
       recovery.recover(extent, dirs, files, capture);
       verify(context);
diff --git a/test/src/main/resources/log4j2-test.properties 
b/test/src/main/resources/log4j2-test.properties
index 0c77a3871b..23da7cd1e8 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.console.type = Console
 appender.console.name = STDOUT
 appender.console.target = SYSTEM_OUT
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} [%c{2}] %-5p: %m%n
+appender.console.layout.pattern = %d{ISO8601} %T [%c{2}] %-5p: %m%n
 
 logger.01.name = org.apache.accumulo.core
 logger.01.level = debug

Reply via email to