Updated Branches:
  refs/heads/master 4bc0a85f9 -> 95cb49043

ACCUMULO-1747 fixed, defined, and documented usage of instance.dfs.uri and
instance.volumes


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/95cb4904
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/95cb4904
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/95cb4904

Branch: refs/heads/master
Commit: 95cb49043ddfe31631639ccfd6bea70201e2b5e4
Parents: 4bc0a85
Author: Keith Turner <ktur...@apache.org>
Authored: Wed Oct 9 21:34:52 2013 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Wed Oct 9 21:34:52 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java | 13 ++-
 .../apache/accumulo/server/ServerConstants.java | 19 ++--
 .../accumulo/server/fs/VolumeManagerImpl.java   | 12 ++-
 .../server/tabletserver/InMemoryMap.java        |  2 -
 .../server/tabletserver/MutationLog.java        | 99 --------------------
 5 files changed, 29 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b6fbdd2..3adfd4e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -79,8 +79,14 @@ public enum Property {
   INSTANCE_ZK_HOST("instance.zookeeper.host", "localhost:2181", 
PropertyType.HOSTLIST, "Comma separated list of zookeeper servers"),
   INSTANCE_ZK_TIMEOUT("instance.zookeeper.timeout", "30s", 
PropertyType.TIMEDURATION,
       "Zookeeper session timeout; max value when represented as milliseconds 
should be no larger than " + Integer.MAX_VALUE),
-  INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI,
-      "The url accumulo should use to connect to DFS.  If this is empty, 
accumulo will obtain this information from the hadoop configuration."),
+  INSTANCE_DFS_URI(
+      "instance.dfs.uri",
+      "",
+      PropertyType.URI,
+      "A url accumulo should use to connect to DFS. If this is empty, accumulo 
will obtain this information from the hadoop configuration.  This property "
+          + "will only be used when creating new files if instance.volumes is 
empty.  After an upgrade to 1.6.0 Accumulo will start using absolute paths to "
+          + "reference files.  Files created before a 1.6.0 upgrade are 
referenced via relative paths.  Relative paths will always be resolved using 
this config "
+          + "(if empty using the hadoop config)."),
   INSTANCE_DFS_DIR("instance.dfs.dir", "/accumulo", PropertyType.ABSOLUTEPATH,
       "HDFS directory in which accumulo instance will run.  Do not change 
after accumulo is initialized."),
   @Sensitive
@@ -88,7 +94,8 @@ public enum Property {
       "A secret unique to a given instance that all servers must know in order 
to communicate with one another."
           + " Change it before initialization. To change it later use 
./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], "
           + " and then update conf/accumulo-site.xml everywhere."),
-  INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, "A list of 
volumes to use.  By default, this will be the namenode in the hadoop 
configuration in the accumulo classpath."),
+  INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING,
+      "A comma seperated list of dfs uris to use.  Files will be stored across 
these filesystems.  If this is empty, then instance.dfs.uri will be used."),
   INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", 
"org.apache.accumulo.server.security.handler.ZKAuthenticator", 
PropertyType.CLASSNAME,
       "The authenticator class that accumulo will use to determine if a user 
has privilege to perform an action"),
   INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", 
"org.apache.accumulo.server.security.handler.ZKAuthorizor", 
PropertyType.CLASSNAME,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
index 3765cce..7e3e2b7 100644
--- a/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
+++ b/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -39,17 +39,20 @@ public class ServerConstants {
   public static String[] getBaseDirs() {
     String singleNamespace = 
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
     String ns = 
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
+    String dfsUri = 
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI);
+
     if (ns == null || ns.isEmpty()) {
-      Configuration hadoopConfig = CachedConfiguration.getInstance();
-      String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace;
-      return new String[] {fullPath};
+      if (dfsUri == null || dfsUri.isEmpty()) {
+        Configuration hadoopConfig = CachedConfiguration.getInstance();
+        String fullPath = hadoopConfig.get("fs.default.name") + 
singleNamespace;
+        return new String[] {fullPath};
+      } else {
+        String fullPath = dfsUri + singleNamespace;
+        return new String[] {fullPath};
+      }
     }
+
     String namespaces[] = ns.split(",");
-    if (namespaces.length < 2) {
-      Configuration hadoopConfig = CachedConfiguration.getInstance();
-      String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace;
-      return new String[] {fullPath};
-    }
     return prefix(namespaces, singleNamespace);
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index d6a34f6..ce19a40 100644
--- a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -318,10 +319,13 @@ public class VolumeManagerImpl implements VolumeManager {
   public static VolumeManager get(AccumuloConfiguration conf) throws 
IOException {
     Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
     Configuration hadoopConf = CachedConfiguration.getInstance();
-    fileSystems.put(DEFAULT, FileSystem.get(hadoopConf));
-    String ns = 
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
-    if (ns != null) {
+    fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
+    String ns = conf.get(Property.INSTANCE_VOLUMES);
+    if (ns != null && !ns.isEmpty()) {
       for (String space : ns.split(",")) {
+        if (space.equals(DEFAULT))
+          throw new IllegalArgumentException();
+
         if (space.contains(":")) {
           fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
         } else {
@@ -329,7 +333,7 @@ public class VolumeManagerImpl implements VolumeManager {
         }
       }
     }
-    return new VolumeManagerImpl(fileSystems, "", conf);
+    return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
index a648366..85636c5 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
@@ -192,8 +192,6 @@ class MemKeyConversionIterator extends WrappingIterator 
implements Interruptible
 }
 
 public class InMemoryMap {
-  MutationLog mutationLog;
-  
   private SimpleMap map = null;
   
   private static final Logger log = Logger.getLogger(InMemoryMap.class);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/95cb4904/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java
deleted file mode 100644
index db76b8e..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class MutationLog {
-  private FSDataOutputStream logout;
-  
-  public static final byte MUTATION_EVENT = 1;
-  public static final byte CLOSE_EVENT = 2;
-  
-  public MutationLog(Path logfile) throws IOException {
-    
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, 
ServerConfiguration.getSiteConfiguration()));
-    
-    if (!fs.exists(logfile))
-      logout = fs.create(logfile);
-  }
-  
-  public void log(Mutation m) throws IOException {
-    // write event type
-    logout.writeByte(MUTATION_EVENT);
-    
-    // write event
-    m.write(logout);
-    logout.flush();
-  }
-  
-  public void close() throws IOException {
-    logout.writeByte(CLOSE_EVENT);
-    logout.close();
-  }
-  
-  public static Iterator<Mutation> replay(Path logfile, Tablet t, long 
min_timestamp) throws IOException {
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, 
ServerConfiguration.getSiteConfiguration()));
-    
-    final FSDataInputStream login = fs.open(logfile);
-    
-    final Mutation mutation = new ServerMutation();
-    
-    return new Iterator<Mutation>() {
-      
-      byte eventType;
-      
-      {
-        eventType = login.readByte();
-      }
-      
-      public boolean hasNext() {
-        return eventType != CLOSE_EVENT;
-      }
-      
-      public Mutation next() {
-        try {
-          mutation.readFields(login);
-          eventType = login.readByte();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-        
-        return mutation;
-      }
-      
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-}

Reply via email to