Updated Branches: refs/heads/master 129deb7ef -> 6991509cc
ACCUMULO-1832 require all volumes to have same instance id and version Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/11d803cf Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/11d803cf Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/11d803cf Branch: refs/heads/master Commit: 11d803cf11a76785c36e0f62989ae0d326c97b2b Parents: 40ef5d4 Author: Keith Turner <ktur...@apache.org> Authored: Fri Jan 24 00:01:56 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Jan 24 00:01:56 2014 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 5 +- .../impl/MiniAccumuloConfigImpl.java | 2 +- .../org/apache/accumulo/server/Accumulo.java | 10 +- .../apache/accumulo/server/ServerConstants.java | 98 +++++++++++---- .../accumulo/server/fs/VolumeManager.java | 3 - .../accumulo/server/fs/VolumeManagerImpl.java | 13 +- .../apache/accumulo/server/init/Initialize.java | 82 ++++++++++--- .../accumulo/server/ServerConstantsTest.java | 113 +++++++++++++++++ .../accumulo/server/init/InitializeTest.java | 1 + .../server/security/SystemCredentialsTest.java | 9 +- .../tserver/TabletServerSyncCheckTest.java | 5 - .../accumulo/tserver/log/MultiReaderTest.java | 2 +- .../java/org/apache/accumulo/test/VolumeIT.java | 123 +++++++++++++++++++ 13 files changed, 407 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6c4b1fe..bc4c7e1 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -118,7 +118,10 @@ public enum Property { + " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], " + " and then update conf/accumulo-site.xml everywhere."), INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, - "A comma seperated list of dfs uris to use. Files will be stored across these filesystems. If this is empty, then instance.dfs.uri will be used."), + "A comma seperated list of dfs uris to use. Files will be stored across these filesystems. If this is empty, then instance.dfs.uri will be used. " + + "After adding uris to this list, run 'accumulo init --add-volume' and then restart tservers. If entries are removed from this list then tservers " + + "will need to be restarted. After a uri is removed from the list Accumulo will not create new files in that location, however Accumulo can still " + + "reference files created at that location before the config change."), INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME, "The authenticator class that accumulo will use to determine if a user has privilege to perform an action"), INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME, http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java index 2931aca..feb7020 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java 
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java @@ -263,7 +263,7 @@ public class MiniAccumuloConfigImpl { return libDir; } - File getConfDir() { + public File getConfDir() { return confDir; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java index 15e157d..bd78929 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -38,6 +38,7 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -63,10 +64,10 @@ public class Accumulo { } } - public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) { + public static synchronized int getAccumuloPersistentVersion(FileSystem fs, Path path) { int dataVersion; try { - FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation()); + FileStatus[] files = fs.listStatus(path); if (files == null || files.length == 0) { dataVersion = -1; // assume it is 0.5 or earlier } else { @@ -78,6 +79,11 @@ public class Accumulo { } } + public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) { + Path path = ServerConstants.getDataVersionLocation(); + return getAccumuloPersistentVersion(fs.getFileSystemByPath(path), path); + } + public static void enableTracing(String address, String application) { try { 
DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address); http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java index 4c074d7..9d490e4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java @@ -17,11 +17,12 @@ package org.apache.accumulo.server; import java.io.IOException; +import java.util.ArrayList; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -29,6 +30,10 @@ import org.apache.hadoop.fs.Path; public class ServerConstants { + public static final String VERSION_DIR = "version"; + + public static final String INSTANCE_ID_DIR = "instance_id"; + /** * current version (3) reflects additional namespace operations (ACCUMULO-802) in version 1.6.0 <br /> * (versions should never be negative) @@ -70,27 +75,79 @@ public class ServerConstants { return defaultBaseDir; } + public static String[] getConfiguredBaseDirs() { + String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR); + String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES); + + String configuredBaseDirs[]; + + if (ns == null || ns.isEmpty()) { + configuredBaseDirs = new String[] {getDefaultBaseDir()}; + } else { 
+ String namespaces[] = ns.split(","); + for (String namespace : namespaces) { + if (!namespace.contains(":")) { + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace); + } + } + configuredBaseDirs = prefix(namespaces, singleNamespace); + } + + return configuredBaseDirs; + } + // these are functions to delay loading the Accumulo configuration unless we must public static synchronized String[] getBaseDirs() { if (baseDirs == null) { - String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR); - String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES); - - if (ns == null || ns.isEmpty()) { - baseDirs = new String[] {getDefaultBaseDir()}; - } else { - String namespaces[] = ns.split(","); - for (String namespace : namespaces) { - if (!namespace.contains(":")) { - throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace); - } - } - baseDirs = prefix(namespaces, singleNamespace); - } + baseDirs = checkBaseDirs(getConfiguredBaseDirs(), false); } + return baseDirs; } + + public static String[] checkBaseDirs(String[] configuredBaseDirs, boolean ignore) { + // all base dirs must have same instance id and data version, any dirs that have neither should be ignored + String firstDir = null; + String firstIid = null; + Integer firstVersion = null; + ArrayList<String> baseDirsList = new ArrayList<String>(); + for (String baseDir : configuredBaseDirs) { + Path path = new Path(baseDir, INSTANCE_ID_DIR); + String currentIid; + Integer currentVersion; + try { + currentIid = ZooUtil.getInstanceIDFromHdfs(new Path(baseDir, INSTANCE_ID_DIR)); + Path vpath = new Path(baseDir, VERSION_DIR); + currentVersion = Accumulo.getAccumuloPersistentVersion(vpath.getFileSystem(CachedConfiguration.getInstance()), vpath); + } catch (Exception e) { + if (ignore) + continue; + else + 
throw new IllegalArgumentException("Accumulo volume " + path + " not initialized", e); + } + + if (firstIid == null) { + firstIid = currentIid; + firstDir = baseDir; + firstVersion = currentVersion; + } else if (!currentIid.equals(firstIid)) { + throw new IllegalArgumentException("Configuration " + Property.INSTANCE_VOLUMES.getKey() + " contains paths that have different instance ids " + + baseDir + " has " + currentIid + " and " + firstDir + " has " + firstIid); + } else if (!currentVersion.equals(firstVersion)) { + throw new IllegalArgumentException("Configuration " + Property.INSTANCE_VOLUMES.getKey() + " contains paths that have different versions " + baseDir + + " has " + currentVersion + " and " + firstDir + " has " + firstVersion); + } + + baseDirsList.add(baseDir); + } + + if (baseDirsList.size() == 0) { + throw new RuntimeException("None of the configured paths are initialized."); + } + + return baseDirsList.toArray(new String[baseDirsList.size()]); + } public static String[] prefix(String bases[], String suffix) { if (suffix.startsWith("/")) @@ -123,17 +180,16 @@ public class ServerConstants { } public static Path getInstanceIdLocation() { - return new Path(getBaseDirs()[0], "instance_id"); + // all base dirs should have the same instance id, so can choose any one + return new Path(getBaseDirs()[0], INSTANCE_ID_DIR); } public static Path getDataVersionLocation() { - return new Path(getBaseDirs()[0], "version"); - } - - public static String[] getRootTableDirs() { - return prefix(getTablesDirs(), RootTable.ID); + // all base dirs should have the same version, so can choose any one + return new Path(getBaseDirs()[0], VERSION_DIR); } + public static String[] getMetadataTableDirs() { return prefix(getTablesDirs(), MetadataTable.ID); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java ---------------------------------------------------------------------- diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 00e86d3..c2c04e5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -117,9 +117,6 @@ public interface VolumeManager { // all volume are ready to provide service (not in SafeMode, for example) boolean isReady() throws IOException; - // ambiguous references to files go here - FileSystem getDefaultVolume(); - // forward to the appropriate FileSystem object FileStatus[] globStatus(Path path) throws IOException; http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 034bc92..b577891 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -61,13 +61,13 @@ public class VolumeManagerImpl implements VolumeManager { private static final Logger log = Logger.getLogger(VolumeManagerImpl.class); Map<String,? extends FileSystem> volumes; - String defaultVolume; + FileSystem defaultVolume; AccumuloConfiguration conf; VolumeChooser chooser; protected VolumeManagerImpl(Map<String,? 
extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) { this.volumes = volumes; - this.defaultVolume = defaultVolume; + this.defaultVolume = volumes.get(defaultVolume); this.conf = conf; ensureSyncIsEnabled(); chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser()); @@ -284,7 +284,7 @@ public class VolumeManagerImpl implements VolumeManager { } } - return volumes.get(defaultVolume); + return defaultVolume; } @Override @@ -371,7 +371,7 @@ public class VolumeManagerImpl implements VolumeManager { if (space.contains(":")) { fileSystems.put(space, new Path(space).getFileSystem(hadoopConf)); } else { - fileSystems.put(space, FileSystem.get(hadoopConf)); + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + space); } } } @@ -423,11 +423,6 @@ public class VolumeManagerImpl implements VolumeManager { } @Override - public FileSystem getDefaultVolume() { - return volumes.get(defaultVolume); - } - - @Override public FileStatus[] globStatus(Path pathPattern) throws IOException { return getFileSystemByPath(pathPattern).globStatus(pathPattern); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index ea2c57a..88f4dac 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map.Entry; 
import java.util.UUID; @@ -54,6 +55,7 @@ import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -148,7 +150,7 @@ public class Initialize { else fsUri = FileSystem.getDefaultUri(conf).toString(); log.info("Hadoop Filesystem is " + fsUri); - log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs())); + log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getConfiguredBaseDirs())); log.info("Zookeeper server is " + sconf.get(Property.INSTANCE_ZK_HOST)); log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running"); if (!zookeeperAvailable()) { @@ -170,16 +172,21 @@ public class Initialize { try { if (isInitialized(fs)) { String instanceDfsDir = sconf.get(Property.INSTANCE_DFS_DIR); - log.fatal("It appears the directory " + fsUri + instanceDfsDir + " was previously initialized."); + log.fatal("It appears the directories " + Arrays.asList(ServerConstants.getConfiguredBaseDirs()) + " were previously initialized."); + String instanceVolumes = sconf.get(Property.INSTANCE_VOLUMES); String instanceDfsUri = sconf.get(Property.INSTANCE_DFS_URI); - if ("".equals(instanceDfsUri)) { - log.fatal("You are using the default URI for the filesystem. 
Set the property " + Property.INSTANCE_DFS_URI + " to use a different filesystem,"); - } else { + + if (!instanceVolumes.isEmpty()) { + log.fatal("Change the property " + Property.INSTANCE_VOLUMES + " to use different filesystems,"); + } else if (!instanceDfsDir.isEmpty()) { log.fatal("Change the property " + Property.INSTANCE_DFS_URI + " to use a different filesystem,"); + } else { + log.fatal("You are using the default URI for the filesystem. Set the property " + Property.INSTANCE_VOLUMES + " to use a different filesystem,"); } log.fatal("or change the property " + Property.INSTANCE_DFS_DIR + " to use a different directory."); log.fatal("The current value of " + Property.INSTANCE_DFS_URI + " is |" + instanceDfsUri + "|"); log.fatal("The current value of " + Property.INSTANCE_DFS_DIR + " is |" + instanceDfsDir + "|"); + log.fatal("The current value of " + Property.INSTANCE_VOLUMES + " is |" + instanceVolumes + "|"); return false; } } catch (IOException e) { @@ -211,7 +218,8 @@ public class Initialize { UUID uuid = UUID.randomUUID(); // the actual disk locations of the root table and tablets - final Path rootTablet = new Path(fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION); + String[] configuredTableDirs = ServerConstants.prefix(ServerConstants.getConfiguredBaseDirs(), ServerConstants.TABLE_DIR); + final Path rootTablet = new Path(fs.choose(configuredTableDirs) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION); try { initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTablet); } catch (Exception e) { @@ -269,23 +277,32 @@ public class Initialize { return result; } + private static void initDirs(VolumeManager fs, UUID uuid, String[] baseDirs, boolean print) throws IOException { + for (String baseDir : baseDirs) { + fs.mkdirs(new Path(new Path(baseDir, ServerConstants.VERSION_DIR), "" + ServerConstants.DATA_VERSION)); + + // create an instance id + Path iidLocation = new Path(baseDir, 
ServerConstants.INSTANCE_ID_DIR); + fs.mkdirs(iidLocation); + fs.createNewFile(new Path(iidLocation, uuid.toString())); + if (print) + log.info("Initialized volume " + baseDir); + } + } + // TODO Remove deprecation warning suppression when Hadoop1 support is dropped @SuppressWarnings("deprecation") private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, Path rootTablet) throws IOException { FileStatus fstat; + initDirs(fs, uuid, ServerConstants.getConfiguredBaseDirs(), false); + // the actual disk locations of the metadata table and tablets final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs()); String tableMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR)); String defaultMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION)); - fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION)); - - // create an instance id - fs.mkdirs(ServerConstants.getInstanceIdLocation()); - fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString())); - // initialize initial metadata config in zookeeper initMetadataConfig(); @@ -531,10 +548,38 @@ public class Initialize { } public static boolean isInitialized(VolumeManager fs) throws IOException { - return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation())); + for (String baseDir : ServerConstants.getConfiguredBaseDirs()) { + if (fs.exists(new Path(baseDir, ServerConstants.INSTANCE_ID_DIR)) || fs.exists(new Path(baseDir, ServerConstants.VERSION_DIR))) + return true; + } + + return false; + } + + private static void addVolumes(VolumeManager fs) throws IOException { + HashSet<String> initializedDirs = new HashSet<String>(); + initializedDirs.addAll(Arrays.asList(ServerConstants.checkBaseDirs(ServerConstants.getConfiguredBaseDirs(), 
true))); + + HashSet<String> uinitializedDirs = new HashSet<String>(); + uinitializedDirs.addAll(Arrays.asList(ServerConstants.getConfiguredBaseDirs())); + uinitializedDirs.removeAll(initializedDirs); + + Path aBasePath = new Path(initializedDirs.iterator().next()); + Path iidPath = new Path(aBasePath, ServerConstants.INSTANCE_ID_DIR); + Path versionPath = new Path(aBasePath, ServerConstants.VERSION_DIR); + + UUID uuid = UUID.fromString(ZooUtil.getInstanceIDFromHdfs(iidPath)); + + if (ServerConstants.DATA_VERSION != Accumulo.getAccumuloPersistentVersion(versionPath.getFileSystem(CachedConfiguration.getInstance()), versionPath)) { + throw new IOException("Accumulo " + Constants.VERSION + " cannot initialize data version " + Accumulo.getAccumuloPersistentVersion(fs)); + } + + initDirs(fs, uuid, uinitializedDirs.toArray(new String[uinitializedDirs.size()]), true); } static class Opts extends Help { + @Parameter(names = "--add-volumes", description = "Initialize any uninitialized volumes listed in instance.volumes") + boolean addVolumes = false; @Parameter(names = "--reset-security", description = "just update the security information") boolean resetSecurity = false; @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting") @@ -565,8 +610,15 @@ public class Initialize { } else { log.fatal("Attempted to reset security on accumulo before it was initialized"); } - } else if (!doInit(opts, conf, fs)) - System.exit(-1); + } + + if (opts.addVolumes) { + addVolumes(fs); + } + + if (!opts.resetSecurity && !opts.addVolumes) + if (!doInit(opts, conf, fs)) + System.exit(-1); } catch (Exception e) { log.fatal(e, e); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java ---------------------------------------------------------------------- diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java b/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java new file mode 100644 index 0000000..a316155 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * + */ +public class ServerConstantsTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + @Test + public void testCheckBaseDirs() throws IOException { + String uuid1 = UUID.randomUUID().toString(); + String uuid2 = UUID.randomUUID().toString(); + + verifyAllPass(init(folder.newFolder(), Arrays.asList(uuid1), Arrays.asList(ServerConstants.DATA_VERSION))); + verifyAllPass(init(folder.newFolder(), Arrays.asList(uuid1, uuid1), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION))); + + verifyError(init(folder.newFolder(), Arrays.asList((String) null), Arrays.asList((Integer) null))); + verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid2), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION))); + verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid1), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION - 1))); + verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid2), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION - 1))); + verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid2, null), + Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION))); + + verifySomePass( + init(folder.newFolder(), Arrays.asList(uuid1, uuid1, null), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION, null)), 2); + } + + private void 
verifyAllPass(ArrayList<String> paths) { + Assert.assertEquals(paths, Arrays.asList(ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), true))); + Assert.assertEquals(paths, Arrays.asList(ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), false))); + } + + private void verifySomePass(ArrayList<String> paths, int numExpected) { + Assert.assertEquals(paths.subList(0, 2), Arrays.asList(ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), true))); + try { + ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), false); + Assert.fail(); + } catch (Exception e) {} + } + + private void verifyError(ArrayList<String> paths) { + try { + ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), true); + Assert.fail(); + } catch (Exception e) {} + + try { + ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), false); + Assert.fail(); + } catch (Exception e) {} + } + + private ArrayList<String> init(File newFile, List<String> uuids, List<Integer> dataVersions) throws IllegalArgumentException, IOException { + String base = newFile.toURI().toString(); + + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + + ArrayList<String> accumuloPaths = new ArrayList<String>(); + + for (int i = 0; i < uuids.size(); i++) { + String volume = "v" + i; + + String accumuloPath = base + "/" + volume + "/accumulo"; + accumuloPaths.add(accumuloPath); + + if (uuids.get(i) != null) { + fs.mkdirs(new Path(accumuloPath + "/" + ServerConstants.INSTANCE_ID_DIR)); + fs.createNewFile(new Path(accumuloPath + "/" + ServerConstants.INSTANCE_ID_DIR + "/" + uuids.get(i))); + } + + if (dataVersions.get(i) != null) { + fs.mkdirs(new Path(accumuloPath + "/" + ServerConstants.VERSION_DIR)); + fs.createNewFile(new Path(accumuloPath + "/" + ServerConstants.VERSION_DIR + "/" + dataVersions.get(i))); + } + } + + return accumuloPaths; + } + +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java index d308d06..251d859 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java @@ -94,6 +94,7 @@ public class InitializeTest { expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo"); expectLastCall().anyTimes(); expect(sconf.get(Property.INSTANCE_DFS_DIR)).andReturn("/bar"); + expect(sconf.get(Property.INSTANCE_VOLUMES)).andReturn(""); expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1"); expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue()); replay(sconf); http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java index f29fb27..c8610d5 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.ConnectorImpl; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.security.SystemCredentials.SystemToken; 
import org.junit.BeforeClass; import org.junit.Test; @@ -37,12 +38,18 @@ public class SystemCredentialsTest { @BeforeClass public static void setUp() throws IOException { - File testInstanceId = new File(new File(new File(new File("target"), "instanceTest"), "instance_id"), UUID.fromString( + File testInstanceId = new File(new File(new File(new File("target"), "instanceTest"), ServerConstants.INSTANCE_ID_DIR), UUID.fromString( "00000000-0000-0000-0000-000000000000").toString()); if (!testInstanceId.exists()) { testInstanceId.getParentFile().mkdirs(); testInstanceId.createNewFile(); } + + File testInstanceVersion = new File(new File(new File(new File("target"), "instanceTest"), ServerConstants.VERSION_DIR), ServerConstants.DATA_VERSION + ""); + if (!testInstanceVersion.exists()) { + testInstanceVersion.getParentFile().mkdirs(); + testInstanceVersion.createNewFile(); + } } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java index 57f16b4..590945a 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java @@ -211,11 +211,6 @@ public class TabletServerSyncCheckTest { } @Override - public FileSystem getDefaultVolume() { - return null; - } - - @Override public FileStatus[] globStatus(Path path) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java ---------------------------------------------------------------------- diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java index 53fb27c..c4d3dfb 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java @@ -48,7 +48,7 @@ public class MultiReaderTest { Path root = new Path("file://" + path + "/manyMaps"); fs.mkdirs(root); fs.create(new Path(root, "finished")).close(); - FileSystem ns = fs.getDefaultVolume(); + FileSystem ns = fs.getFileSystemByPath(root); @SuppressWarnings("deprecation") Writer oddWriter = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class, BytesWritable.class); http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/test/src/test/java/org/apache/accumulo/test/VolumeIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java index 2201ad2..2f64d58 100644 --- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java +++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java @@ -19,19 +19,28 @@ package org.apache.accumulo.test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.BufferedOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import 
org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.admin.DiskUsage; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -44,9 +53,16 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.After; @@ -83,6 +99,7 @@ public class VolumeIT extends ConfigurableMacIT { public void configure(MiniAccumuloConfigImpl cfg) { // Run MAC on two locations in the local file system cfg.setProperty(Property.INSTANCE_DFS_URI, v1.toString()); + cfg.setProperty(Property.INSTANCE_DFS_DIR, "/accumulo"); cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString()); super.configure(cfg); } @@ -233,4 +250,110 @@ public class VolumeIT extends ConfigurableMacIT { } } + + + @Test + public void testAddVolumes() throws Exception { + + String[] tableNames = getTableNames(2); + + // grab this before shutting 
down cluster
+    String uuid = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getInstanceID();
+
+    verifyVolumesUsed(tableNames[0], v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml")); // start from the cluster's existing site file
+
+    File v3f = new File(volDirBase, "v3");
+    v3f.mkdir();
+    Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString()+","+v3.toString());
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+    conf.writeXml(fos); // overwrite accumulo-site.xml so the volume list now also names v3
+    fos.close();
+
+    // initialize volume
+    Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+    // check that all volumes are initialized
+    for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+      FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+      Path vp = new Path(volumePath, "accumulo");
+      Path vpi = new Path(vp, ServerConstants.INSTANCE_ID_DIR);
+      FileStatus[] iids = fs.listStatus(vpi);
+      Assert.assertEquals(1, iids.length); // exactly one entry under the instance id directory...
+      Assert.assertEquals(uuid, iids[0].getPath().getName()); // ...named after the instance id read before shutdown
+    }
+
+    // start cluster and verify that new volume is used
+    cluster.start();
+
+    verifyVolumesUsed(tableNames[1], v1, v2, v3);
+
+  }
+  // Creates tableName with 100 splits, writes and flushes 100 rows, then asserts each of paths prefixes at least one data-file entry and the matches total 100.
+  private void verifyVolumesUsed(String tableName, Path... paths) throws AccumuloException, AccumuloSecurityException, TableExistsException,
+      TableNotFoundException,
+      MutationsRejectedException {
+    TreeSet<Text> splits = new TreeSet<Text>();
+    for (int i = 0; i < 100; i++) {
+      splits.add(new Text(String.format("%06d", i * 100))); // split points 000000, 000100, ..., 009900
+    }
+
+    Connector conn = cluster.getConnector("root", ROOT_PASSWORD);
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().addSplits(tableName, splits);
+
+    List<String> expected = new ArrayList<String>();
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      String row = String.format("%06d", i * 100 + 3); // 3 past split point i * 100; presumably so every row lands in its own tablet
+      Mutation m = new Mutation(row);
+      m.put("cf1", "cq1", "1");
+      bw.addMutation(m);
+      expected.add(row + ":cf1:cq1:1");
+    }
+
+    bw.close();
+
+    verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY)); // data readable before the flush
+
+    conn.tableOperations().flush(tableName, null, null, true); // NOTE(review): final argument presumably waits for the flush to finish - confirm
+
+    verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY)); // and still readable after it
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); // range derived from this table's id only
+
+    int counts[] = new int[paths.length]; // counts[i]: scanned entries whose column qualifier starts with paths[i]
+
+    for (Entry<Key,Value> entry : metaScanner) {
+      String cq = entry.getKey().getColumnQualifier().toString();
+      for (int i = 0; i < paths.length; i++) {
+        if (cq.startsWith(paths[i].toString())) {
+          counts[i]++;
+        }
+      }
+    }
+
+    // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
+    // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
+
+    int sum = 0;
+    for (int count : counts) {
+      Assert.assertTrue(count > 0); // every listed volume must hold at least one file
+      sum += count;
+    }
+
+    Assert.assertEquals(100, sum);
+  }
+
 }