Merge branch '1.5' into 1.6

Conflicts:
        server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
        server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
        server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
        server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
        server/src/main/java/org/apache/accumulo/server/master/Master.java
        server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
        server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2e5064e3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2e5064e3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2e5064e3

Branch: refs/heads/master
Commit: 2e5064e36aefa53073b42185c0ce30efba1924fb
Parents: 9255120 3bcc3c9
Author: Josh Elser <els...@apache.org>
Authored: Tue Dec 23 15:00:31 2014 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Tue Dec 23 15:00:31 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/server/init/Initialize.java  | 2 +-
 .../java/org/apache/accumulo/server/security/SecurityUtil.java     | 2 +-
 .../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java   | 2 +-
 server/master/src/main/java/org/apache/accumulo/master/Master.java | 2 +-
 .../main/java/org/apache/accumulo/master/state/SetGoalState.java   | 2 +-
 .../monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java | 2 +-
 .../src/main/java/org/apache/accumulo/tracer/TraceServer.java      | 2 +-
 .../src/main/java/org/apache/accumulo/tserver/TabletServer.java    | 2 +-
 .../java/org/apache/accumulo/tserver/log/LocalWALRecovery.java     | 2 +-
 9 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
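
A note for readers skimming the hunks below: per the diffstat, each file changes
by a single line, and the recurring edit (at least in the files whose hunks
survive in this excerpt) is that SecurityUtil moved out of the core module, so
the 1.6 server processes import it from the server module instead:

    // before the merge (1.5 location, removed in 1.6)
    import org.apache.accumulo.core.security.SecurityUtil;

    // after the merge (new 1.6 location)
    import org.apache.accumulo.server.security.SecurityUtil;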


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 7c4b8d3,0000000..48cf1bf
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@@ -1,587 -1,0 +1,587 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.init;
 +
 +import static com.google.common.base.Charsets.UTF_8;
 +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
 +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
 +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Locale;
 +import java.util.Map.Entry;
 +import java.util.UUID;
 +
 +import jline.console.ConsoleReader;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.impl.Namespaces;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.iterators.user.VersioningIterator;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
- import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.ColumnFQ;
 +import org.apache.accumulo.core.volume.VolumeConfiguration;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.constraints.MetadataConstraints;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tablets.TabletTime;
 +import org.apache.accumulo.server.util.TablePropUtil;
++import org.apache.accumulo.server.security.SecurityUtil;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.ZooDefs.Ids;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * This class is used to set up the directory structure and the root tablet to get an instance started
 + *
 + */
 +public class Initialize {
 +  private static final Logger log = Logger.getLogger(Initialize.class);
 +  private static final String DEFAULT_ROOT_USER = "root";
 +  public static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
 +
 +  private static ConsoleReader reader = null;
 +  private static IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +  private static ConsoleReader getConsoleReader() throws IOException {
 +    if (reader == null)
 +      reader = new ConsoleReader();
 +    return reader;
 +  }
 +
 +  /**
 +   * Sets this class's ZooKeeper reader/writer.
 +   *
 +   * @param izoo
 +   *          reader/writer
 +   */
 +  static void setZooReaderWriter(IZooReaderWriter izoo) {
 +    zoo = izoo;
 +  }
 +
 +  /**
 +   * Gets this class's ZooKeeper reader/writer.
 +   *
 +   * @return reader/writer
 +   */
 +  static IZooReaderWriter getZooReaderWriter() {
 +    return zoo;
 +  }
 +
 +  private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>();
 +  static {
 +    initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
 +    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
 +    initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
 +    initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
 +    initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
 +    initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1");
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
 +    initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
 +    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
 +        String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME));
 +    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", TabletsSection.DataFileColumnFamily.NAME,
 +        TabletsSection.LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME));
 +    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server");
 +    initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "");
 +    initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true");
 +    initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
 +  }
 +
 +  static boolean checkInit(Configuration conf, VolumeManager fs, SiteConfiguration sconf) throws IOException {
 +    @SuppressWarnings("deprecation")
 +    String fsUri = sconf.get(Property.INSTANCE_DFS_URI);
 +    if (fsUri.equals(""))
 +      fsUri = FileSystem.getDefaultUri(conf).toString();
 +    log.info("Hadoop Filesystem is " + fsUri);
 +    log.info("Accumulo data dirs are " + Arrays.asList(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration())));
 +    log.info("Zookeeper server is " + sconf.get(Property.INSTANCE_ZK_HOST));
 +    log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
 +    if (!zookeeperAvailable()) {
 +      log.fatal("Zookeeper needs to be up and running in order to init. Exiting ...");
 +      return false;
 +    }
 +    if (sconf.get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) {
 +      ConsoleReader c = getConsoleReader();
 +      c.beep();
 +      c.println();
 +      c.println();
 +      c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it.");
 +      c.println();
 +      c.println();
 +      c.println("You can change the instance secret in accumulo by using:");
 +      c.println("   bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword.");
 +      c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly");
 +    }
 +    try {
 +      if (isInitialized(fs)) {
 +        printInitializeFailureMessages(sconf);
 +        return false;
 +      }
 +    } catch (IOException e) {
 +      throw new IOException("Failed to check if filesystem already initialized", e);
 +    }
 +
 +    return true;
 +  }
 +
 +  static void printInitializeFailureMessages(SiteConfiguration sconf) {
 +    @SuppressWarnings("deprecation")
 +    Property INSTANCE_DFS_DIR = Property.INSTANCE_DFS_DIR;
 +    @SuppressWarnings("deprecation")
 +    Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI;
 +    String instanceDfsDir = sconf.get(INSTANCE_DFS_DIR);
 +    log.fatal("It appears the directories " + Arrays.asList(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()))
 +        + " were previously initialized.");
 +    String instanceVolumes = sconf.get(Property.INSTANCE_VOLUMES);
 +    String instanceDfsUri = sconf.get(INSTANCE_DFS_URI);
 +
 +    if (!instanceVolumes.isEmpty()) {
 +      log.fatal("Change the property " + Property.INSTANCE_VOLUMES + " to use different filesystems,");
 +    } else if (!instanceDfsDir.isEmpty()) {
 +      log.fatal("Change the property " + INSTANCE_DFS_URI + " to use a different filesystem,");
 +    } else {
 +      log.fatal("You are using the default URI for the filesystem. Set the property " + Property.INSTANCE_VOLUMES + " to use a different filesystem,");
 +    }
 +    log.fatal("or change the property " + INSTANCE_DFS_DIR + " to use a different directory.");
 +    log.fatal("The current value of " + INSTANCE_DFS_URI + " is |" + instanceDfsUri + "|");
 +    log.fatal("The current value of " + INSTANCE_DFS_DIR + " is |" + instanceDfsDir + "|");
 +    log.fatal("The current value of " + Property.INSTANCE_VOLUMES + " is |" + instanceVolumes + "|");
 +  }
 +
 +  public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException {
 +    if (!checkInit(conf, fs, ServerConfiguration.getSiteConfiguration())) {
 +      return false;
 +    }
 +
 +    // prompt user for instance name and root password early, in case they
 +    // abort, we don't leave an inconsistent HDFS/ZooKeeper structure
 +    String instanceNamePath;
 +    try {
 +      instanceNamePath = getInstanceNamePath(opts);
 +    } catch (Exception e) {
 +      log.fatal("Failed to talk to zookeeper", e);
 +      return false;
 +    }
 +    opts.rootpass = getRootPassword(opts);
 +    return initialize(opts, instanceNamePath, fs);
 +  }
 +
 +  public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
 +
 +    UUID uuid = UUID.randomUUID();
 +    // the actual disk locations of the root table and tablets
 +    String[] configuredVolumes = VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration());
 +    final String rootTabletDir = new Path(fs.choose(configuredVolumes) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + RootTable.ID
 +        + RootTable.ROOT_TABLET_LOCATION).toString();
 +
 +    try {
 +      initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTabletDir);
 +    } catch (Exception e) {
 +      log.fatal("Failed to initialize zookeeper", e);
 +      return false;
 +    }
 +
 +    try {
 +      initFileSystem(opts, fs, uuid, rootTabletDir);
 +    } catch (Exception e) {
 +      log.fatal("Failed to initialize filesystem", e);
 +
 +      if (ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES).trim().equals("")) {
 +        Configuration fsConf = CachedConfiguration.getInstance();
 +
 +        final String defaultFsUri = "file:///";
 +        String fsDefaultName = fsConf.get("fs.default.name", defaultFsUri), fsDefaultFS = fsConf.get("fs.defaultFS", defaultFsUri);
 +
 +        // Try to determine when we couldn't find an appropriate core-site.xml on the classpath
 +        if (defaultFsUri.equals(fsDefaultName) && defaultFsUri.equals(fsDefaultFS)) {
 +          log.fatal("Default filesystem value ('fs.defaultFS' or 'fs.default.name') of '" + defaultFsUri + "' was found in the Hadoop configuration");
 +          log.fatal("Please ensure that the Hadoop core-site.xml is on the classpath using 'general.classpaths' in accumulo-site.xml");
 +        }
 +      }
 +
 +      return false;
 +    }
 +
 +    try {
 +      initSecurity(opts, uuid.toString());
 +    } catch (Exception e) {
 +      log.fatal("Failed to initialize security", e);
 +      return false;
 +    }
 +    return true;
 +  }
 +
 +  private static boolean zookeeperAvailable() {
 +    try {
 +      return zoo.exists("/");
 +    } catch (KeeperException e) {
 +      return false;
 +    } catch (InterruptedException e) {
 +      return false;
 +    }
 +  }
 +
 +  private static void initDirs(VolumeManager fs, UUID uuid, String[] baseDirs, boolean print) throws IOException {
 +    for (String baseDir : baseDirs) {
 +      fs.mkdirs(new Path(new Path(baseDir, ServerConstants.VERSION_DIR), "" + ServerConstants.DATA_VERSION));
 +
 +      // create an instance id
 +      Path iidLocation = new Path(baseDir, ServerConstants.INSTANCE_ID_DIR);
 +      fs.mkdirs(iidLocation);
 +      fs.createNewFile(new Path(iidLocation, uuid.toString()));
 +      if (print)
 +        log.info("Initialized volume " + baseDir);
 +    }
 +  }
 +
 +  private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, String rootTabletDir) throws IOException {
 +    initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()), false);
 +
 +    // initialize initial metadata config in zookeeper
 +    initMetadataConfig();
 +
 +    String tableMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID
 +        + TABLE_TABLETS_TABLET_DIR;
 +    String defaultMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID
 +        + Constants.DEFAULT_TABLET_LOCATION;
 +
 +    // create table and default tablets directories
 +    createDirectories(fs, rootTabletDir, tableMetadataTabletDir, defaultMetadataTabletDir);
 +
 +    // populate the root tablet with info about the metadata tablets
 +    String fileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
 +    createMetadataFile(fs, fileName, MetadataTable.ID, tableMetadataTabletDir, defaultMetadataTabletDir);
 +  }
 +
 +  /**
 +   * Create an rfile in the default tablet's directory for a new table. This method is used to create the initial root tablet contents, with information about
 +   * the metadata table's tablets
 +   *
 +   * @param volmanager
 +   *          The VolumeManager
 +   * @param fileName
 +   *          The location to create the file
 +   * @param tableId
 +   *          TableID that is being "created"
 +   * @param tableTabletDir
 +   *          The table_info directory for the new table
 +   * @param defaultTabletDir
 +   *          The default_tablet directory for the new table
 +   */
 +  private static void createMetadataFile(VolumeManager volmanager, String fileName, String tableId, String tableTabletDir, String defaultTabletDir)
 +      throws IOException {
 +    FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem();
 +    FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), AccumuloConfiguration.getDefaultConfiguration());
 +    tabletWriter.startDefaultLocalityGroup();
 +
 +    Text splitPoint = TabletsSection.getRange().getEndKey().getRow();
 +    createEntriesForTablet(tabletWriter, tableId, tableTabletDir, null, splitPoint);
 +    createEntriesForTablet(tabletWriter, tableId, defaultTabletDir, splitPoint, null);
 +
 +    tabletWriter.close();
 +  }
 +
 +  private static void createEntriesForTablet(FileSKVWriter writer, String tableId, String tabletDir, Text tabletPrevEndRow, Text tabletEndRow)
 +      throws IOException {
 +    Text extent = new Text(KeyExtent.getMetadataEntry(new Text(tableId), tabletEndRow));
 +    addEntry(writer, extent, DIRECTORY_COLUMN, new Value(tabletDir.getBytes(UTF_8)));
 +    addEntry(writer, extent, TIME_COLUMN, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8)));
 +    addEntry(writer, extent, PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(tabletPrevEndRow));
 +  }
 +
 +  private static void addEntry(FileSKVWriter writer, Text row, ColumnFQ col, Value value) throws IOException {
 +    writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), value);
 +  }
 +
 +  private static void createDirectories(VolumeManager fs, String... dirs) throws IOException {
 +    for (String s : dirs) {
 +      Path dir = new Path(s);
 +      try {
 +        FileStatus fstat = fs.getFileStatus(dir);
 +        // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
 +        @SuppressWarnings("deprecation")
 +        boolean isDirectory = fstat.isDir();
 +        if (!isDirectory) {
 +          log.fatal("location " + dir + " exists but is not a directory");
 +          return;
 +        }
 +      } catch (FileNotFoundException fnfe) {
 +        // attempt to create directory, since it doesn't exist
 +        if (!fs.mkdirs(dir)) {
 +          log.fatal("unable to create directory " + dir);
 +          return;
 +        }
 +      }
 +    }
 +  }
 +
 +  private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath, String rootTabletDir) throws KeeperException, InterruptedException {
 +    // setup basic data in zookeeper
 +    zoo.putPersistentData(Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
 +    zoo.putPersistentData(Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
 +
 +    // setup instance name
 +    if (opts.clearInstanceName)
 +      zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP);
 +    zoo.putPersistentData(instanceNamePath, uuid.getBytes(UTF_8), NodeExistsPolicy.FAIL);
 +
 +    final byte[] EMPTY_BYTE_ARRAY = new byte[0], ZERO_CHAR_ARRAY = new byte[] {'0'};
 +
 +    // setup the instance
 +    String zkInstanceRoot = Constants.ZROOT + "/" + uuid;
 +    zoo.putPersistentData(zkInstanceRoot, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZNAMESPACES, new byte[0], NodeExistsPolicy.FAIL);
 +    TableManager.prepareNewNamespaceState(uuid, Namespaces.DEFAULT_NAMESPACE_ID, Namespaces.DEFAULT_NAMESPACE, NodeExistsPolicy.FAIL);
 +    TableManager.prepareNewNamespaceState(uuid, Namespaces.ACCUMULO_NAMESPACE_ID, Namespaces.ACCUMULO_NAMESPACE, NodeExistsPolicy.FAIL);
 +    TableManager.prepareNewTableState(uuid, RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
 +    TableManager.prepareNewTableState(uuid, MetadataTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
 +  }
 +
 +  private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
 +    // setup the instance name
 +    String instanceName, instanceNamePath = null;
 +    boolean exists = true;
 +    do {
 +      if (opts.cliInstanceName == null) {
 +        instanceName = getConsoleReader().readLine("Instance name : ");
 +      } else {
 +        instanceName = opts.cliInstanceName;
 +      }
 +      if (instanceName == null)
 +        System.exit(0);
 +      instanceName = instanceName.trim();
 +      if (instanceName.length() == 0)
 +        continue;
 +      instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
 +      if (opts.clearInstanceName) {
 +        exists = false;
 +        break;
 +      } else if (exists = zoo.exists(instanceNamePath)) {
 +        String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : ");
 +        if (decision == null)
 +          System.exit(0);
 +        if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') {
 +          opts.clearInstanceName = true;
 +          exists = false;
 +        }
 +      }
 +    } while (exists);
 +    return instanceNamePath;
 +  }
 +
 +  private static byte[] getRootPassword(Opts opts) throws IOException {
 +    if (opts.cliPassword != null) {
 +      return opts.cliPassword.getBytes(UTF_8);
 +    }
 +    String rootpass;
 +    String confirmpass;
 +    do {
 +      rootpass = getConsoleReader()
 +          .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
 +      if (rootpass == null)
 +        System.exit(0);
 +      confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
 +      if (confirmpass == null)
 +        System.exit(0);
 +      if (!rootpass.equals(confirmpass))
 +        log.error("Passwords do not match");
 +    } while (!rootpass.equals(confirmpass));
 +    return rootpass.getBytes(UTF_8);
 +  }
 +
 +  private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
 +    AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER,
 +        opts.rootpass);
 +  }
 +
 +  public static void initMetadataConfig() throws IOException {
 +    try {
 +      Configuration conf = CachedConfiguration.getInstance();
 +      int max = conf.getInt("dfs.replication.max", 512);
 +      // Hadoop 0.23 switched the min value configuration name
 +      int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1));
 +      if (max < 5)
 +        setMetadataReplication(max, "max");
 +      if (min > 5)
 +        setMetadataReplication(min, "min");
 +      for (Entry<String,String> entry : initialMetadataConf.entrySet()) {
 +        if (!TablePropUtil.setTableProperty(RootTable.ID, entry.getKey(), entry.getValue()))
 +          throw new IOException("Cannot create per-table property " + entry.getKey());
 +        if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), entry.getValue()))
 +          throw new IOException("Cannot create per-table property " + entry.getKey());
 +      }
 +    } catch (Exception e) {
 +      log.fatal("error talking to zookeeper", e);
 +      throw new IOException(e);
 +    }
 +  }
 +
 +  private static void setMetadataReplication(int replication, String reason) throws IOException {
 +    String rep = getConsoleReader().readLine(
 +        "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your "
 +            + MetadataTable.NAME + " replication to? (" + replication + ") ");
 +    if (rep == null || rep.length() == 0)
 +      rep = Integer.toString(replication);
 +    else
 +      // Let's make sure it's a number
 +      Integer.parseInt(rep);
 +    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
 +  }
 +
 +  public static boolean isInitialized(VolumeManager fs) throws IOException {
 +    for (String baseDir : VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration())) {
 +      if (fs.exists(new Path(baseDir, ServerConstants.INSTANCE_ID_DIR)) || fs.exists(new Path(baseDir, ServerConstants.VERSION_DIR)))
 +        return true;
 +    }
 +
 +    return false;
 +  }
 +
 +  private static void addVolumes(VolumeManager fs) throws IOException {
 +    HashSet<String> initializedDirs = new HashSet<String>();
 +    initializedDirs.addAll(Arrays.asList(ServerConstants.checkBaseUris(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()), true)));
 +
 +    HashSet<String> uninitializedDirs = new HashSet<String>();
 +    uninitializedDirs.addAll(Arrays.asList(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration())));
 +    uninitializedDirs.removeAll(initializedDirs);
 +
 +    Path aBasePath = new Path(initializedDirs.iterator().next());
 +    Path iidPath = new Path(aBasePath, ServerConstants.INSTANCE_ID_DIR);
 +    Path versionPath = new Path(aBasePath, ServerConstants.VERSION_DIR);
 +
 +    UUID uuid = UUID.fromString(ZooUtil.getInstanceIDFromHdfs(iidPath, ServerConfiguration.getSiteConfiguration()));
 +
 +    if (ServerConstants.DATA_VERSION != Accumulo.getAccumuloPersistentVersion(versionPath.getFileSystem(CachedConfiguration.getInstance()), versionPath)) {
 +      throw new IOException("Accumulo " + Constants.VERSION + " cannot initialize data version " + Accumulo.getAccumuloPersistentVersion(fs));
 +    }
 +
 +    initDirs(fs, uuid, uninitializedDirs.toArray(new String[uninitializedDirs.size()]), true);
 +  }
 +
 +  static class Opts extends Help {
 +    @Parameter(names = "--add-volumes", description = "Initialize any uninitialized volumes listed in instance.volumes")
 +    boolean addVolumes = false;
 +    @Parameter(names = "--reset-security", description = "just update the security information")
 +    boolean resetSecurity = false;
 +    @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting")
 +    boolean clearInstanceName = false;
 +    @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt")
 +    String cliInstanceName;
 +    @Parameter(names = "--password", description = "set the password on the command line")
 +    String cliPassword;
 +
 +    byte[] rootpass = null;
 +  }
 +
 +  public static void main(String[] args) {
 +    Opts opts = new Opts();
 +    opts.parseArgs(Initialize.class.getName(), args);
 +
 +    try {
 +      AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration();
 +      SecurityUtil.serverLogin(acuConf);
 +      Configuration conf = CachedConfiguration.getInstance();
 +
 +      VolumeManager fs = VolumeManagerImpl.get(acuConf);
 +
 +      if (opts.resetSecurity) {
 +        if (isInitialized(fs)) {
 +          opts.rootpass = getRootPassword(opts);
 +          initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID());
 +        } else {
 +          log.fatal("Attempted to reset security on accumulo before it was initialized");
 +        }
 +      }
 +
 +      if (opts.addVolumes) {
 +        addVolumes(fs);
 +      }
 +
 +      if (!opts.resetSecurity && !opts.addVolumes)
 +        if (!doInit(opts, conf, fs))
 +          System.exit(-1);
 +    } catch (Exception e) {
 +      log.fatal(e, e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +}
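
For orientation only, and not part of the commit: the Opts class above wires the
init entry point to JCommander flags, so a non-interactive initialization can
pass the instance name and root password directly. A minimal sketch, with
made-up instance name and password values:

    // Hypothetical driver; the flag names come from the Opts class above.
    public class InitSketch {
      public static void main(String[] args) throws Exception {
        org.apache.accumulo.server.init.Initialize.main(new String[] {
            "--instance-name", "test", "--password", "secret"});
      }
    }

The equivalent shell form would be along the lines of "bin/accumulo init
--instance-name test --password secret", assuming a configured
accumulo-site.xml on the classpath.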

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
index 684efc3,0000000..29e4939
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
@@@ -1,83 -1,0 +1,83 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
- package org.apache.accumulo.core.security;
++package org.apache.accumulo.server.security;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * 
 + */
 +public class SecurityUtil {
 +  private static final Logger log = Logger.getLogger(SecurityUtil.class);
 +  public static boolean usingKerberos = false;
 +
 +  /**
 +   * This method is for logging a server in kerberos. If this is used in client code, it will fail unless run as the accumulo keytab's owner. Instead, use
 +   * {@link #login(String, String)}
 +   */
 +  public static void serverLogin(AccumuloConfiguration acuConf) {
 +    String keyTab = acuConf.getPath(Property.GENERAL_KERBEROS_KEYTAB);
 +    if (keyTab == null || keyTab.length() == 0)
 +      return;
 +    
 +    usingKerberos = true;
 +    
 +    String principalConfig = acuConf.get(Property.GENERAL_KERBEROS_PRINCIPAL);
 +    if (principalConfig == null || principalConfig.length() == 0)
 +      return;
 +    
 +    if (login(principalConfig, keyTab)) {
 +      try {
 +        // This spawns a thread to periodically renew the logged in (accumulo) user
 +        UserGroupInformation.getLoginUser();
 +        return;
 +      } catch (IOException io) {
 +        log.error("Error starting up renewal thread. This shouldn't be happening.", io);
 +      }
 +    }
 +
 +    throw new RuntimeException("Failed to perform Kerberos login for " + principalConfig + " using  " + keyTab);
 +  }
 +  
 +  /**
 +   * This will log in the given user in kerberos.
 +   * 
 +   * @param principalConfig
 +   *          This is the principal's name in the format NAME/HOST@REALM. {@link org.apache.hadoop.security.SecurityUtil#HOSTNAME_PATTERN} will automatically be
 +   *          replaced by the system's host name.
 +   * @return true if login succeeded, otherwise false
 +   */
 +  public static boolean login(String principalConfig, String keyTabPath) {
 +    try {
 +      String principalName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principalConfig, InetAddress.getLocalHost().getCanonicalHostName());
 +      if (keyTabPath != null && principalName != null && keyTabPath.length() != 0 && principalName.length() != 0) {
 +        UserGroupInformation.loginUserFromKeytab(principalName, keyTabPath);
 +        log.info("Successfully logged in as user " + principalConfig);
 +        return true;
 +      }
 +    } catch (IOException io) {
 +      log.error("Error logging in user " + principalConfig + " using keytab at " + keyTabPath, io);
 +    }
 +    return false;
 +  }
 +}
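
The usage pattern this relocation serves (visible in the GC's main below) is
that each server process logs in from its keytab before touching HDFS or
ZooKeeper. A minimal sketch with a hypothetical server class:

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.server.conf.ServerConfiguration;
    import org.apache.accumulo.server.security.SecurityUtil;

    // Hypothetical entry point, for illustration only.
    public class ServerSketch {
      public static void main(String[] args) {
        AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
        // no-op when general.kerberos.keytab is unset; throws RuntimeException
        // if the Kerberos login fails
        SecurityUtil.serverLogin(conf);
        // ... open volumes, acquire locks, start services ...
      }
    }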

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 4d3721e,0000000..8761480
mode 100644,000000..100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -1,763 -1,0 +1,763 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.gc;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
 +import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
 +import org.apache.accumulo.core.gc.thrift.GCStatus;
 +import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.Credentials;
- import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.NamingThreadFactory;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.ServerServices;
 +import org.apache.accumulo.core.util.ServerServices.Service;
 +import org.apache.accumulo.core.util.SslConnectionParams;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.fs.VolumeUtil;
++import org.apache.accumulo.server.security.SecurityUtil;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.RpcWrapper;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TabletIterator;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.trace.instrument.CountSampler;
 +import org.apache.accumulo.trace.instrument.Sampler;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +
 +import com.beust.jcommander.Parameter;
 +import com.google.common.base.Function;
 +import com.google.common.collect.Iterators;
 +import com.google.common.net.HostAndPort;
 +
 +public class SimpleGarbageCollector implements Iface {
 +  private static final Text EMPTY_TEXT = new Text();
 +
 +  /**
 +   * Options for the garbage collector.
 +   */
 +  static class Opts extends ServerOpts {
 +    @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed to stdout also")
 +    boolean verbose = false;
 +    @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files")
 +    boolean safeMode = false;
 +  }
 +
 +  /**
 +   * A fraction representing how much of the JVM's available memory should be used for gathering candidates.
 +   */
 +  static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f;
 +
 +  private static final Logger log = Logger.getLogger(SimpleGarbageCollector.class);
 +
 +  private Credentials credentials;
 +  private VolumeManager fs;
 +  private AccumuloConfiguration config;
 +  private Opts opts = new Opts();
 +  private ZooLock lock;
 +
 +  private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
 +
 +  private Instance instance;
 +
 +  public static void main(String[] args) throws UnknownHostException, IOException {
 +    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +    final String app = "gc";
 +    Accumulo.setupLogging(app);
 +    Instance instance = HdfsZooInstance.getInstance();
 +    ServerConfiguration conf = new ServerConfiguration(instance);
 +    final VolumeManager fs = VolumeManagerImpl.get();
 +    Accumulo.init(fs, conf, app);
 +    Opts opts = new Opts();
 +    opts.parseArgs(app, args);
 +    SimpleGarbageCollector gc = new SimpleGarbageCollector(opts);
 +    AccumuloConfiguration config = conf.getConfiguration();
 +
 +    gc.init(fs, instance, SystemCredentials.get(), config);
 +    Accumulo.enableTracing(opts.getAddress(), app);
 +    gc.run();
 +  }
 +
 +  /**
 +   * Creates a new garbage collector.
 +   *
 +   * @param opts
 +   *          options
 +   */
 +  public SimpleGarbageCollector(Opts opts) {
 +    this.opts = opts;
 +  }
 +
 +  /**
 +   * Gets the credentials used by this GC.
 +   *
 +   * @return credentials
 +   */
 +  Credentials getCredentials() {
 +    return credentials;
 +  }
 +
 +  /**
 +   * Gets the delay before the first collection.
 +   *
 +   * @return start delay, in milliseconds
 +   */
 +  long getStartDelay() {
 +    return config.getTimeInMillis(Property.GC_CYCLE_START);
 +  }
 +
 +  /**
 +   * Gets the volume manager used by this GC.
 +   *
 +   * @return volume manager
 +   */
 +  VolumeManager getVolumeManager() {
 +    return fs;
 +  }
 +
 +  /**
 +   * Checks if the volume manager should move files to the trash rather than delete them.
 +   *
 +   * @return true if trash is used
 +   */
 +  boolean isUsingTrash() {
 +    return !config.getBoolean(Property.GC_TRASH_IGNORE);
 +  }
 +
 +  /**
 +   * Gets the options for this garbage collector.
 +   */
 +  Opts getOpts() {
 +    return opts;
 +  }
 +
 +  /**
 +   * Gets the number of threads used for deleting files.
 +   *
 +   * @return number of delete threads
 +   */
 +  int getNumDeleteThreads() {
 +    return config.getCount(Property.GC_DELETE_THREADS);
 +  }
 +
 +  /**
 +   * Gets the instance used by this GC.
 +   *
 +   * @return instance
 +   */
 +  Instance getInstance() {
 +    return instance;
 +  }
 +
 +  /**
 +   * Should files be archived (as opposed to preserved in trash)
 +   *
 +   * @return True if files should be archived, false otherwise
 +   */
 +  boolean shouldArchiveFiles() {
 +    return config.getBoolean(Property.GC_FILE_ARCHIVE);
 +  }
 +
 +  /**
 +   * Initializes this garbage collector.
 +   *
 +   * @param fs
 +   *          volume manager
 +   * @param instance
 +   *          instance
 +   * @param credentials
 +   *          credentials
 +   * @param config
 +   *          system configuration
 +   */
 +  public void init(VolumeManager fs, Instance instance, Credentials credentials, AccumuloConfiguration config) {
 +    this.fs = fs;
 +    this.credentials = credentials;
 +    this.instance = instance;
 +    this.config = config;
 +    long gcDelay = config.getTimeInMillis(Property.GC_CYCLE_DELAY);
 +    log.info("start delay: " + getStartDelay() + " milliseconds");
 +    log.info("time delay: " + gcDelay + " milliseconds");
 +    log.info("safemode: " + opts.safeMode);
 +    log.info("verbose: " + opts.verbose);
 +    log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes");
 +    log.info("delete threads: " + getNumDeleteThreads());
 +  }
 +
 +  private class GCEnv implements GarbageCollectionEnvironment {
 +
 +    private String tableName;
 +
 +    GCEnv(String tableName) {
 +      this.tableName = tableName;
 +    }
 +
 +    @Override
 +    public List<String> getCandidates(String continuePoint) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
 +      // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them,
 +      // then will never inspect deletes after N
 +      Range range = MetadataSchema.DeletesSection.getRange();
 +      if (continuePoint != null && !continuePoint.isEmpty()) {
 +        String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint;
 +        range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
 +      }
 +
 +      Scanner scanner = instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, Authorizations.EMPTY);
 +      scanner.setRange(range);
 +      List<String> result = new ArrayList<String>();
 +      // find candidates for deletion; chop off the prefix
 +      for (Entry<Key,Value> entry : scanner) {
 +        String cand = entry.getKey().getRow().toString().substring(MetadataSchema.DeletesSection.getRowPrefix().length());
 +        result.add(cand);
 +        if (almostOutOfMemory(Runtime.getRuntime())) {
 +          log.info("List of delete candidates has exceeded the memory threshold. Attempting to delete what has been gathered so far.");
 +          break;
 +        }
 +      }
 +
 +      return result;
 +
 +    }
 +
 +    @Override
 +    public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
 +      IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName,
 +          Authorizations.EMPTY));
 +
 +      scanner.setRange(MetadataSchema.BlipSection.getRange());
 +
 +      return Iterators.transform(scanner.iterator(), new Function<Entry<Key,Value>,String>() {
 +        @Override
 +        public String apply(Entry<Key,Value> entry) {
 +          return entry.getKey().getRow().toString().substring(MetadataSchema.BlipSection.getRowPrefix().length());
 +        }
 +      });
 +    }
 +
 +    @Override
 +    public Iterator<Entry<Key,Value>> getReferenceIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
 +      IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName,
 +          Authorizations.EMPTY));
 +      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
 +      scanner.fetchColumnFamily(ScanFileColumnFamily.NAME);
 +      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
 +      TabletIterator tabletIterator = new TabletIterator(scanner, MetadataSchema.TabletsSection.getRange(), false, true);
 +
 +      return Iterators.concat(Iterators.transform(tabletIterator, new Function<Map<Key,Value>,Iterator<Entry<Key,Value>>>() {
 +        @Override
 +        public Iterator<Entry<Key,Value>> apply(Map<Key,Value> input) {
 +          return input.entrySet().iterator();
 +        }
 +      }));
 +    }
 +
 +    @Override
 +    public Set<String> getTableIDs() {
 +      return Tables.getIdToNameMap(instance).keySet();
 +    }
 +
 +    @Override
 +    public void delete(SortedMap<String,String> confirmedDeletes) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +
 +      if (opts.safeMode) {
 +        if (opts.verbose)
 +          System.out.println("SAFEMODE: There are " + confirmedDeletes.size() + " data file candidates marked for deletion.%n"
 +              + "          Examine the log files to identify them.%n");
 +        log.info("SAFEMODE: Listing all data file candidates for deletion");
 +        for (String s : confirmedDeletes.values())
 +          log.info("SAFEMODE: " + s);
 +        log.info("SAFEMODE: End candidates for deletion");
 +        return;
 +      }
 +
 +      Connector c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
 +      BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +      // when deleting a dir and all files in that dir, only need to delete the dir
 +      // the dir will sort right before the files... so remove the files in this case
 +      // to minimize namenode ops
 +      Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator();
 +
 +      String lastDir = null;
 +      while (cdIter.hasNext()) {
 +        Entry<String,String> entry = cdIter.next();
 +        String relPath = entry.getKey();
 +        String absPath = fs.getFullPath(FileType.TABLE, entry.getValue()).toString();
 +
 +        if (isDir(relPath)) {
 +          lastDir = absPath;
 +        } else if (lastDir != null) {
 +          if (absPath.startsWith(lastDir)) {
 +            log.debug("Ignoring " + entry.getValue() + " because " + lastDir + " exists");
 +            try {
 +              putMarkerDeleteMutation(entry.getValue(), writer);
 +            } catch (MutationsRejectedException e) {
 +              throw new RuntimeException(e);
 +            }
 +            cdIter.remove();
 +          } else {
 +            lastDir = null;
 +          }
 +        }
 +      }
 +
 +      final BatchWriter finalWriter = writer;
 +
 +      ExecutorService deleteThreadPool = Executors.newFixedThreadPool(getNumDeleteThreads(), new NamingThreadFactory("deleting"));
 +
 +      final List<Pair<Path,Path>> replacements = ServerConstants.getVolumeReplacements();
 +
 +      for (final String delete : confirmedDeletes.values()) {
 +
 +        Runnable deleteTask = new Runnable() {
 +          @Override
 +          public void run() {
 +            boolean removeFlag;
 +
 +            try {
 +              Path fullPath;
 +              String switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements);
 +              if (switchedDelete != null) {
 +                // actually replacing the volumes in the metadata table would be tricky because the entries would be different rows. So it could not be done
 +                // atomically in one mutation and extreme care would need to be taken that delete entry was not lost. Instead of doing that, just deal with
 +                // volume switching when something needs to be deleted. Since the rest of the code uses suffixes to compare delete entries, there is no danger
 +                // of deleting something that should not be deleted. Must not change value of delete variable because that's what's stored in metadata table.
 +                log.debug("Volume replaced " + delete + " -> " + switchedDelete);
 +                fullPath = fs.getFullPath(FileType.TABLE, switchedDelete);
 +              } else {
 +                fullPath = fs.getFullPath(FileType.TABLE, delete);
 +              }
 +
 +              log.debug("Deleting " + fullPath);
 +
 +              if (archiveOrMoveToTrash(fullPath) || fs.deleteRecursively(fullPath)) {
 +                // delete succeeded, still want to delete
 +                removeFlag = true;
 +                synchronized (SimpleGarbageCollector.this) {
 +                  ++status.current.deleted;
 +                }
 +              } else if (fs.exists(fullPath)) {
 +                // leave the entry in the metadata; we'll try again later
 +                removeFlag = false;
 +                synchronized (SimpleGarbageCollector.this) {
 +                  ++status.current.errors;
 +                }
 +                log.warn("File exists, but was not deleted for an unknown reason: " + fullPath);
 +              } else {
 +                // despite this failure, we still want to remove the metadata entry
 +                removeFlag = true;
 +                synchronized (SimpleGarbageCollector.this) {
 +                  ++status.current.errors;
 +                }
 +                String parts[] = fullPath.toString().split(Constants.ZTABLES)[1].split("/");
 +                if (parts.length > 2) {
 +                  String tableId = parts[1];
 +                  String tabletDir = parts[2];
 +                  TableManager.getInstance().updateTableStateCache(tableId);
 +                  TableState tableState = TableManager.getInstance().getTableState(tableId);
 +                  if (tableState != null && tableState != TableState.DELETING) {
 +                    // clone directories don't always exist
 +                    if (!tabletDir.startsWith(Constants.CLONE_PREFIX))
 +                      log.debug("File doesn't exist: " + fullPath);
 +                  }
 +                } else {
 +                  log.warn("Very strange path name: " + delete);
 +                }
 +              }
 +
 +              // proceed to clearing out the flags for successful deletes and
 +              // non-existent files
 +              if (removeFlag && finalWriter != null) {
 +                putMarkerDeleteMutation(delete, finalWriter);
 +              }
 +            } catch (Exception e) {
 +              log.error(e, e);
 +            }
 +
 +          }
 +
 +        };
 +
 +        deleteThreadPool.execute(deleteTask);
 +      }
 +
 +      deleteThreadPool.shutdown();
 +
 +      try {
 +        while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {}
 +      } catch (InterruptedException e1) {
 +        log.error(e1, e1);
 +      }
 +
 +      if (writer != null) {
 +        try {
 +          writer.close();
 +        } catch (MutationsRejectedException e) {
 +          log.error("Problem removing entries from the metadata table: ", e);
 +        }
 +      }
 +    }
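
The volume-replacement branch above is easier to see with concrete values. Below is a minimal standalone sketch, not part of this patch; the URIs and file names are hypothetical, and the imports assume the package layout this class already uses for VolumeUtil, FileType, and Pair:

    import java.util.Collections;
    import java.util.List;

    import org.apache.accumulo.core.util.Pair;
    import org.apache.accumulo.server.fs.VolumeManager.FileType;
    import org.apache.accumulo.server.fs.VolumeUtil;
    import org.apache.hadoop.fs.Path;

    public class SwitchVolumeSketch {
      public static void main(String[] args) {
        // one retired volume mapped to its replacement (hypothetical URIs); in the
        // GC this list comes from ServerConstants.getVolumeReplacements()
        List<Pair<Path,Path>> replacements = Collections.singletonList(
            new Pair<Path,Path>(new Path("hdfs://oldnn/accumulo"), new Path("hdfs://newnn/accumulo")));

        // a delete entry still naming the retired volume is switched for the
        // filesystem operation only; the metadata row keeps the original string
        String delete = "hdfs://oldnn/accumulo/tables/1/default_tablet/A_0000001.rf";
        System.out.println(VolumeUtil.switchVolume(delete, FileType.TABLE, replacements));
        // prints hdfs://newnn/accumulo/tables/1/default_tablet/A_0000001.rf

        // a path already on a live volume needs no rewrite, so null comes back and
        // the caller falls through to fs.getFullPath(FileType.TABLE, delete)
        System.out.println(VolumeUtil.switchVolume(
            "hdfs://newnn/accumulo/tables/1/default_tablet/A_0000002.rf", FileType.TABLE, replacements));
        // prints null
      }
    }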
 +
 +    @Override
 +    public void deleteTableDirIfEmpty(String tableID) throws IOException {
 +      // if the dir exists and is empty, then an empty list is returned...
 +      // hadoop 1.0 will return null if the file doesn't exist
 +      // hadoop 2.0 will throw an exception if the file does not exist
 +      for (String dir : ServerConstants.getTablesDirs()) {
 +        FileStatus[] tabletDirs = null;
 +        try {
 +          tabletDirs = fs.listStatus(new Path(dir + "/" + tableID));
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +        if (tabletDirs == null)
 +          continue;
 +
 +        if (tabletDirs.length == 0) {
 +          Path p = new Path(dir + "/" + tableID);
 +          log.debug("Removing table dir " + p);
 +          if (!archiveOrMoveToTrash(p))
 +            fs.delete(p);
 +        }
 +      }
 +    }
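
The Hadoop 1.x/2.x difference called out in the comments above can be normalized in one place. A small sketch under the same assumptions (default FileSystem from the local configuration; the table directory is hypothetical):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class EmptyDirSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        FileStatus[] children;
        try {
          children = fs.listStatus(new Path("/accumulo/tables/9")); // hypothetical table dir
        } catch (FileNotFoundException e) {
          children = null; // hadoop 2.x throws; fold it into hadoop 1.x's null
        }
        if (children == null) {
          System.out.println("directory does not exist; nothing to do");
        } else if (children.length == 0) {
          System.out.println("directory exists and is empty; safe to remove");
        }
      }
    }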
 +
 +    @Override
 +    public void incrementCandidatesStat(long i) {
 +      status.current.candidates += i;
 +    }
 +
 +    @Override
 +    public void incrementInUseStat(long i) {
 +      status.current.inUse += i;
 +    }
 +
 +  }
 +
 +  private void run() {
 +    long tStart, tStop;
 +
 +    // Sleep for an initial period, giving the master time to start up and
 +    // old data files time to fall out of use
 +    log.info("Trying to acquire ZooKeeper lock for garbage collector");
 +
 +    try {
 +      getZooLock(startStatsService());
 +    } catch (Exception ex) {
 +      log.error(ex, ex);
 +      System.exit(1);
 +    }
 +
 +    try {
 +      long delay = getStartDelay();
 +      log.debug("Sleeping for " + delay + " milliseconds before beginning 
garbage collection cycles");
 +      Thread.sleep(delay);
 +    } catch (InterruptedException e) {
 +      log.warn(e, e);
 +      return;
 +    }
 +
 +    Sampler sampler = new CountSampler(100);
 +
 +    while (true) {
 +      if (sampler.next())
 +        Trace.on("gc");
 +
 +      Span gcSpan = Trace.start("loop");
 +      tStart = System.currentTimeMillis();
 +      try {
 +        System.gc(); // make room
 +
 +        status.current.started = System.currentTimeMillis();
 +
 +        new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME));
 +        new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME));
 +
 +        log.info("Number of data file candidates for deletion: " + 
status.current.candidates);
 +        log.info("Number of data file candidates still in use: " + 
status.current.inUse);
 +        log.info("Number of successfully deleted data files: " + 
status.current.deleted);
 +        log.info("Number of data files delete failures: " + 
status.current.errors);
 +
 +        status.current.finished = System.currentTimeMillis();
 +        status.last = status.current;
 +        status.current = new GcCycleStats();
 +
 +      } catch (Exception e) {
 +        log.error(e, e);
 +      }
 +
 +      tStop = System.currentTimeMillis();
 +      log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0)));
 +
 +      // Clean up any unused write-ahead logs
 +      Span waLogs = Trace.start("walogs");
 +      try {
 +        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, isUsingTrash());
 +        log.info("Beginning garbage collection of write-ahead logs");
 +        walogCollector.collect(status);
 +      } catch (Exception e) {
 +        log.error(e, e);
 +      } finally {
 +        waLogs.stop();
 +      }
 +      gcSpan.stop();
 +
 +      // we just made a lot of metadata changes: flush them out
 +      try {
 +        Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
 +        connector.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
 +        connector.tableOperations().compact(RootTable.NAME, null, null, true, true);
 +      } catch (Exception e) {
 +        log.warn(e, e);
 +      }
 +
 +      Trace.offNoFlush();
 +      try {
 +        long gcDelay = config.getTimeInMillis(Property.GC_CYCLE_DELAY);
 +        log.debug("Sleeping for " + gcDelay + " milliseconds");
 +        Thread.sleep(gcDelay);
 +      } catch (InterruptedException e) {
 +        log.warn(e, e);
 +        return;
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Archives a file or moves it to trash. If archiving is enabled, the file is archived; otherwise, if this garbage collector is not using trash, this
 +   * method returns false and leaves the file alone. If the file is missing, this method returns false as opposed to throwing an exception.
 +   *
 +   * @return true if the file was archived or moved to trash
 +   * @throws IOException
 +   *           if the volume manager encountered a problem
 +   */
 +  boolean archiveOrMoveToTrash(Path path) throws IOException {
 +    if (shouldArchiveFiles()) {
 +      return archiveFile(path);
 +    } else {
 +      if (!isUsingTrash())
 +        return false;
 +      try {
 +        return fs.moveToTrash(path);
 +      } catch (FileNotFoundException ex) {
 +        return false;
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Moves a file that would otherwise be deleted to the file archive directory.
 +   *
 +   * @param fileToArchive
 +   *          Path to the file that is to be archived
 +   * @return True if the file was successfully moved to the file archive directory, false otherwise
 +   */
 +  boolean archiveFile(Path fileToArchive) throws IOException {
 +    // Figure out the base path this volume uses on this FileSystem
 +    Volume sourceVolume = fs.getVolumeByPath(fileToArchive);
 +    String sourceVolumeBasePath = sourceVolume.getBasePath();
 +
 +    log.debug("Base path for volume: " + sourceVolumeBasePath);
 +
 +    // Get the path for the file we want to archive
 +    String sourcePathBasePath = fileToArchive.toUri().getPath();
 +
 +    // Strip off the common base path for the file to archive
 +    String relativeVolumePath = sourcePathBasePath.substring(sourceVolumeBasePath.length());
 +    if (Path.SEPARATOR_CHAR == relativeVolumePath.charAt(0)) {
 +      if (relativeVolumePath.length() > 1) {
 +        relativeVolumePath = relativeVolumePath.substring(1);
 +      } else {
 +        relativeVolumePath = "";
 +      }
 +    }
 +
 +    log.debug("Computed relative path for file to archive: " + 
relativeVolumePath);
 +
 +    // The file archive path on this volume (we can't archive this file to a different volume)
 +    Path archivePath = new Path(sourceVolumeBasePath, ServerConstants.FILE_ARCHIVE_DIR);
 +
 +    log.debug("File archive path: " + archivePath);
 +
 +    fs.mkdirs(archivePath);
 +
 +    // Preserve the path beneath the Volume's base directory (e.g. tables/1/A_0000001.rf)
 +    Path fileArchivePath = new Path(archivePath, relativeVolumePath);
 +
 +    log.debug("Create full path of " + fileArchivePath + " from " + 
archivePath + " and " + relativeVolumePath);
 +
 +    // Make sure it doesn't already exist; if it does, something is wrong.
 +    if (fs.exists(fileArchivePath)) {
 +      log.warn("Tried to archive file, but it already exists: " + 
fileArchivePath);
 +      return false;
 +    }
 +
 +    log.debug("Moving " + fileToArchive + " to " + fileArchivePath);
 +    return fs.rename(fileToArchive, fileArchivePath);
 +  }
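
A concrete pass through the path arithmetic above, as a standalone sketch; every value is hypothetical, and "file-archive" merely stands in for whatever ServerConstants.FILE_ARCHIVE_DIR resolves to:

    import org.apache.hadoop.fs.Path;

    public class ArchivePathSketch {
      public static void main(String[] args) {
        Path fileToArchive = new Path("hdfs://nn/accumulo/tables/1/default_tablet/A_0000001.rf");
        String volumeBase = "/accumulo"; // what Volume.getBasePath() would return here

        String sourcePath = fileToArchive.toUri().getPath();
        // -> /accumulo/tables/1/default_tablet/A_0000001.rf

        String relative = sourcePath.substring(volumeBase.length());
        if (!relative.isEmpty() && relative.charAt(0) == Path.SEPARATOR_CHAR)
          relative = relative.substring(1);
        // -> tables/1/default_tablet/A_0000001.rf

        Path archivePath = new Path(volumeBase, "file-archive"); // archive dir name assumed
        System.out.println(new Path(archivePath, relative));
        // -> /accumulo/file-archive/tables/1/default_tablet/A_0000001.rf
      }
    }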
 +
 +  private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
 +    String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK;
 +
 +    LockWatcher lockWatcher = new LockWatcher() {
 +      @Override
 +      public void lostLock(LockLossReason reason) {
 +        Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), 
exiting!");
 +      }
 +
 +      @Override
 +      public void unableToMonitorLockNode(final Throwable e) {
 +        Halt.halt(-1, new Runnable() {
 +
 +          @Override
 +          public void run() {
 +            log.fatal("No longer able to monitor lock node ", e);
 +          }
 +        });
 +
 +      }
 +    };
 +
 +    while (true) {
 +      lock = new ZooLock(path);
 +      if (lock.tryLock(lockWatcher, new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) {
 +        break;
 +      }
 +      UtilWaitThread.sleep(1000);
 +    }
 +  }
 +
 +  private HostAndPort startStatsService() throws UnknownHostException {
 +    Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this));
 +    int port = config.getPort(Property.GC_PORT);
 +    long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
 +    HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
 +    log.debug("Starting garbage collector listening on " + result);
 +    try {
 +      return TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize,
 +          SslConnectionParams.forServer(config), 0).address;
 +    } catch (Exception ex) {
 +      log.fatal(ex, ex);
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +
 +  /**
 +   * Checks if the system is almost out of memory.
 +   *
 +   * @param runtime
 +   *          Java runtime
 +   * @return true if system is almost out of memory
 +   * @see #CANDIDATE_MEMORY_PERCENTAGE
 +   */
 +  static boolean almostOutOfMemory(Runtime runtime) {
 +    return runtime.totalMemory() - runtime.freeMemory() > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory();
 +  }
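
To put a number on the threshold: assuming CANDIDATE_MEMORY_PERCENTAGE is 0.75 (the real constant is declared elsewhere in this class, so the value here is only illustrative), a 1 GiB heap trips the check once roughly 768 MiB is in use. A self-contained sketch:

    public class MemoryCheckSketch {
      static final double CANDIDATE_MEMORY_PERCENTAGE = 0.75; // assumed value for illustration

      // same comparison as almostOutOfMemory above: used heap vs. fraction of max
      static boolean almostOutOfMemory(Runtime runtime) {
        long used = runtime.totalMemory() - runtime.freeMemory();
        return used > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory();
      }

      public static void main(String[] args) {
        System.out.println("almost out of memory: " + almostOutOfMemory(Runtime.getRuntime()));
      }
    }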
 +
 +  final static String METADATA_TABLE_DIR = "/" + MetadataTable.ID;
 +
 +  private static void putMarkerDeleteMutation(final String delete, final BatchWriter writer) throws MutationsRejectedException {
 +    Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + delete);
 +    m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
 +    writer.addMutation(m);
 +  }
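
For reference, the marker such a mutation clears is an ordinary metadata row: the DeletesSection prefix followed by the candidate path. A sketch with a hypothetical path ("~del" in the comment is what getRowPrefix() is expected to return in this version; verify against MetadataSchema):

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.metadata.schema.MetadataSchema;
    import org.apache.hadoop.io.Text;

    public class DeleteMarkerSketch {
      public static void main(String[] args) {
        String delete = "/1/default_tablet/A_0000001.rf"; // hypothetical candidate
        Mutation m = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + delete));
        m.putDelete(new Text(""), new Text("")); // same empty column the GC uses
        System.out.println(new String(m.getRow())); // ~del/1/default_tablet/A_0000001.rf
      }
    }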
 +
 +  /**
 +   * Checks if the given string is a directory.
 +   *
 +   * @param delete
 +   *          possible directory
 +   * @return true if string is a directory
 +   */
 +  static boolean isDir(String delete) {
 +    if (delete == null) {
 +      return false;
 +    }
 +    int slashCount = 0;
 +    for (int i = 0; i < delete.length(); i++)
 +      if (delete.charAt(i) == '/')
 +        slashCount++;
 +    return slashCount == 1;
 +  }
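
Since isDir keys purely off the slash count, delete candidates classify like this; a self-contained sketch with the same rule and sample values:

    public class IsDirSketch {
      // same slash-count rule as isDir above: exactly one '/' means tableId/tabletDir
      static boolean isDir(String delete) {
        if (delete == null)
          return false;
        int slashes = 0;
        for (int i = 0; i < delete.length(); i++)
          if (delete.charAt(i) == '/')
            slashes++;
        return slashes == 1;
      }

      public static void main(String[] args) {
        System.out.println(isDir("1/default_tablet"));              // true: a tablet directory entry
        System.out.println(isDir("1/default_tablet/A_0000001.rf")); // false: a file within a tablet dir
        System.out.println(isDir(null));                            // false
      }
    }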
 +
 +  @Override
 +  public GCStatus getStatus(TInfo info, TCredentials credentials) {
 +    return status;
 +  }
 +}
