This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 13ce0d1f27 Cache Hadoop config in getVolumeManagerConfiguration (#3706)
13ce0d1f27 is described below

commit 13ce0d1f272950dd5a1a1f882d4ab6b65b1e1419
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Aug 21 14:04:00 2023 -0400

    Cache Hadoop config in getVolumeManagerConfiguration (#3706)
    
    Cache the Hadoop Configuration in getVolumeManagerConfiguration
    so that new Configuration(Configuration) is not called frequently.
    Calling this version of the constructor creates a synchronization
    point shared across many threads.
    
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
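
For context, a minimal sketch of the caching pattern this patch introduces is shown
below. It mirrors the Caffeine cache and Pair key that appear in the diff; the class
and method names in the sketch are illustrative only and are not part of the patch:

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.util.Pair;
    import org.apache.hadoop.conf.Configuration;

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    // Illustrative class; the real code lives in VolumeManagerImpl.
    public class VolumeConfigCacheSketch {

      // One cached Configuration per (base Hadoop config, volume URI) pair,
      // matching the cache declared in the patch below.
      private static final Cache<Pair<Configuration,String>,Configuration> HDFS_CONFIGS_FOR_VOLUME =
          Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build();

      static Configuration configFor(Configuration hadoopConf, String volumeUri) {
        return HDFS_CONFIGS_FOR_VOLUME.get(new Pair<>(hadoopConf, volumeUri), key -> {
          // The expensive copy constructor runs at most once per cache key instead
          // of on every lookup; the real method also returns hadoopConf unchanged
          // when a volume has no overrides.
          Configuration copy = new Configuration(hadoopConf);
          // ... apply per-volume property overrides here ...
          return copy;
        });
      }
    }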
---
 .../accumulo/server/fs/VolumeManagerImpl.java      | 43 ++++++++++++++++------
 .../accumulo/server/fs/VolumeManagerImplTest.java  | 39 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 11 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 29e1f03d44..51ba4d4c9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -38,12 +38,14 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.spi.fs.VolumeChooser;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
@@ -66,6 +68,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
@@ -75,6 +79,9 @@ public class VolumeManagerImpl implements VolumeManager {
 
   private static final HashSet<String> WARNED_ABOUT_SYNCONCLOSE = new HashSet<>();
 
+  private static final Cache<Pair<Configuration,String>,Configuration> HDFS_CONFIGS_FOR_VOLUME =
+      Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build();
+
   private final Map<String,Volume> volumesByName;
   private final Multimap<URI,Volume> volumesByFileSystemUri;
   private final VolumeChooser chooser;
@@ -377,27 +384,41 @@ public class VolumeManagerImpl implements VolumeManager {
    * </pre>
    *
    * We will use these properties to return a new Configuration object that can be used with the
-   * FileSystem URI.
+   * FileSystem URI to override properties in the original Configuration. If these properties are
+   * not set for a volume, then the original Configuration is returned. If they are set, a new
+   * Configuration is created with the overridden properties set. In either case, the returned
+   * Configuration is cached, to avoid unnecessary recomputation. This works because these override
+   * properties are instance properties and cannot change while the system is running.
    *
    * @param conf AccumuloConfiguration object
    * @param hadoopConf Hadoop Configuration object
    * @param filesystemURI Volume Filesystem URI
    * @return Hadoop Configuration with custom overrides for this FileSystem
    */
-  private static Configuration getVolumeManagerConfiguration(AccumuloConfiguration conf,
+  protected static Configuration getVolumeManagerConfiguration(AccumuloConfiguration conf,
       final Configuration hadoopConf, final String filesystemURI) {
 
-    final Configuration volumeConfig = new Configuration(hadoopConf);
+    final var cacheKey = new Pair<>(hadoopConf, filesystemURI);
+    return HDFS_CONFIGS_FOR_VOLUME.get(cacheKey, (key) -> {
+
+      Map<String,String> volumeHdfsConfigOverrides =
+          conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet()
+              .stream().filter(e -> e.getKey().startsWith(filesystemURI + "."))
+              .collect(Collectors.toUnmodifiableMap(
+                  e -> e.getKey().substring(filesystemURI.length() + 1), Entry::getValue));
 
-    conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet()
-        .stream().filter(e -> e.getKey().startsWith(filesystemURI + ".")).forEach(e -> {
-          String key = e.getKey().substring(filesystemURI.length() + 1);
-          String value = e.getValue();
-          log.info("Overriding property {} for volume {}", key, value, filesystemURI);
-          volumeConfig.set(key, value);
-        });
+      // use the original if no overrides exist
+      if (volumeHdfsConfigOverrides.isEmpty()) {
+        return hadoopConf;
+      }
 
-    return volumeConfig;
+      Configuration volumeConfig = new Configuration(hadoopConf);
+      volumeHdfsConfigOverrides.forEach((k, v) -> {
+        log.info("Overriding property {}={} for volume {}", k, v, filesystemURI);
+        volumeConfig.set(k, v);
+      });
+      return volumeConfig;
+    });
   }
 
   protected static Stream<Entry<String,String>>
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
index a112a3c906..980526dc3e 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
@@ -24,7 +24,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -37,6 +39,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@ -142,6 +145,42 @@ public class VolumeManagerImplTest {
     assertEquals("20", e.getValue());
   }
 
+  @Test
+  public void testGetVolumeManagerConfiguration() throws Exception {
+
+    final ConfigurationCopy accumuloConf =
+        new ConfigurationCopy(DefaultConfiguration.getInstance());
+    final Configuration hadoopConf = new Configuration();
+    final String fileSystem = "file://127.0.0.1/vol1/";
+
+    final Configuration volumeConfig =
+        VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf, fileSystem);
+    assertSame(volumeConfig, hadoopConf);
+
+    accumuloConf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + fileSystem + "."
+        + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "true");
+
+    Configuration hadoopConf2 = new Configuration(hadoopConf);
+    final Configuration volumeConfig2 =
+        VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf2, fileSystem);
+
+    assertNotSame(volumeConfig2, hadoopConf); // false because of the additional property
+    assertNotSame(volumeConfig, volumeConfig2); // false because of the additional property
+    assertEquals("true", volumeConfig2.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+
+    final Configuration volumeConfig3 =
+        VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, volumeConfig2, fileSystem);
+    assertEquals("true", volumeConfig3.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    // false because of the different Hadoop configuration input
+    assertNotSame(volumeConfig3, volumeConfig2);
+
+    final Configuration volumeConfig4 =
+        VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf2, fileSystem);
+    assertEquals("true", volumeConfig4.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    // true because of the same hadoop configuration input
+    assertSame(volumeConfig4, volumeConfig2);
+  }
+
   @Test
   public void testConfigurationOverrides() throws Exception {
 

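As a usage note for the cached, per-volume configuration described in the updated
Javadoc above, here is a minimal sketch. It assumes a class placed in the
org.apache.accumulo.server.fs package (the method is protected) and reuses the
illustrative volume URI from the test; repeated calls with the same Hadoop
Configuration instance and volume URI return the same cached object:

    package org.apache.accumulo.server.fs; // needed to reach the protected method

    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;

    import org.apache.accumulo.core.conf.ConfigurationCopy;
    import org.apache.accumulo.core.conf.DefaultConfiguration;
    import org.apache.accumulo.core.conf.Property;
    import org.apache.hadoop.conf.Configuration;

    public class VolumeOverrideUsageSketch {
      public static void main(String[] args) {
        ConfigurationCopy accumuloConf = new ConfigurationCopy(DefaultConfiguration.getInstance());
        Configuration hadoopConf = new Configuration();
        String volume = "file://127.0.0.1/vol1/"; // illustrative volume URI, as in the test

        // Per-volume override: instance.volume.config.<volume-uri>.<hadoop-property>
        accumuloConf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + volume + "."
            + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "true");

        Configuration c1 =
            VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf, volume);
        Configuration c2 =
            VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf, hadoopConf, volume);

        // Same inputs hit the cache, so the same Configuration instance comes back,
        // and it carries the per-volume override.
        System.out.println(c1 == c2);                                   // true
        System.out.println(c1.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS)); // true
      }
    }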