This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 5d3ccec0df Enable background refresh for the scan server tablet metadata cache (#4551)
5d3ccec0df is described below

commit 5d3ccec0dfed42667608a574ae2cb4b12ffa1987
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Sat May 25 08:07:35 2024 -0400

    Enable background refresh for the scan server tablet metadata cache (#4551)
    
    This adds a property to configure the scan server tablet metadata
    Caffeine cache to refresh cached tablet metadata in the background on
    cache hits after the refresh time has passed. The refresh time is
    expressed as a fraction (percentage) of the expiration time. This allows
    frequently used entries to be refreshed before they expire, so that scans
    are not blocked waiting for a reload when an entry expires. Entries still
    expire if the expiration time passes without any cache hit after the
    refresh time.
    
    See: https://github.com/ben-manes/caffeine/wiki/Refresh
    
    This closes #4544
---
 .../org/apache/accumulo/core/conf/Property.java    | 11 +++-
 .../org/apache/accumulo/tserver/ScanServer.java    | 37 +++++++++++-
 .../test/ScanServerConcurrentTabletScanIT.java     | 69 ++++++++++++++++++++--
 3 files changed, 107 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc5a52f239..fe4b8f4c18 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -446,9 +446,18 @@ public enum Property {
       "Specifies a default blocksize for the scan server caches.", "2.1.0"),
   @Experimental
   SSERV_CACHED_TABLET_METADATA_EXPIRATION("sserver.cache.metadata.expiration", "5m",
-      PropertyType.TIMEDURATION, "The time after which cached tablet metadata will be refreshed.",
+      PropertyType.TIMEDURATION,
+      "The time after which cached tablet metadata will be expired if not previously refreshed.",
       "2.1.0"),
   @Experimental
+  SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT("sserver.cache.metadata.refresh.percent", ".75",
+      PropertyType.FRACTION,
+      "The time after which cached tablet metadata will be refreshed, expressed as a "
+          + "percentage of the expiration time. Cache hits after this time, but before the "
+          + "expiration time, will trigger a background refresh for future hits. "
+          + "Value must be less than 100%. Setting it to 0 disables refresh.",
+      "2.1.3"),
+  @Experimental
   SSERV_PORTSEARCH("sserver.port.search", "true", PropertyType.BOOLEAN,
       "if the ports above are in use, search higher ports until one is available.", "2.1.0"),
   @Experimental
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2ddb76e2cb..3b52ecf0fd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -179,6 +180,7 @@ public class ScanServer extends AbstractServer
   private UUID serverLockUUID;
   private final TabletMetadataLoader tabletMetadataLoader;
   private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+  private final ThreadPoolExecutor tmCacheExecutor;
   // tracks file reservations that are in the process of being added or removed from the metadata
   // table
   private final Set<StoredTabletFile> influxFiles = new HashSet<>();
@@ -242,14 +244,38 @@ public class ScanServer extends AbstractServer
     if (cacheExpiration == 0L) {
       LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table.");
       tabletMetadataCache = null;
+      tmCacheExecutor = null;
     } else {
       if (cacheExpiration < 60000) {
         LOG.warn(
             "Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
       }
-      tabletMetadataCache =
-          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
-              .scheduler(Scheduler.systemScheduler()).recordStats().build(tabletMetadataLoader);
+
+      // Get the cache refresh fraction; it must be less than 1 (100%) because a refresh time at
+      // or beyond the expiration time would never fire and would effectively disable refresh
+      double cacheRefreshPercentage =
+          getConfiguration().getFraction(Property.SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT);
+      Preconditions.checkArgument(cacheRefreshPercentage < 1,
+          "Tablet metadata cache refresh percentage is '%s' but must be less than 1",
+          cacheRefreshPercentage);
+
+      tmCacheExecutor = context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8)
+          .enableThreadPoolMetrics().build();
+      var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
+          .scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
+      if (cacheRefreshPercentage > 0) {
+        // Compute the refresh time as a fraction of the expiration time, clamped to at least 1ms
+        // since Caffeine rejects a non-positive refreshAfterWrite duration. Cache hits after this
+        // time, but before expiration, trigger a background non-blocking refresh of the entry so
+        // future cache hits get an updated entry without having to block for a refresh.
+        long cacheRefresh = Math.max(1L, (long) (cacheExpiration * cacheRefreshPercentage));
+        LOG.debug("Tablet metadata refresh percentage set to {}, refresh time set to {} ms",
+            cacheRefreshPercentage, cacheRefresh);
+        builder.refreshAfterWrite(cacheRefresh, TimeUnit.MILLISECONDS);
+      } else {
+        LOG.warn("Tablet metadata cache refresh disabled, may cause blocking on cache expiration.");
+      }
+      tabletMetadataCache = builder.build(tabletMetadataLoader);
     }
 
     delegate = newThriftScanClientHandler(new WriteTracker());
@@ -413,6 +439,11 @@ public class ScanServer extends AbstractServer
         LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
       }
 
+      if (tmCacheExecutor != null) {
+        LOG.debug("Shutting down TabletMetadataCache executor");
+        tmCacheExecutor.shutdownNow();
+      }
+
       gcLogger.logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
       try {
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
index beea10ba26..8e22b9e203 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -81,7 +82,7 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
     SharedMiniClusterBase.stopMiniCluster();
   }
 
-  private void startScanServer(boolean cacheEnabled)
+  private void startScanServer(String cacheExpiration, String cacheRefresh)
       throws IOException, KeeperException, InterruptedException {
 
     String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
@@ -91,8 +92,8 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
     SharedMiniClusterBase.getCluster().getClusterControl().stop(ServerType.SCAN_SERVER);
 
     Map<String,String> overrides = new HashMap<>();
-    overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(),
-        cacheEnabled ? "300m" : "0m");
+    overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(), cacheExpiration);
+    overrides.put(Property.SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT.getKey(), cacheRefresh);
     SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, overrides,
         1);
     while (zrw.getChildren(scanServerRoot).size() == 0) {
@@ -102,9 +103,31 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
   }
 
   @Test
-  public void testScanSameTabletDifferentDataTabletMetadataCacheEnabled() throws Exception {
+  public void testScanSameTabletDifferentDataTmCacheEnabledRefreshNotTriggered() throws Exception {
+    // Set the cache time to 10 minutes so it won't expire
+    // Set the cache refresh to 50%, which is 5 minutes so a refresh won't be triggered
+    startScanServer("10m", ".5");
+    testScanSameTabletDifferentDataTabletMetadataCacheEnabled(false);
+  }
+
+  @Test
+  public void testScanSameTabletDifferentDataTmCacheEnabledRefreshTriggered() throws Exception {
+    // Set the cache time to 10 minutes so it won't expire
+    // Set the cache refresh to 6ms, so a second hit after 6ms will trigger a background refresh
+    // .00001 * 10m (600000ms) = 6 ms
+    startScanServer("10m", ".00001");
+    testScanSameTabletDifferentDataTabletMetadataCacheEnabled(true);
+  }
+
+  @Test
+  public void testScanSameTabletDifferentDataTmCacheEnabledRefreshDisabled() throws Exception {
+    // Set the cache time to 5 minutes so it won't expire and disable the refresh entirely
+    startScanServer("5m", "0");
+    testScanSameTabletDifferentDataTabletMetadataCacheEnabled(false);
+  }
 
-    startScanServer(true);
+  private void testScanSameTabletDifferentDataTabletMetadataCacheEnabled(boolean shouldRefresh)
+      throws Exception {
 
     Properties clientProperties = getClientProps();
     clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");
@@ -135,7 +158,15 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
       // Ingest another 100 k/v with a different column family
       final int secondBatchOfEntriesCount = ingest(client, tableName, 10, 10, 0, "COLB", true);
 
+      // Add a sleep that is long enough that the configured refresh interval passes if
+      // the test has been set to use one.
+      Thread.sleep(1000);
+
       // iter2 should read 1000 k/v because the tablet metadata is cached.
+      // This call will trigger a cache refresh for the entry in the background if refresh is
+      // enabled.
+      // This current iter2 scan will still return the old data (1000 k/v) as the new value
+      // won't be visible until the reload finishes
       Iterator<Entry<Key,Value>> iter2 = scanner1.iterator();
       int count2 = 0;
       boolean useIter1 = true;
@@ -157,6 +188,22 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
       assertEquals(firstBatchOfEntriesCount, count1);
       assertEquals(firstBatchOfEntriesCount, count2);
 
+      // If a refresh was done this should see 1100 entries
+      // Keep scanning until updated value is seen
+      if (shouldRefresh) {
+        // Rescan until the background refresh (triggered by the iter2 scan above) has replaced
+        // the cached tablet metadata, making the second batch of entries visible
+        Wait.waitFor(
+            () -> countEntries(scanner1) == firstBatchOfEntriesCount + secondBatchOfEntriesCount,
+            10000, 500);
+      } else {
+        // There's not a great way to test the case of things not refreshing as the value
+        // should just be the same, so sleep briefly (refresh is minutes away or disabled in
+        // these configs) and verify the scan still sees only the first batch of entries
+        Thread.sleep(1000);
+        assertEquals(firstBatchOfEntriesCount, countEntries(scanner1));
+      }
+
       scanner1.close();
 
       // A new scan should read all 1100 entries
@@ -167,10 +214,20 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
     }
   }
 
+  private int countEntries(Scanner scanner) {
+    int count = 0;
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    while (iter.hasNext()) {
+      iter.next();
+      count++;
+    }
+    return count;
+  }
+
   @Test
   public void testScanSameTabletDifferentDataTabletMetadataCacheDisabled() throws Exception {
 
-    startScanServer(false);
+    startScanServer("0m", "0");
 
     Properties clientProperties = getClientProps();
     clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");

Reply via email to