This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 13ce0d1f27 Cache Hadoop config in getVolumeManagerConfiguration (#3706) 13ce0d1f27 is described below commit 13ce0d1f272950dd5a1a1f882d4ab6b65b1e1419 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Aug 21 14:04:00 2023 -0400 Cache Hadoop config in getVolumeManagerConfiguration (#3706) Cache Hadoop Configuration in getVolumeManagerConfiguration so as not to call new Configuration(Configuration) frequently. Calling this version of the constructor causes a synchronization point across many threads. Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../accumulo/server/fs/VolumeManagerImpl.java | 43 ++++++++++++++++------ .../accumulo/server/fs/VolumeManagerImplTest.java | 39 ++++++++++++++++++++ 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 29e1f03d44..51ba4d4c9e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -38,12 +38,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.spi.fs.VolumeChooser; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.core.volume.VolumeConfiguration; @@ -66,6 +68,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -75,6 +79,9 @@ public class VolumeManagerImpl implements VolumeManager { private static final HashSet<String> WARNED_ABOUT_SYNCONCLOSE = new HashSet<>(); + private static final Cache<Pair<Configuration,String>,Configuration> HDFS_CONFIGS_FOR_VOLUME = + Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build(); + private final Map<String,Volume> volumesByName; private final Multimap<URI,Volume> volumesByFileSystemUri; private final VolumeChooser chooser; @@ -377,27 +384,41 @@ public class VolumeManagerImpl implements VolumeManager { * </pre> * * We will use these properties to return a new Configuration object that can be used with the - * FileSystem URI. + * FileSystem URI to override properties in the original Configuration. If these properties are + * not set for a volume, then the original Configuration is returned. If they are set, a new + * Configuration is created with the overridden properties set. In either case, the returned + * Configuration is cached, to avoid unnecessary recomputation. This works because these override + * properties are instance properties and cannot change while the system is running. * * @param conf AccumuloConfiguration object * @param hadoopConf Hadoop Configuration object * @param filesystemURI Volume Filesystem URI * @return Hadoop Configuration with custom overrides for this FileSystem */ - private static Configuration getVolumeManagerConfiguration(AccumuloConfiguration conf, + protected static Configuration getVolumeManagerConfiguration(AccumuloConfiguration conf, final Configuration hadoopConf, final String filesystemURI) { - final Configuration volumeConfig = new Configuration(hadoopConf); + final var cacheKey = new Pair<>(hadoopConf, filesystemURI); + return HDFS_CONFIGS_FOR_VOLUME.get(cacheKey, (key) -> { + + Map<String,String> volumeHdfsConfigOverrides = + conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet() + .stream().filter(e -> e.getKey().startsWith(filesystemURI + ".")) + .collect(Collectors.toUnmodifiableMap( + e -> e.getKey().substring(filesystemURI.length() + 1), Entry::getValue)); - conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet() - .stream().filter(e -> e.getKey().startsWith(filesystemURI + ".")).forEach(e -> { - String key = e.getKey().substring(filesystemURI.length() + 1); - String value = e.getValue(); - log.info("Overriding property {} for volume {}", key, value, filesystemURI); - volumeConfig.set(key, value); - }); + // use the original if no overrides exist + if (volumeHdfsConfigOverrides.isEmpty()) { + return hadoopConf; + } - return volumeConfig; + Configuration volumeConfig = new Configuration(hadoopConf); + volumeHdfsConfigOverrides.forEach((k, v) -> { + log.info("Overriding property {}={} for volume {}", k, v, filesystemURI); + volumeConfig.set(k, v); + }); + return volumeConfig; + }); } protected static Stream<Entry<String,String>> diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java index a112a3c906..980526dc3e 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java @@ -24,7 +24,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @@ -37,6 +39,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.spi.common.ServiceEnvironment; @@ -142,6 +145,42 @@ public class VolumeManagerImplTest { assertEquals("20", e.getValue()); } + @Test + public void testGetVolumeManagerConfiguration() throws Exception { + + final ConfigurationCopy accumuloConf = + new ConfigurationCopy(DefaultConfiguration.getInstance()); + final Configuration hadoopConf = new Configuration(); + final String fileSystem = "file://127.0.0.1/vol1/"; + + final Configuration volumeConfig = + VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf, fileSystem); + assertSame(volumeConfig, hadoopConf); + + accumuloConf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + fileSystem + "." + + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "true"); + + Configuration hadoopConf2 = new Configuration(hadoopConf); + final Configuration volumeConfig2 = + VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf2, fileSystem); + + assertNotSame(volumeConfig2, hadoopConf); // false because of the additional property + assertNotSame(volumeConfig, volumeConfig2); // false because of the additional property + assertEquals("true", volumeConfig2.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS)); + + final Configuration volumeConfig3 = + VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, volumeConfig2, fileSystem); + assertEquals("true", volumeConfig3.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS)); + // false because of the different Hadoop configuration input + assertNotSame(volumeConfig3, volumeConfig2); + + final Configuration volumeConfig4 = + VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf2, fileSystem); + assertEquals("true", volumeConfig4.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS)); + // true because of the same hadoop configuration input + assertSame(volumeConfig4, volumeConfig2); + } + @Test public void testConfigurationOverrides() throws Exception {