Updated Branches: refs/heads/master 4bc0a85f9 -> 95cb49043
ACCUMULO-1747 fixed, defined, and documented usage of intance.dfs.uri and instance.volumes Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/95cb4904 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/95cb4904 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/95cb4904 Branch: refs/heads/master Commit: 95cb49043ddfe31631639ccfd6bea70201e2b5e4 Parents: 4bc0a85 Author: Keith Turner <ktur...@apache.org> Authored: Wed Oct 9 21:34:52 2013 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Oct 9 21:34:52 2013 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 13 ++- .../apache/accumulo/server/ServerConstants.java | 19 ++-- .../accumulo/server/fs/VolumeManagerImpl.java | 12 ++- .../server/tabletserver/InMemoryMap.java | 2 - .../server/tabletserver/MutationLog.java | 99 -------------------- 5 files changed, 29 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b6fbdd2..3adfd4e 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -79,8 +79,14 @@ public enum Property { INSTANCE_ZK_HOST("instance.zookeeper.host", "localhost:2181", PropertyType.HOSTLIST, "Comma separated list of zookeeper servers"), INSTANCE_ZK_TIMEOUT("instance.zookeeper.timeout", "30s", PropertyType.TIMEDURATION, "Zookeeper session timeout; max value when represented as milliseconds should be no larger than " + Integer.MAX_VALUE), - INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI, - "The url accumulo should use to connect to DFS. If this is empty, accumulo will obtain this information from the hadoop configuration."), + INSTANCE_DFS_URI( + "instance.dfs.uri", + "", + PropertyType.URI, + "A url accumulo should use to connect to DFS. If this is empty, accumulo will obtain this information from the hadoop configuration. This property " + + "will only be used when creating new files if instance.volumes is empty. After an upgrade to 1.6.0 Accumulo will start using absolute paths to " + + "reference files. Files created before a 1.6.0 upgrade are referenced via relative paths. Relative paths will always be resolved using this config " + + "(if empty using the hadoop config)."), INSTANCE_DFS_DIR("instance.dfs.dir", "/accumulo", PropertyType.ABSOLUTEPATH, "HDFS directory in which accumulo instance will run. Do not change after accumulo is initialized."), @Sensitive @@ -88,7 +94,8 @@ public enum Property { "A secret unique to a given instance that all servers must know in order to communicate with one another." + " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], " + " and then update conf/accumulo-site.xml everywhere."), - INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, "A list of volumes to use. By default, this will be the namenode in the hadoop configuration in the accumulo classpath."), + INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, + "A comma seperated list of dfs uris to use. Files will be stored across these filesystems. If this is empty, then instance.dfs.uri will be used."), INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME, "The authenticator class that accumulo will use to determine if a user has privilege to perform an action"), INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME, http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/ServerConstants.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/src/main/java/org/apache/accumulo/server/ServerConstants.java index 3765cce..7e3e2b7 100644 --- a/server/src/main/java/org/apache/accumulo/server/ServerConstants.java +++ b/server/src/main/java/org/apache/accumulo/server/ServerConstants.java @@ -39,17 +39,20 @@ public class ServerConstants { public static String[] getBaseDirs() { String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR); String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES); + String dfsUri = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI); + if (ns == null || ns.isEmpty()) { - Configuration hadoopConfig = CachedConfiguration.getInstance(); - String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace; - return new String[] {fullPath}; + if (dfsUri == null || dfsUri.isEmpty()) { + Configuration hadoopConfig = CachedConfiguration.getInstance(); + String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace; + return new String[] {fullPath}; + } else { + String fullPath = dfsUri + singleNamespace; + return new String[] {fullPath}; + } } + String namespaces[] = ns.split(","); - if (namespaces.length < 2) { - Configuration hadoopConfig = CachedConfiguration.getInstance(); - String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace; - return new String[] {fullPath}; - } return prefix(namespaces, singleNamespace); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index d6a34f6..ce19a40 100644 --- a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.file.FileUtil; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -318,10 +319,13 @@ public class VolumeManagerImpl implements VolumeManager { public static VolumeManager get(AccumuloConfiguration conf) throws IOException { Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>(); Configuration hadoopConf = CachedConfiguration.getInstance(); - fileSystems.put(DEFAULT, FileSystem.get(hadoopConf)); - String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES); - if (ns != null) { + fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf)); + String ns = conf.get(Property.INSTANCE_VOLUMES); + if (ns != null && !ns.isEmpty()) { for (String space : ns.split(",")) { + if (space.equals(DEFAULT)) + throw new IllegalArgumentException(); + if (space.contains(":")) { fileSystems.put(space, new Path(space).getFileSystem(hadoopConf)); } else { @@ -329,7 +333,7 @@ public class VolumeManagerImpl implements VolumeManager { } } } - return new VolumeManagerImpl(fileSystems, "", conf); + return new VolumeManagerImpl(fileSystems, DEFAULT, conf); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java index a648366..85636c5 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java @@ -192,8 +192,6 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible } public class InMemoryMap { - MutationLog mutationLog; - private SimpleMap map = null; private static final Logger log = Logger.getLogger(InMemoryMap.class); http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java deleted file mode 100644 index db76b8e..0000000 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.file.FileUtil; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.data.ServerMutation; -import org.apache.accumulo.server.trace.TraceFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -public class MutationLog { - private FSDataOutputStream logout; - - public static final byte MUTATION_EVENT = 1; - public static final byte CLOSE_EVENT = 2; - - public MutationLog(Path logfile) throws IOException { - - Configuration conf = CachedConfiguration.getInstance(); - FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())); - - if (!fs.exists(logfile)) - logout = fs.create(logfile); - } - - public void log(Mutation m) throws IOException { - // write event type - logout.writeByte(MUTATION_EVENT); - - // write event - m.write(logout); - logout.flush(); - } - - public void close() throws IOException { - logout.writeByte(CLOSE_EVENT); - logout.close(); - } - - public static Iterator<Mutation> replay(Path logfile, Tablet t, long min_timestamp) throws IOException { - Configuration conf = CachedConfiguration.getInstance(); - FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())); - - final FSDataInputStream login = fs.open(logfile); - - final Mutation mutation = new ServerMutation(); - - return new Iterator<Mutation>() { - - byte eventType; - - { - eventType = login.readByte(); - } - - public boolean hasNext() { - return eventType != CLOSE_EVENT; - } - - public Mutation next() { - try { - mutation.readFields(login); - eventType = login.readByte(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return mutation; - } - - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } -}