Repository: accumulo Updated Branches: refs/heads/master 5eb084cb5 -> bd4d190ee
ACCUMULO-3393 Follow-on work for per-table volume chooser. * docs clean up + code guideline compliance. * ensure RandomVolumeChoosers are independent when used per-table. * make sure that per-table choosers can keep state the way that global choosers can * make sure that a chooser can only pick from the options it is presented. * reduce object creation in critical path for Tablet.split. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bd4d190e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bd4d190e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bd4d190e Branch: refs/heads/master Commit: bd4d190eee16058097f4f439ed3b21696633cf9c Parents: 5eb084c Author: Sean Busbey <bus...@cloudera.com> Authored: Tue Dec 9 15:55:06 2014 -0600 Committer: Sean Busbey <bus...@cloudera.com> Committed: Fri Dec 12 23:00:50 2014 -0600 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../server/fs/PerTableVolumeChooser.java | 47 ++++++++++-- .../server/fs/PreferredVolumeChooser.java | 75 ++++++++++++++------ .../accumulo/server/fs/RandomVolumeChooser.java | 2 +- .../accumulo/server/fs/VolumeChooser.java | 6 ++ .../accumulo/server/fs/VolumeManager.java | 1 + .../accumulo/server/fs/VolumeManagerImpl.java | 16 ++++- .../apache/accumulo/server/fs/VolumeUtil.java | 4 +- .../server/fs/VolumeManagerImplTest.java | 26 +++++++ 9 files changed, 147 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 4c2d0b4..e054a5f 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -164,6 +164,7 @@ public enum Property { GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", PropertyType.MEMORY, "The maximum size of a message that can be sent to a server."), GENERAL_SIMPLETIMER_THREADPOOL_SIZE("general.server.simpletimer.threadpool.size", "1", PropertyType.COUNT, "The number of threads to use for " + "server-internal scheduled tasks"), + // If you update the default type, be sure to update the default used for initialization failures in VolumeManagerImpl @Experimental GENERAL_VOLUME_CHOOSER("general.volume.chooser", "org.apache.accumulo.server.fs.PerTableVolumeChooser", PropertyType.CLASSNAME, "The class that will be used to select which volume will be used to create new files."), @@ -467,6 +468,7 @@ public enum Property { TABLE_REPLICATION_TARGET("table.replication.target.", null, PropertyType.PREFIX, "Enumerate a mapping of other systems which this table should " + "replicate their data to. The key suffix is the identifying cluster name and the value is an identifier for a location on the target system, " + "e.g. 
the ID of the table on the target to replicate to"), + @Experimental TABLE_VOLUME_CHOOSER("table.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME, "The class that will be used to select which volume will be used to create new files for this table."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java index 7a825c7..a579cc8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java @@ -16,23 +16,58 @@ */ package org.apache.accumulo.server.fs; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.conf.TableConfiguration; +/** + * A {@link VolumeChooser} that delegates to another volume chooser based on the presence of an experimental table + * property, {@link Property#TABLE_VOLUME_CHOOSER}. If it isn't found, defaults back to {@link RandomVolumeChooser}. + */ public class PerTableVolumeChooser implements VolumeChooser { - private static final VolumeChooser fallbackVolumeChooser = new RandomVolumeChooser(); - - public PerTableVolumeChooser() {} + private final VolumeChooser fallbackVolumeChooser = new RandomVolumeChooser(); + // TODO Add hint of expected size to construction, see ACCUMULO-3410 + /* Track VolumeChooser instances so they can keep state. 
*/ + private final ConcurrentHashMap<String, VolumeChooser> tableSpecificChooser = new ConcurrentHashMap<String, VolumeChooser>(); + // TODO has to be lazily initialized currently because of the reliance on HdfsZooInstance. see ACCUMULO-3411 + private volatile ServerConfigurationFactory serverConfs; @Override public String choose(VolumeChooserEnvironment env, String[] options) { - VolumeChooser chooser; + VolumeChooser chooser = null; if (env.hasTableId()) { - TableConfiguration conf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getTableConfiguration(env.getTableId()); - chooser = Property.createTableInstanceFromPropertyName(conf, Property.TABLE_VOLUME_CHOOSER, VolumeChooser.class, fallbackVolumeChooser); + // This local variable is an intentional component of the single-check idiom. + ServerConfigurationFactory localConf = serverConfs; + if (localConf == null) { + // If we're under contention when first getting here we'll throw away some initializations. + localConf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()); + serverConfs = localConf; + } + final TableConfiguration tableConf = localConf.getTableConfiguration(env.getTableId()); + chooser = tableSpecificChooser.get(env.getTableId()); + if (chooser == null) { + VolumeChooser temp = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_VOLUME_CHOOSER, VolumeChooser.class, fallbackVolumeChooser); + chooser = tableSpecificChooser.putIfAbsent(env.getTableId(), temp); + if (chooser == null) { + chooser = temp; + // Otherwise, someone else beat us to initializing; use theirs. + } + } else if (!(chooser.getClass().getName().equals(tableConf.get(Property.TABLE_VOLUME_CHOOSER)))) { + // the configuration for this table's chooser has been updated. In the case of failure to instantiate we'll repeat here next call. 
+ // TODO stricter definition of when the updated property is used, ref ACCUMULO-3412 + VolumeChooser temp = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_VOLUME_CHOOSER, VolumeChooser.class, fallbackVolumeChooser); + VolumeChooser last = tableSpecificChooser.replace(env.getTableId(), temp); + if (chooser.equals(last)) { + chooser = temp; + } else { + // Someone else beat us to updating; use theirs. + chooser = last; + } + } } else { chooser = fallbackVolumeChooser; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java index 7ed7bba..4ddf9bb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java @@ -16,24 +16,48 @@ */ package org.apache.accumulo.server.fs; +import static org.apache.commons.lang.ArrayUtils.EMPTY_STRING_ARRAY; + import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.accumulo.core.conf.AccumuloConfiguration.PropertyFilter; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; +/** + * A {@link RandomVolumeChooser} that limits its choices from a given set of options to 
the subset of those options preferred for a + * particular table. Defaults to selecting from all of the options presented. Can be customized via the table property + * {@value #PREFERRED_VOLUMES_CUSTOM_KEY}, which should contain a comma separated list of {@link Volume} URIs. Note that both the property + * name and the format of its value are specific to this particular implementation. + */ public class PreferredVolumeChooser extends RandomVolumeChooser implements VolumeChooser { private static final Logger log = Logger.getLogger(PreferredVolumeChooser.class); public static final String PREFERRED_VOLUMES_CUSTOM_KEY = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey() + "preferredVolumes"; + // TODO ACCUMULO-3417 replace this with the ability to retrieve by String key. + private static final PropertyFilter PREFERRED_VOLUMES_FILTER = new PropertyFilter() { + @Override + public boolean accept(String key) { + return PREFERRED_VOLUMES_CUSTOM_KEY.equals(key); + } + }; - public PreferredVolumeChooser() {} + @SuppressWarnings("unchecked") + private final Map<String, Set<String>> parsedPreferredVolumes = Collections.synchronizedMap(new LRUMap(1000)); + // TODO has to be lazily initialized currently because of the reliance on HdfsZooInstance. 
see ACCUMULO-3411 + private volatile ServerConfigurationFactory serverConfs; @Override public String choose(VolumeChooserEnvironment env, String[] options) { @@ -41,40 +65,49 @@ public class PreferredVolumeChooser extends RandomVolumeChooser implements Volum return super.choose(env, options); // Get the current table's properties, and find the preferred volumes property - TableConfiguration config = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getTableConfiguration(env.getTableId()); - PropertyFilter filter = new PropertyFilter() { - @Override - public boolean accept(String key) { - return PREFERRED_VOLUMES_CUSTOM_KEY.equals(key); - } - }; - Map<String,String> props = new HashMap<>(); - config.getProperties(props, filter); + // This local variable is an intentional component of the single-check idiom. + ServerConfigurationFactory localConf = serverConfs; + if (localConf == null) { + // If we're under contention when first getting here we'll throw away some initializations. + localConf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()); + serverConfs = localConf; + } + TableConfiguration tableConf = localConf.getTableConfiguration(env.getTableId()); + final Map<String,String> props = new HashMap<String, String>(); + tableConf.getProperties(props, PREFERRED_VOLUMES_FILTER); if (props.isEmpty()) { log.warn("No preferred volumes specified. 
Defaulting to randomly choosing from instance volumes"); return super.choose(env, options); } String volumes = props.get(PREFERRED_VOLUMES_CUSTOM_KEY); - log.trace("In custom chooser"); - log.trace("Volumes: " + volumes); - log.trace("TableID: " + env.getTableId()); - ArrayList<String> prefVol = new ArrayList<String>(); - // If the preferred volumes property is specified, split the returned string by the comma and add them to a preferred volumes list - prefVol.addAll(Arrays.asList(volumes.split(","))); + if (log.isTraceEnabled()) { + log.trace("In custom chooser"); + log.trace("Volumes: " + volumes); + log.trace("TableID: " + env.getTableId()); + } + // If the preferred volumes property was specified, split the returned string by the comma and use it to filter the given options. + Set<String> preferred = parsedPreferredVolumes.get(volumes); + if (preferred == null) { + preferred = new HashSet<String>(Arrays.asList(StringUtils.split(volumes, ','))); + parsedPreferredVolumes.put(volumes, preferred); + } - // Change the given array to a List and only keep the preferred volumes that are in the given array. - prefVol.retainAll(Arrays.asList(options)); + // Only keep the options that are in the preferred set + final ArrayList<String> filteredOptions = new ArrayList<String>(Arrays.asList(options)); + filteredOptions.retainAll(preferred); // If there are no preferred volumes left, then warn the user and choose randomly from the instance volumes - if (prefVol.isEmpty()) { + if (filteredOptions.isEmpty()) { log.warn("Preferred volumes are not instance volumes. 
Defaulting to randomly choosing from instance volumes"); return super.choose(env, options); } // Randomly choose the volume from the preferred volumes - String choice = prefVol.get(random.nextInt(prefVol.size())); - log.trace("Choice = " + choice); + String choice = super.choose(env, filteredOptions.toArray(EMPTY_STRING_ARRAY)); + if (log.isTraceEnabled()) { + log.trace("Choice = " + choice); + } return choice; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java index f2eb211..766d4c7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java @@ -19,7 +19,7 @@ package org.apache.accumulo.server.fs; import java.util.Random; public class RandomVolumeChooser implements VolumeChooser { - protected static Random random = new Random(); + private final Random random = new Random(); @Override public String choose(VolumeChooserEnvironment env, String[] options) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java index f523057..9865512 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java @@ -16,6 +16,12 @@ */ package org.apache.accumulo.server.fs; +import 
org.apache.accumulo.core.volume.Volume; + +/** + * Helper used by {@link VolumeManager}s to select from a set of {@link Volume} URIs. + * N.B. implementations must be threadsafe. VolumeChooser.equals will be used for internal caching. + */ public interface VolumeChooser { String choose(VolumeChooserEnvironment env, String[] options); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 890651e..e2353d4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -33,6 +33,7 @@ import com.google.common.base.Optional; /** * A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes. This also concentrates a bunch of meta-operations like * waiting for SAFE_MODE, and closing WALs. + * N.B. implementations must be thread safe. 
*/ public interface VolumeManager { http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index dc1be73..8202d27 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.volume.NonConfiguredVolume; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.core.volume.VolumeConfiguration; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -84,7 +85,8 @@ public class VolumeManagerImpl implements VolumeManager { invertVolumesByFileSystem(volumesByName, volumesByFileSystemUri); this.conf = conf; ensureSyncIsEnabled(); - chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser()); + // Keep in sync with default type in the property definition. + chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new PerTableVolumeChooser()); } private void invertVolumesByFileSystem(Map<String,Volume> forward, Multimap<URI,Volume> inverted) { @@ -572,9 +574,19 @@ public class VolumeManagerImpl implements VolumeManager { return getVolumeByPath(dir).getFileSystem().getContentSummary(dir); } + // Only used as a fall back if the configured chooser misbehaves. 
+ private final VolumeChooser failsafeChooser = new RandomVolumeChooser(); + @Override public String choose(Optional<String> tableId, String[] options) { - return chooser.choose(new VolumeChooserEnvironment(tableId), options); + final VolumeChooserEnvironment env = new VolumeChooserEnvironment(tableId); + final String choice = chooser.choose(env, options); + if (!(ArrayUtils.contains(options, choice))) { + log.error("The configured volume chooser, '" + chooser.getClass() + "', or one of its delegates returned a volume not in the set of options provided; " + + "will continue by relying on a RandomVolumeChooser. You should investigate and correct the named chooser."); + return failsafeChooser.choose(env, options); + } + return choice; } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 877d01c..e229209 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -265,8 +265,8 @@ public class VolumeUtil { throw new IllegalArgumentException("Unexpected table dir " + dir); } - Path newDir = new Path(vm.choose(Optional.of(extent.getTableId().toString()), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + dir.getParent().getName() - + Path.SEPARATOR + dir.getName()); + Path newDir = new Path(vm.choose(Optional.of(extent.getTableId().toString()), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR + + Path.SEPARATOR + dir.getParent().getName() + Path.SEPARATOR + dir.getName()); log.info("Updating directory for " + extent + " from " + dir + " to " + newDir); if 
(extent.isRootTablet()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd4d190e/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java index 582822a..23829d1 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java @@ -21,12 +21,16 @@ import java.util.List; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.fs.VolumeChooserEnvironment; import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Optional; + /** * */ @@ -91,4 +95,26 @@ public class VolumeManagerImplTest { conf.set(Property.INSTANCE_VOLUMES, "viewfs://dummy"); VolumeManagerImpl.get(conf); } + + public static class WrongVolumeChooser implements VolumeChooser { + @Override + public String choose(VolumeChooserEnvironment env, String[] options) { + return "file://totally-not-given/"; + } + } + + @SuppressWarnings("deprecation") + private static final Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI; + + @Test + public void chooseFromOptions() throws Exception { + List<String> volumes = Arrays.asList("file://one/", "file://two/", "file://three/"); + ConfigurationCopy conf = new ConfigurationCopy(); + conf.set(INSTANCE_DFS_URI, volumes.get(0)); + conf.set(Property.INSTANCE_VOLUMES, StringUtils.join(volumes,",")); + conf.set(Property.GENERAL_VOLUME_CHOOSER, WrongVolumeChooser.class.getName()); + VolumeManager vm = 
VolumeManagerImpl.get(conf); + String choice = vm.choose(Optional.of("sometable"), volumes.toArray(new String[0])); + Assert.assertTrue("shouldn't see invalid options from misbehaving chooser.", volumes.contains(choice)); + } }