http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 3098251,0000000..7b37a9e
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@@ -1,510 -1,0 +1,510 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.PrintStream;
 +import java.net.URL;
 +import java.security.SecureRandom;
 +import java.security.cert.CertificateException;
 +import java.security.cert.X509Certificate;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.TreeMap;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import javax.net.ssl.HostnameVerifier;
 +import javax.net.ssl.HttpsURLConnection;
 +import javax.net.ssl.KeyManager;
 +import javax.net.ssl.SSLContext;
 +import javax.net.ssl.SSLSession;
 +import javax.net.ssl.TrustManager;
 +import javax.net.ssl.X509TrustManager;
 +
 +import org.apache.accumulo.cluster.ClusterControl;
 +import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster;
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.rfile.PrintInfo;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.MonitorUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.fate.zookeeper.ZooLock;
 +import org.apache.accumulo.fate.zookeeper.ZooReader;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.TestMultiTableIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Charsets;
 +import com.google.common.collect.Iterators;
 +
 +public class ReadWriteIT extends AccumuloClusterHarness {
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +  }
 +
 +  private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class);
 +
 +  static final int ROWS = 100000;
 +  static final int COLS = 1;
 +  static final String COLF = "colf";
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 6 * 60;
 +  }
 +
 +  @Test(expected = RuntimeException.class)
 +  public void invalidInstanceName() throws Exception {
 +    final Connector conn = getConnector();
 +    new ZooKeeperInstance("fake_instance_name", conn.getInstance().getZooKeepers());
 +  }
 +
 +  @Test
 +  public void sunnyDay() throws Exception {
 +    // Start accumulo, create a table, insert some data, verify we can read it out.
 +    // Shutdown cleanly.
 +    log.debug("Starting Monitor");
 +    cluster.getClusterControl().startAllServers(ServerType.MONITOR);
 +    Connector connector = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName);
 +    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName);
 +    String monitorLocation = null;
 +    while (null == monitorLocation) {
 +      monitorLocation = MonitorUtil.getLocation(getConnector().getInstance());
 +      if (null == monitorLocation) {
 +        log.debug("Could not fetch monitor HTTP address from zookeeper");
 +        Thread.sleep(2000);
 +      }
 +    }
 +    String scheme = "http://";
 +    if (getCluster() instanceof StandaloneAccumuloCluster) {
 +      StandaloneAccumuloCluster standaloneCluster = (StandaloneAccumuloCluster) getCluster();
 +      File accumuloSite = new File(standaloneCluster.getServerAccumuloConfDir(), "accumulo-site.xml");
 +      if (accumuloSite.isFile()) {
 +        Configuration conf = new Configuration(false);
 +        conf.addResource(new Path(accumuloSite.toURI()));
 +        String monitorSslKeystore = conf.get(Property.MONITOR_SSL_KEYSTORE.getKey());
 +        if (null != monitorSslKeystore) {
 +          log.info("Setting scheme to HTTPS since monitor ssl keystore configuration was observed in {}", accumuloSite);
 +          scheme = "https://";
 +          SSLContext ctx = SSLContext.getInstance("SSL");
 +          TrustManager[] tm = new TrustManager[] {new TestTrustManager()};
 +          ctx.init(new KeyManager[0], tm, new SecureRandom());
 +          SSLContext.setDefault(ctx);
 +          HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
 +          HttpsURLConnection.setDefaultHostnameVerifier(new TestHostnameVerifier());
 +        }
 +      } else {
 +        log.info("{} is not a normal file, not checking for monitor running with SSL", accumuloSite);
 +      }
 +    }
 +    URL url = new URL(scheme + monitorLocation);
 +    log.debug("Fetching web page " + url);
 +    String result = FunctionalTestUtils.readAll(url.openStream());
 +    assertTrue(result.length() > 100);
 +    log.debug("Stopping accumulo cluster");
 +    ClusterControl control = cluster.getClusterControl();
 +    control.adminStopAll();
 +    ZooReader zreader = new ZooReader(connector.getInstance().getZooKeepers(), connector.getInstance().getZooKeepersSessionTimeOut());
 +    ZooCache zcache = new ZooCache(zreader, null);
 +    byte[] masterLockData;
 +    do {
 +      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(connector.getInstance()) + Constants.ZMASTER_LOCK, null);
 +      if (null != masterLockData) {
 +        log.info("Master lock is still held");
 +        Thread.sleep(1000);
 +      }
 +    } while (null != masterLockData);
 +
 +    control.stopAllServers(ServerType.GARBAGE_COLLECTOR);
 +    control.stopAllServers(ServerType.MONITOR);
 +    control.stopAllServers(ServerType.TRACER);
 +    log.debug("success!");
 +    // Restarting everything
 +    cluster.start();
 +  }
 +
 +  public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName)
 +      throws Exception {
 +    ingest(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName);
 +  }
 +
 +  public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf,
 +      String tableName) throws Exception {
 +    TestIngest.Opts opts = new TestIngest.Opts();
 +    opts.rows = rows;
 +    opts.cols = cols;
 +    opts.dataSize = width;
 +    opts.startRow = offset;
 +    opts.columnFamily = colf;
 +    opts.createTable = true;
 +    opts.setTableName(tableName);
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      opts.updateKerberosCredentials(clientConfig);
 +    } else {
 +      opts.setPrincipal(principal);
 +    }
 +
 +    TestIngest.ingest(connector, opts, new BatchWriterOpts());
 +  }
 +
 +  public static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName)
 +      throws Exception {
 +    verify(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName);
 +  }
 +
 +  private static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf,
 +      String tableName) throws Exception {
 +    ScannerOpts scannerOpts = new ScannerOpts();
 +    VerifyIngest.Opts opts = new VerifyIngest.Opts();
 +    opts.rows = rows;
 +    opts.cols = cols;
 +    opts.dataSize = width;
 +    opts.startRow = offset;
 +    opts.columnFamily = colf;
 +    opts.setTableName(tableName);
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      opts.updateKerberosCredentials(clientConfig);
 +    } else {
 +      opts.setPrincipal(principal);
 +    }
 +
 +    VerifyIngest.verifyIngest(connector, opts, scannerOpts);
 +  }
 +
 +  public static String[] args(String... args) {
 +    return args;
 +  }
 +
 +  @Test
 +  public void multiTableTest() throws Exception {
 +    // Write to multiple tables
 +    final String instance = cluster.getInstanceName();
 +    final String keepers = cluster.getZooKeepers();
 +    final ClusterControl control = cluster.getClusterControl();
 +    final String prefix = getClass().getSimpleName() + "_" + testName.getMethodName();
 +    ExecutorService svc = Executors.newFixedThreadPool(2);
 +    Future<Integer> p1 = svc.submit(new Callable<Integer>() {
 +      @Override
 +      public Integer call() {
 +        try {
 +          ClientConfiguration clientConf = cluster.getClientConfig();
 +          // Invocation is different for SASL. We're only logged in via this process's memory (not via some credentials cache on disk)
 +          // Need to pass along the keytab because of that.
 +          if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +            String principal = getAdminPrincipal();
 +            AuthenticationToken token = getAdminToken();
 +            assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken);
 +            KerberosToken kt = (KerberosToken) token;
 +            assertNotNull("Expected keytab in token", kt.getKeytab());
 +            return control.exec(
 +                TestMultiTableIngest.class,
 +                args("--count", Integer.toString(ROWS), "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab().getAbsolutePath(),
 +                    "-u", principal));
 +          }
 +
 +          return control.exec(
 +              TestMultiTableIngest.class,
 +              args("--count", Integer.toString(ROWS), "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String(
 +                  ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix));
 +        } catch (IOException e) {
 +          log.error("Error running MultiTableIngest", e);
 +          return -1;
 +        }
 +      }
 +    });
 +    Future<Integer> p2 = svc.submit(new Callable<Integer>() {
 +      @Override
 +      public Integer call() {
 +        try {
 +          ClientConfiguration clientConf = cluster.getClientConfig();
 +          // Invocation is different for SASL. We're only logged in via this process's memory (not via some credentials cache on disk)
 +          // Need to pass along the keytab because of that.
 +          if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +            String principal = getAdminPrincipal();
 +            AuthenticationToken token = getAdminToken();
 +            assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken);
 +            KerberosToken kt = (KerberosToken) token;
 +            assertNotNull("Expected keytab in token", kt.getKeytab());
 +            return control.exec(
 +                TestMultiTableIngest.class,
 +                args("--count", Integer.toString(ROWS), "--readonly", "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab()
 +                    .getAbsolutePath(), "-u", principal));
 +          }
 +
 +          return control.exec(
 +              TestMultiTableIngest.class,
 +              args("--count", Integer.toString(ROWS), "--readonly", "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String(
 +                  ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix));
 +        } catch (IOException e) {
 +          log.error("Error running MultiTableIngest", e);
 +          return -1;
 +        }
 +      }
 +    });
 +    svc.shutdown();
 +    while (!svc.isTerminated()) {
 +      svc.awaitTermination(15, TimeUnit.SECONDS);
 +    }
 +    assertEquals(0, p1.get().intValue());
 +    assertEquals(0, p2.get().intValue());
 +  }
 +
 +  @Test
 +  public void largeTest() throws Exception {
 +    // write a few large values
 +    Connector connector = getConnector();
 +    String table = getUniqueNames(1)[0];
 +    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table);
 +    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table);
 +  }
 +
 +  @Test
 +  public void interleaved() throws Exception {
 +    // read and write concurrently
 +    final Connector connector = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    interleaveTest(connector, tableName);
 +  }
 +
 +  static void interleaveTest(final Connector connector, final String tableName) throws Exception {
 +    final AtomicBoolean fail = new AtomicBoolean(false);
 +    final int CHUNKSIZE = ROWS / 10;
 +    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, 0, tableName);
 +    int i;
 +    for (i = 0; i < ROWS; i += CHUNKSIZE) {
 +      final int start = i;
 +      Thread verify = new Thread() {
 +        @Override
 +        public void run() {
 +          try {
 +            verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, start, tableName);
 +          } catch (Exception ex) {
 +            fail.set(true);
 +          }
 +        }
 +      };
 +      verify.start();
 +      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i + CHUNKSIZE, tableName);
 +      verify.join();
 +      assertFalse(fail.get());
 +    }
 +    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i, tableName);
 +  }
 +
 +  public static Text t(String s) {
 +    return new Text(s);
 +  }
 +
 +  public static Mutation m(String row, String cf, String cq, String value) {
 +    Mutation m = new Mutation(t(row));
 +    m.put(t(cf), t(cq), new Value(value.getBytes()));
 +    return m;
 +  }
 +
 +  @Test
 +  public void localityGroupPerf() throws Exception {
 +    // verify that locality groups can make look-ups faster
 +    final Connector connector = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    connector.tableOperations().setProperty(tableName, "table.group.g1", "colf");
 +    connector.tableOperations().setProperty(tableName, "table.groups.enabled", "g1");
 +    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
 +    connector.tableOperations().compact(tableName, null, null, true, true);
 +    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
 +    bw.addMutation(m("zzzzzzzzzzz", "colf2", "cq", "value"));
 +    bw.close();
 +    long now = System.currentTimeMillis();
 +    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    scanner.fetchColumnFamily(new Text("colf"));
 +    Iterators.size(scanner.iterator());
 +    long diff = System.currentTimeMillis() - now;
 +    now = System.currentTimeMillis();
 +    scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    scanner.fetchColumnFamily(new Text("colf2"));
 +    Iterators.size(scanner.iterator());
 +    bw.close();
 +    long diff2 = System.currentTimeMillis() - now;
 +    assertTrue(diff2 < diff);
 +  }
 +
 +  @Test
 +  public void sunnyLG() throws Exception {
 +    // create a locality group, write to it and ensure it exists in the RFiles that result
 +    final Connector connector = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>();
 +    groups.put("g1", Collections.singleton(t("colf")));
 +    connector.tableOperations().setLocalityGroups(tableName, groups);
 +    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
 +    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
 +    connector.tableOperations().flush(tableName, null, null, true);
 +    BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1);
 +    String tableId = connector.tableOperations().tableIdMap().get(tableName);
 +    bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<"))));
 +    bscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
 +    boolean foundFile = false;
 +    for (Entry<Key,Value> entry : bscanner) {
 +      foundFile = true;
 +      ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +      PrintStream newOut = new PrintStream(baos);
 +      PrintStream oldOut = System.out;
 +      try {
 +        System.setOut(newOut);
 +        List<String> args = new ArrayList<>();
 +        args.add(entry.getKey().getColumnQualifier().toString());
 +        if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +          args.add("--config");
 +          StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster;
 +          String hadoopConfDir = sac.getHadoopConfDir();
 +          args.add(new Path(hadoopConfDir, "core-site.xml").toString());
 +          args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString());
 +        }
 +        log.info("Invoking PrintInfo with " + args);
 +        PrintInfo.main(args.toArray(new String[args.size()]));
 +        newOut.flush();
 +        String stdout = baos.toString();
 +        assertTrue(stdout.contains("Locality group           : g1"));
 +        assertTrue(stdout.contains("families        : [colf]"));
 +      } finally {
 +        newOut.close();
 +        System.setOut(oldOut);
 +      }
 +    }
 +    bscanner.close();
 +    assertTrue(foundFile);
 +  }
 +
 +  @Test
 +  public void localityGroupChange() throws Exception {
 +    // Make changes to locality groups and ensure nothing is lost
 +    final Connector connector = getConnector();
 +    String table = getUniqueNames(1)[0];
 +    TableOperations to = connector.tableOperations();
 +    to.create(table);
 +    String[] config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf,xyz;lg2:c1,c2"};
 +    int i = 0;
 +    for (String cfg : config) {
 +      to.setLocalityGroups(table, getGroups(cfg));
 +      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * (i + 1), 1, 50, ROWS * i, table);
 +      to.flush(table, null, null, true);
 +      verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 0, 1, 50, ROWS * (i + 1), table);
 +      i++;
 +    }
 +    to.delete(table);
 +    to.create(table);
 +    config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf;lg2:colf",};
 +    i = 1;
 +    for (String cfg : config) {
 +      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table);
 +      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table);
 +      to.setLocalityGroups(table, getGroups(cfg));
 +      to.flush(table, null, null, true);
 +      verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table);
 +      verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table);
 +      i++;
 +    }
 +  }
 +
 +  private Map<String,Set<Text>> getGroups(String cfg) {
 +    Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>();
 +    if (cfg != null) {
 +      for (String group : cfg.split(";")) {
 +        String[] parts = group.split(":");
 +        Set<Text> cols = new HashSet<Text>();
 +        for (String col : parts[1].split(",")) {
 +          cols.add(t(col));
 +        }
 +        groups.put(parts[1], cols);
 +      }
 +    }
 +    return groups;
 +  }
 +
 +  private static class TestTrustManager implements X509TrustManager {
 +    @Override
 +    public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {}
 +
 +    @Override
 +    public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {}
 +
 +    @Override
 +    public X509Certificate[] getAcceptedIssuers() {
 +      return null;
 +    }
 +  }
 +
 +  private static class TestHostnameVerifier implements HostnameVerifier {
 +    @Override
 +    public boolean verify(String hostname, SSLSession session) {
 +      return true;
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
index d73bf3e,0000000..4ef0cab
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
@@@ -1,367 -1,0 +1,367 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.io.IOException;
 +import java.util.Map.Entry;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.cluster.ClusterControl;
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.fate.zookeeper.ZooLock;
 +import org.apache.accumulo.fate.zookeeper.ZooReader;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Charsets;
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class RestartIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(RestartIT.class);
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 10 * 60;
 +  }
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1s");
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  private static final ScannerOpts SOPTS = new ScannerOpts();
 +  private static final VerifyIngest.Opts VOPTS = new VerifyIngest.Opts();
 +  private static final TestIngest.Opts OPTS = new TestIngest.Opts();
 +  private static final BatchWriterOpts BWOPTS = new BatchWriterOpts();
 +  static {
 +    OPTS.rows = VOPTS.rows = 10 * 1000;
 +  }
 +
 +  private ExecutorService svc;
 +
 +  @Before
 +  public void setup() throws Exception {
 +    svc = Executors.newFixedThreadPool(1);
 +  }
 +
 +  @After
 +  public void teardown() throws Exception {
 +    if (null == svc) {
 +      return;
 +    }
 +
 +    if (!svc.isShutdown()) {
 +      svc.shutdown();
 +    }
 +
 +    while (!svc.awaitTermination(10, TimeUnit.SECONDS)) {
 +      log.info("Waiting for threadpool to terminate");
 +    }
 +  }
 +
 +  @Test
 +  public void restartMaster() throws Exception {
 +    Connector c = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    OPTS.setTableName(tableName);
 +    VOPTS.setTableName(tableName);
 +    c.tableOperations().create(tableName);
 +    final AuthenticationToken token = getAdminToken();
 +    final ClusterControl control = getCluster().getClusterControl();
 +
 +    final String[] args;
 +    if (token instanceof PasswordToken) {
 +      byte[] password = ((PasswordToken) token).getPassword();
 +      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName};
 +      OPTS.setPrincipal(getAdminPrincipal());
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    } else if (token instanceof KerberosToken) {
 +      ClusterUser rootUser = getAdminUser();
 +      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName};
 +      ClientConfiguration clientConfig = cluster.getClientConfig();
 +      OPTS.updateKerberosCredentials(clientConfig);
 +      VOPTS.updateKerberosCredentials(clientConfig);
 +    } else {
 +      throw new RuntimeException("Unknown token");
 +    }
 +
 +    Future<Integer> ret = svc.submit(new Callable<Integer>() {
 +      @Override
 +      public Integer call() {
 +        try {
 +          return control.exec(TestIngest.class, args);
 +        } catch (IOException e) {
 +          log.error("Error running TestIngest", e);
 +          return -1;
 +        }
 +      }
 +    });
 +
 +    control.stopAllServers(ServerType.MASTER);
 +    control.startAllServers(ServerType.MASTER);
 +    assertEquals(0, ret.get().intValue());
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +  }
 +
 +  @Test
 +  public void restartMasterRecovery() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    OPTS.setTableName(tableName);
 +    VOPTS.setTableName(tableName);
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      OPTS.updateKerberosCredentials(clientConfig);
 +      VOPTS.updateKerberosCredentials(clientConfig);
 +    } else {
 +      OPTS.setPrincipal(getAdminPrincipal());
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    }
 +    TestIngest.ingest(c, OPTS, BWOPTS);
 +    ClusterControl control = getCluster().getClusterControl();
 +
 +    // TODO implement a kill all too?
 +    // cluster.stop() would also stop ZooKeeper
 +    control.stopAllServers(ServerType.MASTER);
 +    control.stopAllServers(ServerType.TRACER);
 +    control.stopAllServers(ServerType.TABLET_SERVER);
 +    control.stopAllServers(ServerType.GARBAGE_COLLECTOR);
 +    control.stopAllServers(ServerType.MONITOR);
 +
 +    ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut());
 +    ZooCache zcache = new ZooCache(zreader, null);
 +    byte[] masterLockData;
 +    do {
 +      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null);
 +      if (null != masterLockData) {
 +        log.info("Master lock is still held");
 +        Thread.sleep(1000);
 +      }
 +    } while (null != masterLockData);
 +
 +    cluster.start();
 +    sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
 +    control.stopAllServers(ServerType.MASTER);
 +
 +    masterLockData = new byte[0];
 +    do {
 +      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null);
 +      if (null != masterLockData) {
 +        log.info("Master lock is still held");
 +        Thread.sleep(1000);
 +      }
 +    } while (null != masterLockData);
 +    cluster.start();
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +  }
 +
 +  @Test
 +  public void restartMasterSplit() throws Exception {
 +    Connector c = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    final AuthenticationToken token = getAdminToken();
 +    final ClusterControl control = getCluster().getClusterControl();
 +    VOPTS.setTableName(tableName);
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
 +
 +    final String[] args;
 +    if (token instanceof PasswordToken) {
 +      byte[] password = ((PasswordToken) token).getPassword();
 +      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName};
 +      OPTS.setPrincipal(getAdminPrincipal());
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    } else if (token instanceof KerberosToken) {
 +      ClusterUser rootUser = getAdminUser();
 +      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName};
 +      ClientConfiguration clientConfig = cluster.getClientConfig();
 +      OPTS.updateKerberosCredentials(clientConfig);
 +      VOPTS.updateKerberosCredentials(clientConfig);
 +    } else {
 +      throw new RuntimeException("Unknown token");
 +    }
 +
 +    Future<Integer> ret = svc.submit(new Callable<Integer>() {
 +      @Override
 +      public Integer call() {
 +        try {
 +          return control.exec(TestIngest.class, args);
 +        } catch (Exception e) {
 +          log.error("Error running TestIngest", e);
 +          return -1;
 +        }
 +      }
 +    });
 +
 +    control.stopAllServers(ServerType.MASTER);
 +
 +    ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut());
 +    ZooCache zcache = new ZooCache(zreader, null);
 +    byte[] masterLockData;
 +    do {
 +      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null);
 +      if (null != masterLockData) {
 +        log.info("Master lock is still held");
 +        Thread.sleep(1000);
 +      }
 +    } while (null != masterLockData);
 +
 +    cluster.start();
 +    assertEquals(0, ret.get().intValue());
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +  }
 +
 +  @Test
 +  public void killedTabletServer() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    OPTS.setTableName(tableName);
 +    VOPTS.setTableName(tableName);
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      OPTS.updateKerberosCredentials(clientConfig);
 +      VOPTS.updateKerberosCredentials(clientConfig);
 +    } else {
 +      OPTS.setPrincipal(getAdminPrincipal());
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    }
 +    TestIngest.ingest(c, OPTS, BWOPTS);
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +    cluster.getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +    cluster.start();
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +  }
 +
 +  @Test
 +  public void killedTabletServer2() throws Exception {
 +    final Connector c = getConnector();
 +    final String[] names = getUniqueNames(2);
 +    final String tableName = names[0];
 +    final ClusterControl control = getCluster().getClusterControl();
 +    c.tableOperations().create(tableName);
 +    // Original test started and then stopped a GC. Not sure why it did this. The GC was
 +    // already running by default, and it would have nothing to do after only creating a table
 +    control.stopAllServers(ServerType.TABLET_SERVER);
 +
 +    cluster.start();
 +    c.tableOperations().create(names[1]);
 +  }
 +
 +  @Test
 +  public void killedTabletServerDuringShutdown() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    OPTS.setTableName(tableName);
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      OPTS.updateKerberosCredentials(clientConfig);
 +    } else {
 +      OPTS.setPrincipal(getAdminPrincipal());
 +    }
 +    TestIngest.ingest(c, OPTS, BWOPTS);
 +    try {
 +      getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +      getCluster().getClusterControl().adminStopAll();
 +    } finally {
 +      getCluster().start();
 +    }
 +  }
 +
 +  @Test
 +  public void shutdownDuringCompactingSplitting() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    VOPTS.setTableName(tableName);
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      OPTS.updateKerberosCredentials(clientConfig);
 +      VOPTS.updateKerberosCredentials(clientConfig);
 +    } else {
 +      OPTS.setPrincipal(getAdminPrincipal());
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    }
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
 +    String splitThreshold = null;
 +    for (Entry<String,String> entry : c.tableOperations().getProperties(tableName)) {
 +      if (entry.getKey().equals(Property.TABLE_SPLIT_THRESHOLD.getKey())) {
 +        splitThreshold = entry.getValue();
 +        break;
 +      }
 +    }
 +    Assert.assertNotNull(splitThreshold);
 +    try {
 +      c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "20K");
 +      TestIngest.Opts opts = new TestIngest.Opts();
 +      opts.setTableName(tableName);
 +      if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +        opts.updateKerberosCredentials(clientConfig);
 +      } else {
 +        opts.setPrincipal(getAdminPrincipal());
 +      }
 +      TestIngest.ingest(c, opts, BWOPTS);
 +      c.tableOperations().flush(tableName, null, null, false);
 +      VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +      getCluster().stop();
 +    } finally {
 +      if (getClusterType() == ClusterType.STANDALONE) {
 +        getCluster().start();
 +        c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), splitThreshold);
 +      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
index f607d16,0000000..f58db38
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
@@@ -1,153 -1,0 +1,153 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.Map;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.cluster.ClusterControl;
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Charsets;
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class RestartStressIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(RestartStressIT.class);
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    Map<String,String> opts = cfg.getSiteConfig();
 +    opts.put(Property.TSERV_MAXMEM.getKey(), "100K");
 +    opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
 +    opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M");
-     opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s");
++    opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
 +    opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
 +    cfg.setSiteConfig(opts);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 10 * 60;
 +  }
 +
 +  private ExecutorService svc;
 +
 +  @Before
 +  public void setup() throws Exception {
 +    svc = Executors.newFixedThreadPool(1);
 +  }
 +
 +  @After
 +  public void teardown() throws Exception {
 +    if (null == svc) {
 +      return;
 +    }
 +
 +    if (!svc.isShutdown()) {
 +      svc.shutdown();
 +    }
 +
 +    while (!svc.awaitTermination(10, TimeUnit.SECONDS)) {
 +      log.info("Waiting for threadpool to terminate");
 +    }
 +  }
 +
 +  private static final VerifyIngest.Opts VOPTS;
 +  static {
 +    VOPTS = new VerifyIngest.Opts();
 +    VOPTS.rows = 10 * 1000;
 +  }
 +  private static final ScannerOpts SOPTS = new ScannerOpts();
 +
 +  @Test
 +  public void test() throws Exception {
 +    final Connector c = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    final AuthenticationToken token = getAdminToken();
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "500K");
 +    final ClusterControl control = getCluster().getClusterControl();
 +    final String[] args;
 +    if (token instanceof PasswordToken) {
 +      byte[] password = ((PasswordToken) token).getPassword();
 +      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName};
 +    } else if (token instanceof KerberosToken) {
 +      ClusterUser rootUser = getAdminUser();
 +      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName};
 +    } else {
 +      throw new RuntimeException("Unrecognized token");
 +    }
 +
 +    Future<Integer> retCode = svc.submit(new Callable<Integer>() {
 +      @Override
 +      public Integer call() {
 +        try {
 +          return control.exec(TestIngest.class, args);
 +        } catch (Exception e) {
 +          log.error("Error running TestIngest", e);
 +          return -1;
 +        }
 +      }
 +    });
 +
 +    for (int i = 0; i < 2; i++) {
 +      sleepUninterruptibly(10, TimeUnit.SECONDS);
 +      control.stopAllServers(ServerType.TABLET_SERVER);
 +      control.startAllServers(ServerType.TABLET_SERVER);
 +    }
 +    assertEquals(0, retCode.get().intValue());
 +    VOPTS.setTableName(tableName);
 +
 +    if (token instanceof PasswordToken) {
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    } else if (token instanceof KerberosToken) {
 +      VOPTS.updateKerberosCredentials(cluster.getClientConfig());
 +    } else {
 +      throw new RuntimeException("Unrecognized token");
 +    }
 +
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
index 36bdd7a,0000000..5f7ca88
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
@@@ -1,153 -1,0 +1,153 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.ConditionalWriter;
 +import org.apache.accumulo.core.client.ConditionalWriter.Status;
 +import org.apache.accumulo.core.client.ConditionalWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Durability;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Condition;
 +import org.apache.accumulo.core.data.ConditionalMutation;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class SessionDurabilityIT extends ConfigurableMacBase {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void nondurableTableHasDurableWrites() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default has no durability
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
 +    // send durable writes
 +    BatchWriterConfig cfg = new BatchWriterConfig();
 +    cfg.setDurability(Durability.SYNC);
 +    writeSome(tableName, 10, cfg);
 +    assertEquals(10, count(tableName));
 +    // verify writes survive restart
 +    restartTServer();
 +    assertEquals(10, count(tableName));
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void durableTableLosesNonDurableWrites() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default is durable writes
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
 +    // write with no durability
 +    BatchWriterConfig cfg = new BatchWriterConfig();
 +    cfg.setDurability(Durability.NONE);
 +    writeSome(tableName, 10, cfg);
 +    // verify writes are lost on restart
 +    restartTServer();
 +    assertTrue(10 > count(tableName));
 +  }
 +
 +  private int count(String tableName) throws Exception {
 +    return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator());
 +  }
 +
 +  private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception {
 +    Connector c = getConnector();
 +    BatchWriter bw = c.createBatchWriter(tableName, cfg);
 +    for (int i = 0; i < n; i++) {
 +      Mutation m = new Mutation(i + "");
 +      m.put("", "", "");
 +      bw.addMutation(m);
 +    }
 +    bw.close();
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void testConditionDurability() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default is durable writes
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
 +    // write without durability
 +    ConditionalWriterConfig cfg = new ConditionalWriterConfig();
 +    cfg.setDurability(Durability.NONE);
 +    conditionWriteSome(tableName, 10, cfg);
 +    // everything in there?
 +    assertEquals(10, count(tableName));
 +    // restart the server and verify the updates are lost
 +    restartTServer();
 +    assertEquals(0, count(tableName));
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void testConditionDurability2() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default is durable writes
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
 +    // write with durability
 +    ConditionalWriterConfig cfg = new ConditionalWriterConfig();
 +    cfg.setDurability(Durability.SYNC);
 +    conditionWriteSome(tableName, 10, cfg);
 +    // everything in there?
 +    assertEquals(10, count(tableName));
 +    // restart the server and verify the updates are still there
 +    restartTServer();
 +    assertEquals(10, count(tableName));
 +  }
 +
 +  private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception {
 +    Connector c = getConnector();
 +    ConditionalWriter cw = c.createConditionalWriter(tableName, cfg);
 +    for (int i = 0; i < n; i++) {
 +      ConditionalMutation m = new ConditionalMutation((CharSequence) (i + ""), new Condition("", ""));
 +      m.put("", "", "X");
 +      assertEquals(Status.ACCEPTED, cw.write(m).getStatus());
 +    }
 +  }
 +
 +  private void restartTServer() throws Exception {
 +    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +      cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +    }
 +    cluster.start();
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index ab3c662,0000000..6f0bc7a
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@@ -1,235 -1,0 +1,235 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
 +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
 +import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
 +import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
 +import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
 +import static org.apache.accumulo.core.security.Authorizations.EMPTY;
 +import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
 +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 +import org.apache.accumulo.master.state.SetGoalState;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.log.WalStateManager;
 +import org.apache.accumulo.server.log.WalStateManager.WalState;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class WALSunnyDayIT extends ConfigurableMacBase {
 +
 +  private static final Text CF = new Text(new byte[0]);
 +
 +  @Override
 +  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setProperty(GC_CYCLE_DELAY, "1s");
 +    cfg.setProperty(GC_CYCLE_START, "0s");
 +    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
 +    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
-     cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setNumTservers(1);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
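 +  // Count how many entries in the collection are true (i.e. how many WALs are in use).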
 +  int countTrue(Collection<Boolean> bools) {
 +    int result = 0;
 +    for (Boolean b : bools) {
 +      if (b.booleanValue())
 +        result++;
 +    }
 +    return result;
 +  }
 +
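 +  // Walk a WAL through its normal life cycle: creation, roll, flush, GC, and recovery after a tserver restart.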
 +  @Test
 +  public void test() throws Exception {
 +    MiniAccumuloClusterImpl mac = getCluster();
 +    MiniAccumuloClusterControl control = mac.getClusterControl();
 +    control.stop(GARBAGE_COLLECTOR);
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    writeSomeData(c, tableName, 1, 1);
 +
 +    // wal markers are added lazily
 +    Map<String,Boolean> wals = getWals(c);
 +    assertEquals(wals.toString(), 2, wals.size());
 +    for (Boolean b : wals.values()) {
 +      assertTrue("logs should be in use", b.booleanValue());
 +    }
 +
 +    // roll log, get a new next
 +    writeSomeData(c, tableName, 1001, 50);
 +    Map<String,Boolean> walsAfterRoll = getWals(c);
 +    assertEquals("should have 3 WALs after roll", 3, walsAfterRoll.size());
 +    assertTrue("new WALs should be a superset of the old WALs", 
walsAfterRoll.keySet().containsAll(wals.keySet()));
 +    assertEquals("all WALs should be in use", 3, 
countTrue(walsAfterRoll.values()));
 +
 +    // flush the tables
 +    for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
 +      c.tableOperations().flush(table, null, null, true);
 +    }
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    // rolled WAL is no longer in use, but needs to be GC'd
 +    Map<String,Boolean> walsAfterflush = getWals(c);
 +    assertEquals(walsAfterflush.toString(), 3, walsAfterflush.size());
 +    assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values()));
 +
 +    // let the GC run for a little bit
 +    control.start(GARBAGE_COLLECTOR);
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    // make sure the unused WAL goes away
 +    Map<String,Boolean> walsAfterGC = getWals(c);
 +    assertEquals(walsAfterGC.toString(), 2, walsAfterGC.size());
 +    control.stop(GARBAGE_COLLECTOR);
 +    // restart the tserver, but don't run recovery on all tablets
 +    control.stop(TABLET_SERVER);
 +    // this delays recovery on the normal tables
 +    assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
 +    control.start(TABLET_SERVER);
 +
 +    // wait for the metadata table to go back online
 +    getRecoveryMarkers(c);
 +    // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
 +    // log.debug("markers " + markers);
 +    assertEquals("one tablet should have markers", 1, 
markers.keySet().size());
 +    assertEquals("tableId of the keyExtent should be 1", 
markers.keySet().iterator().next().getTableId(), new Text("1"));
 +
 +    // put some data in the WAL
 +    assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
 +    verifySomeData(c, tableName, 1001 * 50 + 1);
 +    writeSomeData(c, tableName, 100, 100);
 +
 +    Map<String,Boolean> walsAfterRestart = getWals(c);
 +    // log.debug("wals after " + walsAfterRestart);
 +    assertEquals("used WALs after restart should be 4", 4, 
countTrue(walsAfterRestart.values()));
 +    control.start(GARBAGE_COLLECTOR);
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    Map<String,Boolean> walsAfterRestartAndGC = getWals(c);
 +    assertEquals("wals left should be 2", 2, walsAfterRestartAndGC.size());
 +    assertEquals("logs in use should be 2", 2, 
countTrue(walsAfterRestartAndGC.values()));
 +  }
 +
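 +  // Assert the table contains exactly the expected number of entries.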
 +  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
 +    Scanner scan = c.createScanner(tableName, EMPTY);
 +    int result = Iterators.size(scan.iterator());
 +    scan.close();
 +    Assert.assertEquals(expected, result);
 +  }
 +
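 +  // Write 'row' rows of 'col' random columns each, flushing the batch writer every 100 rows.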
 +  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
 +    Random rand = new Random();
 +    BatchWriter bw = conn.createBatchWriter(tableName, null);
 +    byte[] rowData = new byte[10];
 +    byte[] cq = new byte[10];
 +    byte[] value = new byte[10];
 +
 +    for (int r = 0; r < row; r++) {
 +      rand.nextBytes(rowData);
 +      Mutation m = new Mutation(rowData);
 +      for (int c = 0; c < col; c++) {
 +        rand.nextBytes(cq);
 +        rand.nextBytes(value);
 +        m.put(CF, new Text(cq), new Value(value));
 +      }
 +      bw.addMutation(m);
 +      if (r % 100 == 0) {
 +        bw.flush();
 +      }
 +    }
 +    bw.close();
 +  }
 +
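 +  // Map each WAL path to whether it is still referenced, according to the WalStateManager state in ZooKeeper.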
 +  private Map<String,Boolean> getWals(Connector c) throws Exception {
 +    Map<String,Boolean> result = new HashMap<>();
 +    Instance i = c.getInstance();
 +    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
 +    WalStateManager wals = new WalStateManager(c.getInstance(), zk);
 +    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
 +      // a WAL counts as in use unless its state is UNREFERENCED
 +      result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
 +    }
 +    return result;
 +  }
 +
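 +  // Collect the WAL entries recorded in the root and metadata tables, grouped by the tablet that references them.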
 +  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws 
Exception {
 +    Map<KeyExtent,List<String>> result = new HashMap<>();
 +    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
 +    root.setRange(TabletsSection.getRange());
 +    root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
 +    TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
 +
 +    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
 +    meta.setRange(TabletsSection.getRange());
 +    meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
 +    TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
 +
 +    List<String> logs = new ArrayList<>();
 +    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), 
meta.iterator());
 +    while (both.hasNext()) {
 +      Entry<Key,Value> entry = both.next();
 +      Key key = entry.getKey();
 +      if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
 +        logs.add(key.getColumnQualifier().toString());
 +      }
 +      if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
 +        KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
 +        result.put(extent, logs);
 +        logs = new ArrayList<String>();
 +      }
 +    }
 +    return result;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
index d877969,0000000..0074f5f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
@@@ -1,79 -1,0 +1,79 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.Test;
 +
 +public class WriteAheadLogIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1");
 +    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "4s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 10 * 60;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "750K");
 +    TestIngest.Opts opts = new TestIngest.Opts();
 +    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
 +    opts.setTableName(tableName);
 +
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      opts.updateKerberosCredentials(clientConfig);
 +      vopts.updateKerberosCredentials(clientConfig);
 +    } else {
 +      opts.setPrincipal(getAdminPrincipal());
 +      vopts.setPrincipal(getAdminPrincipal());
 +    }
 +
 +    TestIngest.ingest(c, opts, new BatchWriterOpts());
 +    vopts.setTableName(tableName);
 +    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
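 +    // Bounce the tablet servers so the second verification reads data recovered from the write-ahead log.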
 +    getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +    getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
 +    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
index b19ec2f,0000000..5ece1ac
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
@@@ -1,89 -1,0 +1,89 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.hadoop.conf.Configuration;
 +import org.junit.Test;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class ZookeeperRestartIT extends ConfigurableMacBase {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    Map<String,String> siteConfig = new HashMap<String,String>();
-     siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
++    siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
 +    cfg.setSiteConfig(siteConfig);
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +    c.tableOperations().create("test_ingest");
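 +    // write a single entry that must still be readable after the ZooKeeper restart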
 +    BatchWriter bw = c.createBatchWriter("test_ingest", null);
 +    Mutation m = new Mutation("row");
 +    m.put("cf", "cq", "value");
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    // kill zookeeper
 +    for (ProcessReference proc : cluster.getProcesses().get(ServerType.ZOOKEEPER))
 +      cluster.killProcess(ServerType.ZOOKEEPER, proc);
 +
 +    // give the servers time to react
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +    // start zookeeper back up
 +    cluster.start();
 +
 +    // use the tservers
 +    Scanner s = c.createScanner("test_ingest", Authorizations.EMPTY);
 +    Iterator<Entry<Key,Value>> i = s.iterator();
 +    assertTrue(i.hasNext());
 +    assertEquals("row", i.next().getKey().getRow().toString());
 +    assertFalse(i.hasNext());
 +    // use the master
 +    c.tableOperations().delete("test_ingest");
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
index 7f61a08,0000000..133c09c
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
@@@ -1,146 -1,0 +1,146 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.proxy;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.File;
 +import java.nio.ByteBuffer;
 +import java.nio.file.Files;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.TreeMap;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.accumulo.proxy.Proxy;
 +import org.apache.accumulo.proxy.thrift.AccumuloProxy.Client;
 +import org.apache.accumulo.proxy.thrift.Column;
 +import org.apache.accumulo.proxy.thrift.ColumnUpdate;
 +import org.apache.accumulo.proxy.thrift.Condition;
 +import org.apache.accumulo.proxy.thrift.ConditionalStatus;
 +import org.apache.accumulo.proxy.thrift.ConditionalUpdates;
 +import org.apache.accumulo.proxy.thrift.ConditionalWriterOptions;
 +import org.apache.accumulo.proxy.thrift.Durability;
 +import org.apache.accumulo.proxy.thrift.TimeType;
 +import org.apache.accumulo.proxy.thrift.WriterOptions;
 +import org.apache.accumulo.server.util.PortUtils;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.thrift.protocol.TJSONProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
 +import com.google.common.net.HostAndPort;
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class ProxyDurabilityIT extends ConfigurableMacBase {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
-     return 60;
++    return 120;
 +  }
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setNumTservers(1);
 +  }
 +
 +  private static ByteBuffer bytes(String value) {
 +    return ByteBuffer.wrap(value.getBytes());
 +  }
 +
 +  @Test
 +  public void testDurability() throws Exception {
 +    Connector c = getConnector();
 +    Properties props = new Properties();
 +    // Avoid issues with locally installed client configuration files with custom properties
 +    File emptyFile = Files.createTempFile(null, null).toFile();
 +    emptyFile.deleteOnExit();
 +    props.put("instance", c.getInstance().getInstanceName());
 +    props.put("zookeepers", c.getInstance().getZooKeepers());
 +    props.put("tokenClass", PasswordToken.class.getName());
 +    props.put("clientConfigurationFile", emptyFile.toString());
 +
 +    TJSONProtocol.Factory protocol = new TJSONProtocol.Factory();
 +
 +    int proxyPort = PortUtils.getRandomFreePort();
 +    final TServer proxyServer = Proxy.createProxyServer(HostAndPort.fromParts("localhost", proxyPort), protocol, props).server;
 +    while (!proxyServer.isServing())
 +      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    Client client = new TestProxyClient("localhost", proxyPort, protocol).proxy();
 +    Map<String,String> properties = new TreeMap<String,String>();
 +    properties.put("password", ROOT_PASSWORD);
 +    ByteBuffer login = client.login("root", properties);
 +
 +    String tableName = getUniqueNames(1)[0];
 +    client.createTable(login, tableName, true, TimeType.MILLIS);
 +    assertTrue(c.tableOperations().exists(tableName));
 +
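 +    // With durability NONE the update never reaches the write-ahead log, so it should not survive a tserver restart.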
 +    WriterOptions options = new WriterOptions();
 +    options.setDurability(Durability.NONE);
 +    String writer = client.createWriter(login, tableName, options);
 +    Map<ByteBuffer,List<ColumnUpdate>> cells = new 
TreeMap<ByteBuffer,List<ColumnUpdate>>();
 +    ColumnUpdate column = new ColumnUpdate(bytes("cf"), bytes("cq"));
 +    column.setValue("value".getBytes());
 +    cells.put(bytes("row"), Collections.singletonList(column));
 +    client.update(writer, cells);
 +    client.closeWriter(writer);
 +    assertEquals(1, count(tableName));
 +    restartTServer();
 +    assertEquals(0, count(tableName));
 +
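 +    // With durability SYNC the conditional update is synced to the write-ahead log and should survive a restart.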
 +    ConditionalWriterOptions cfg = new ConditionalWriterOptions();
 +    cfg.setDurability(Durability.SYNC);
 +    String cwriter = client.createConditionalWriter(login, tableName, cfg);
 +    ConditionalUpdates updates = new ConditionalUpdates();
 +    updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
 +    updates.addToUpdates(column);
 +    Map<ByteBuffer,ConditionalStatus> status = 
client.updateRowsConditionally(cwriter, Collections.singletonMap(bytes("row"), 
updates));
 +    assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
 +    assertEquals(1, count(tableName));
 +    restartTServer();
 +    assertEquals(1, count(tableName));
 +
 +    proxyServer.stop();
 +  }
 +
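 +  // Kill every tablet server process, then restart the cluster.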
 +  private void restartTServer() throws Exception {
 +    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +      cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +    }
 +    cluster.start();
 +  }
 +
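 +  // Count the entries currently readable from the table.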
 +  private int count(String tableName) throws Exception {
 +    return Iterators.size((getConnector().createScanner(tableName, Authorizations.EMPTY)).iterator());
 +  }
 +
 +}
