This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 5d3ccec0df Enable background refresh for the scan server tablet metadata cache (#4551) 5d3ccec0df is described below commit 5d3ccec0dfed42667608a574ae2cb4b12ffa1987 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat May 25 08:07:35 2024 -0400 Enable background refresh for the scan server tablet metadata cache (#4551) This adds a property to configure the scan server tablet metadata Caffeine cache to refresh cached tablet metadata in the background on cache hits after the refresh time has passed. The refresh time is expressed as a percentage of the expiration time. This allows the cached entries to refresh before expiration if they are frequently used so that scans will not be blocked waiting on a refresh on expiration. Entries still expire if no cache hits come after the refresh time and expiration time passes. See: https://github.com/ben-manes/caffeine/wiki/Refresh This closes #4544 --- .../org/apache/accumulo/core/conf/Property.java | 11 +++- .../org/apache/accumulo/tserver/ScanServer.java | 37 +++++++++++- .../test/ScanServerConcurrentTabletScanIT.java | 69 ++++++++++++++++++++-- 3 files changed, 107 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index fc5a52f239..fe4b8f4c18 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -446,9 +446,18 @@ public enum Property { "Specifies a default blocksize for the scan server caches.", "2.1.0"), @Experimental SSERV_CACHED_TABLET_METADATA_EXPIRATION("sserver.cache.metadata.expiration", "5m", - PropertyType.TIMEDURATION, "The time after which cached tablet metadata will be refreshed.", + PropertyType.TIMEDURATION, + "The time after which cached tablet metadata will be expired if not previously refreshed.", "2.1.0"), @Experimental + SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT("sserver.cache.metadata.refresh.percent", ".75", + PropertyType.FRACTION, + "The time after which cached tablet metadata will be refreshed, expressed as a " + + "percentage of the expiration time. Cache hits after this time, but before the " + + "expiration time, will trigger a background refresh for future hits. " + + "Value must be less than 100%. Set to 0 will disable refresh.", + "2.1.3"), + @Experimental SSERV_PORTSEARCH("sserver.port.search", "true", PropertyType.BOOLEAN, "if the ports above are in use, search higher ports until one is available.", "2.1.0"), @Experimental diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 2ddb76e2cb..3b52ecf0fd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -39,6 +39,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -179,6 +180,7 @@ public class ScanServer extends AbstractServer private UUID serverLockUUID; private final TabletMetadataLoader tabletMetadataLoader; private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache; + private final ThreadPoolExecutor tmCacheExecutor; // tracks file reservations that are in the process of being added or removed from the metadata // table private final Set<StoredTabletFile> influxFiles = new HashSet<>(); @@ -242,14 +244,38 @@ public class ScanServer extends AbstractServer if (cacheExpiration == 0L) { LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table."); tabletMetadataCache = null; + tmCacheExecutor = null; } else { if (cacheExpiration < 60000) { LOG.warn( "Tablet metadata caching less than one minute, may cause excessive scans on metadata table."); } - tabletMetadataCache = - Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS) - .scheduler(Scheduler.systemScheduler()).recordStats().build(tabletMetadataLoader); + + // Get the cache refresh percentage property + // Value must be less than 100% as 100 or over would effectively disable it + double cacheRefreshPercentage = + getConfiguration().getFraction(Property.SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT); + Preconditions.checkArgument(cacheRefreshPercentage < cacheExpiration, + "Tablet metadata cache refresh percentage is '%s' but must be less than 1", + cacheRefreshPercentage); + + tmCacheExecutor = context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8) + .enableThreadPoolMetrics().build(); + var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS) + .scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats(); + if (cacheRefreshPercentage > 0) { + // Compute the refresh time as a percentage of the expiration time + // Cache hits after this time, but before expiration, will trigger a background + // non-blocking refresh of the entry so future cache hits get an updated entry + // without having to block for a refresh + long cacheRefresh = (long) (cacheExpiration * cacheRefreshPercentage); + LOG.debug("Tablet metadata refresh percentage set to {}, refresh time set to {} ms", + cacheRefreshPercentage, cacheRefresh); + builder.refreshAfterWrite(cacheRefresh, TimeUnit.MILLISECONDS); + } else { + LOG.warn("Tablet metadata cache refresh disabled, may cause blocking on cache expiration."); + } + tabletMetadataCache = builder.build(tabletMetadataLoader); } delegate = newThriftScanClientHandler(new WriteTracker()); @@ -413,6 +439,11 @@ public class ScanServer extends AbstractServer LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); } + if (tmCacheExecutor != null) { + LOG.debug("Shutting down TabletMetadataCache executor"); + tmCacheExecutor.shutdownNow(); + } + gcLogger.logGCInfo(getConfiguration()); LOG.info("stop requested. exiting ... "); try { diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java index beea10ba26..8e22b9e203 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java @@ -46,6 +46,7 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -81,7 +82,7 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { SharedMiniClusterBase.stopMiniCluster(); } - private void startScanServer(boolean cacheEnabled) + private void startScanServer(String cacheExpiration, String cacheRefresh) throws IOException, KeeperException, InterruptedException { String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); @@ -91,8 +92,8 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { SharedMiniClusterBase.getCluster().getClusterControl().stop(ServerType.SCAN_SERVER); Map<String,String> overrides = new HashMap<>(); - overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(), - cacheEnabled ? "300m" : "0m"); + overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(), cacheExpiration); + overrides.put(Property.SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT.getKey(), cacheRefresh); SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, overrides, 1); while (zrw.getChildren(scanServerRoot).size() == 0) { @@ -102,9 +103,31 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { } @Test - public void testScanSameTabletDifferentDataTabletMetadataCacheEnabled() throws Exception { + public void testScanSameTabletDifferentDataTmCacheEnabledRefreshNotTriggered() throws Exception { + // Set the cache time to 10 minutes so it won't expire + // Set the cache refresh to 50%, which is 5 minutes so a refresh won't be triggered + startScanServer("10m", ".5"); + testScanSameTabletDifferentDataTabletMetadataCacheEnabled(false); + } + + @Test + public void testScanSameTabletDifferentDataTmCacheEnabledRefreshTriggered() throws Exception { + // Set the cache time to 10 minutes so it won't expire + // Set the cache refresh to 6ms, so a second hit after 6ms will trigger a background refresh + // .00001 * 10m (600000ms) = 6 ms + startScanServer("10m", ".00001"); + testScanSameTabletDifferentDataTabletMetadataCacheEnabled(true); + } + + @Test + public void testScanSameTabletDifferentDataTmCacheEnabledRefreshDisabled() throws Exception { + // Set the cache time to 10 minutes so it won't expire and disable the refresh entirely + startScanServer("5m", "0"); + testScanSameTabletDifferentDataTabletMetadataCacheEnabled(false); + } - startScanServer(true); + private void testScanSameTabletDifferentDataTabletMetadataCacheEnabled(boolean shouldRefresh) + throws Exception { Properties clientProperties = getClientProps(); clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100"); @@ -135,7 +158,15 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { // Ingest another 100 k/v with a different column family final int secondBatchOfEntriesCount = ingest(client, tableName, 10, 10, 0, "COLB", true); + // Add a sleep that is long enough that the configured refresh interval passes if + // the test has been set to use one. + Thread.sleep(1000); + // iter2 should read 1000 k/v because the tablet metadata is cached. + // This call will trigger a cache refresh for the entry in the background if refresh is + // enabled. + // This current iter2 scan will still return the old data (1000 k/v) as the new value + // won't be visible until the reload finishes Iterator<Entry<Key,Value>> iter2 = scanner1.iterator(); int count2 = 0; boolean useIter1 = true; @@ -157,6 +188,22 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { assertEquals(firstBatchOfEntriesCount, count1); assertEquals(firstBatchOfEntriesCount, count2); + // If a refresh was done this should see 1100 entries + // Keep scanning until updated value is seen + if (shouldRefresh) { + // Count the number of entries for the third iterator to test if + // refresh worked depending on configs + Wait.waitFor( + () -> countEntries(scanner1) == firstBatchOfEntriesCount + secondBatchOfEntriesCount, + 10000, 500); + } else { + // There's not a great way to test the case of things not refreshing as the value + // should just be the same, so sleep for a period of time that should be longer than + // the refresh + Thread.sleep(1000); + assertEquals(firstBatchOfEntriesCount, countEntries(scanner1)); + } + scanner1.close(); // A new scan should read all 1100 entries @@ -167,10 +214,20 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { } } + private int countEntries(Scanner scanner) { + int count = 0; + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + while (iter.hasNext()) { + iter.next(); + count++; + } + return count; + } + @Test public void testScanSameTabletDifferentDataTabletMetadataCacheDisabled() throws Exception { - startScanServer(false); + startScanServer("0m", "0"); Properties clientProperties = getClientProps(); clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");