http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 3098251,0000000..7b37a9e mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@@ -1,510 -1,0 +1,510 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.PrintInfo; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.MonitorUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestMultiTableIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.collect.Iterators; + +public class ReadWriteIT extends AccumuloClusterHarness { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + } + + private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class); + + static final int ROWS = 100000; + static final int COLS = 1; + static final String COLF = "colf"; + + @Override + protected int defaultTimeoutSeconds() { + return 6 * 60; + } + + @Test(expected = RuntimeException.class) + public void invalidInstanceName() throws Exception { + final Connector conn = getConnector(); + new ZooKeeperInstance("fake_instance_name", conn.getInstance().getZooKeepers()); + } + + @Test + public void sunnyDay() throws Exception { + // Start accumulo, create a table, insert some data, verify we can read it out. + // Shutdown cleanly. + log.debug("Starting Monitor"); + cluster.getClusterControl().startAllServers(ServerType.MONITOR); + Connector connector = getConnector(); + String tableName = getUniqueNames(1)[0]; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName); + String monitorLocation = null; + while (null == monitorLocation) { + monitorLocation = MonitorUtil.getLocation(getConnector().getInstance()); + if (null == monitorLocation) { + log.debug("Could not fetch monitor HTTP address from zookeeper"); + Thread.sleep(2000); + } + } + String scheme = "http://"; + if (getCluster() instanceof StandaloneAccumuloCluster) { + StandaloneAccumuloCluster standaloneCluster = (StandaloneAccumuloCluster) getCluster(); + File accumuloSite = new File(standaloneCluster.getServerAccumuloConfDir(), "accumulo-site.xml"); + if (accumuloSite.isFile()) { + Configuration conf = new Configuration(false); + conf.addResource(new Path(accumuloSite.toURI())); + String monitorSslKeystore = conf.get(Property.MONITOR_SSL_KEYSTORE.getKey()); + if (null != monitorSslKeystore) { + log.info("Setting scheme to HTTPS since monitor ssl keystore configuration was observed in {}", accumuloSite); + scheme = "https://"; + SSLContext ctx = SSLContext.getInstance("SSL"); + TrustManager[] tm = new TrustManager[] {new TestTrustManager()}; + ctx.init(new KeyManager[0], tm, new SecureRandom()); + SSLContext.setDefault(ctx); + HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); + HttpsURLConnection.setDefaultHostnameVerifier(new TestHostnameVerifier()); + } + } else { + log.info("{} is not a normal file, not checking for monitor running with SSL", accumuloSite); + } + } + URL url = new URL(scheme + monitorLocation); + log.debug("Fetching web page " + url); + String result = FunctionalTestUtils.readAll(url.openStream()); + assertTrue(result.length() > 100); + log.debug("Stopping accumulo cluster"); + ClusterControl control = cluster.getClusterControl(); + control.adminStopAll(); + ZooReader zreader = new ZooReader(connector.getInstance().getZooKeepers(), connector.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(connector.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + control.stopAllServers(ServerType.GARBAGE_COLLECTOR); + control.stopAllServers(ServerType.MONITOR); + control.stopAllServers(ServerType.TRACER); + log.debug("success!"); + // Restarting everything + cluster.start(); + } + + public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName) + throws Exception { + ingest(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName); + } + + public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf, + String tableName) throws Exception { + TestIngest.Opts opts = new TestIngest.Opts(); + opts.rows = rows; + opts.cols = cols; + opts.dataSize = width; + opts.startRow = offset; + opts.columnFamily = colf; + opts.createTable = true; + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(principal); + } + + TestIngest.ingest(connector, opts, new BatchWriterOpts()); + } + + public static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName) + throws Exception { + verify(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName); + } + + private static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf, + String tableName) throws Exception { + ScannerOpts scannerOpts = new ScannerOpts(); + VerifyIngest.Opts opts = new VerifyIngest.Opts(); + opts.rows = rows; + opts.cols = cols; + opts.dataSize = width; + opts.startRow = offset; + opts.columnFamily = colf; + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(principal); + } + + VerifyIngest.verifyIngest(connector, opts, scannerOpts); + } + + public static String[] args(String... args) { + return args; + } + + @Test + public void multiTableTest() throws Exception { + // Write to multiple tables + final String instance = cluster.getInstanceName(); + final String keepers = cluster.getZooKeepers(); + final ClusterControl control = cluster.getClusterControl(); + final String prefix = getClass().getSimpleName() + "_" + testName.getMethodName(); + ExecutorService svc = Executors.newFixedThreadPool(2); + Future<Integer> p1 = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + ClientConfiguration clientConf = cluster.getClientConfig(); + // Invocation is different for SASL. We're only logged in via this processes memory (not via some credentials cache on disk) + // Need to pass along the keytab because of that. + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + String principal = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken); + KerberosToken kt = (KerberosToken) token; + assertNotNull("Expected keytab in token", kt.getKeytab()); + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab().getAbsolutePath(), + "-u", principal)); + } + + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String( + ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix)); + } catch (IOException e) { + log.error("Error running MultiTableIngest", e); + return -1; + } + } + }); + Future<Integer> p2 = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + ClientConfiguration clientConf = cluster.getClientConfig(); + // Invocation is different for SASL. We're only logged in via this processes memory (not via some credentials cache on disk) + // Need to pass along the keytab because of that. + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + String principal = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken); + KerberosToken kt = (KerberosToken) token; + assertNotNull("Expected keytab in token", kt.getKeytab()); + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "--readonly", "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab() + .getAbsolutePath(), "-u", principal)); + } + + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "--readonly", "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String( + ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix)); + } catch (IOException e) { + log.error("Error running MultiTableIngest", e); + return -1; + } + } + }); + svc.shutdown(); + while (!svc.isTerminated()) { + svc.awaitTermination(15, TimeUnit.SECONDS); + } + assertEquals(0, p1.get().intValue()); + assertEquals(0, p2.get().intValue()); + } + + @Test + public void largeTest() throws Exception { + // write a few large values + Connector connector = getConnector(); + String table = getUniqueNames(1)[0]; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table); + } + + @Test + public void interleaved() throws Exception { + // read and write concurrently + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + interleaveTest(connector, tableName); + } + + static void interleaveTest(final Connector connector, final String tableName) throws Exception { + final AtomicBoolean fail = new AtomicBoolean(false); + final int CHUNKSIZE = ROWS / 10; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, 0, tableName); + int i; + for (i = 0; i < ROWS; i += CHUNKSIZE) { + final int start = i; + Thread verify = new Thread() { + @Override + public void run() { + try { + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, start, tableName); + } catch (Exception ex) { + fail.set(true); + } + } + }; + verify.start(); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i + CHUNKSIZE, tableName); + verify.join(); + assertFalse(fail.get()); + } + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i, tableName); + } + + public static Text t(String s) { + return new Text(s); + } + + public static Mutation m(String row, String cf, String cq, String value) { + Mutation m = new Mutation(t(row)); + m.put(t(cf), t(cq), new Value(value.getBytes())); + return m; + } + + @Test + public void localityGroupPerf() throws Exception { + // verify that locality groups can make look-ups faster + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + connector.tableOperations().setProperty(tableName, "table.group.g1", "colf"); + connector.tableOperations().setProperty(tableName, "table.groups.enabled", "g1"); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + connector.tableOperations().compact(tableName, null, null, true, true); + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + bw.addMutation(m("zzzzzzzzzzz", "colf2", "cq", "value")); + bw.close(); + long now = System.currentTimeMillis(); + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("colf")); + Iterators.size(scanner.iterator()); + long diff = System.currentTimeMillis() - now; + now = System.currentTimeMillis(); + scanner = connector.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("colf2")); + Iterators.size(scanner.iterator()); + bw.close(); + long diff2 = System.currentTimeMillis() - now; + assertTrue(diff2 < diff); + } + + @Test + public void sunnyLG() throws Exception { + // create a locality group, write to it and ensure it exists in the RFiles that result + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>(); + groups.put("g1", Collections.singleton(t("colf"))); + connector.tableOperations().setLocalityGroups(tableName, groups); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + connector.tableOperations().flush(tableName, null, null, true); + BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1); + String tableId = connector.tableOperations().tableIdMap().get(tableName); + bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<")))); + bscanner.fetchColumnFamily(DataFileColumnFamily.NAME); + boolean foundFile = false; + for (Entry<Key,Value> entry : bscanner) { + foundFile = true; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream newOut = new PrintStream(baos); + PrintStream oldOut = System.out; + try { + System.setOut(newOut); + List<String> args = new ArrayList<>(); + args.add(entry.getKey().getColumnQualifier().toString()); + if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + args.add("--config"); + StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster; + String hadoopConfDir = sac.getHadoopConfDir(); + args.add(new Path(hadoopConfDir, "core-site.xml").toString()); + args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString()); + } + log.info("Invoking PrintInfo with " + args); + PrintInfo.main(args.toArray(new String[args.size()])); + newOut.flush(); + String stdout = baos.toString(); + assertTrue(stdout.contains("Locality group : g1")); + assertTrue(stdout.contains("families : [colf]")); + } finally { + newOut.close(); + System.setOut(oldOut); + } + } + bscanner.close(); + assertTrue(foundFile); + } + + @Test + public void localityGroupChange() throws Exception { + // Make changes to locality groups and ensure nothing is lost + final Connector connector = getConnector(); + String table = getUniqueNames(1)[0]; + TableOperations to = connector.tableOperations(); + to.create(table); + String[] config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf,xyz;lg2:c1,c2"}; + int i = 0; + for (String cfg : config) { + to.setLocalityGroups(table, getGroups(cfg)); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * (i + 1), 1, 50, ROWS * i, table); + to.flush(table, null, null, true); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 0, 1, 50, ROWS * (i + 1), table); + i++; + } + to.delete(table); + to.create(table); + config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf;lg2:colf",}; + i = 1; + for (String cfg : config) { + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table); + to.setLocalityGroups(table, getGroups(cfg)); + to.flush(table, null, null, true); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table); + i++; + } + } + + private Map<String,Set<Text>> getGroups(String cfg) { + Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>(); + if (cfg != null) { + for (String group : cfg.split(";")) { + String[] parts = group.split(":"); + Set<Text> cols = new HashSet<Text>(); + for (String col : parts[1].split(",")) { + cols.add(t(col)); + } + groups.put(parts[1], cols); + } + } + return groups; + } + + private static class TestTrustManager implements X509TrustManager { + @Override + public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {} + + @Override + public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + } + + private static class TestHostnameVerifier implements HostnameVerifier { + @Override + public boolean verify(String hostname, SSLSession session) { + return true; + } + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java index d73bf3e,0000000..4ef0cab mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java @@@ -1,367 -1,0 +1,367 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class RestartIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(RestartIT.class); + + @Override + public int defaultTimeoutSeconds() { + return 10 * 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + private static final ScannerOpts SOPTS = new ScannerOpts(); + private static final VerifyIngest.Opts VOPTS = new VerifyIngest.Opts(); + private static final TestIngest.Opts OPTS = new TestIngest.Opts(); + private static final BatchWriterOpts BWOPTS = new BatchWriterOpts(); + static { + OPTS.rows = VOPTS.rows = 10 * 1000; + } + + private ExecutorService svc; + + @Before + public void setup() throws Exception { + svc = Executors.newFixedThreadPool(1); + } + + @After + public void teardown() throws Exception { + if (null == svc) { + return; + } + + if (!svc.isShutdown()) { + svc.shutdown(); + } + + while (!svc.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Waiting for threadpool to terminate"); + } + } + + @Test + public void restartMaster() throws Exception { + Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + OPTS.setTableName(tableName); + VOPTS.setTableName(tableName); + c.tableOperations().create(tableName); + final AuthenticationToken token = getAdminToken(); + final ClusterControl control = getCluster().getClusterControl(); + + final String[] args; + if (token instanceof PasswordToken) { + byte[] password = ((PasswordToken) token).getPassword(); + args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName}; + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } else if (token instanceof KerberosToken) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName}; + ClientConfiguration clientConfig = cluster.getClientConfig(); + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + throw new RuntimeException("Unknown token"); + } + + Future<Integer> ret = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + return control.exec(TestIngest.class, args); + } catch (IOException e) { + log.error("Error running TestIngest", e); + return -1; + } + } + }); + + control.stopAllServers(ServerType.MASTER); + control.startAllServers(ServerType.MASTER); + assertEquals(0, ret.get().intValue()); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void restartMasterRecovery() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + OPTS.setTableName(tableName); + VOPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, OPTS, BWOPTS); + ClusterControl control = getCluster().getClusterControl(); + + // TODO implement a kill all too? + // cluster.stop() would also stop ZooKeeper + control.stopAllServers(ServerType.MASTER); + control.stopAllServers(ServerType.TRACER); + control.stopAllServers(ServerType.TABLET_SERVER); + control.stopAllServers(ServerType.GARBAGE_COLLECTOR); + control.stopAllServers(ServerType.MONITOR); + + ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + cluster.start(); + sleepUninterruptibly(5, TimeUnit.MILLISECONDS); + control.stopAllServers(ServerType.MASTER); + + masterLockData = new byte[0]; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + cluster.start(); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void restartMasterSplit() throws Exception { + Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + final AuthenticationToken token = getAdminToken(); + final ClusterControl control = getCluster().getClusterControl(); + VOPTS.setTableName(tableName); + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); + + final String[] args; + if (token instanceof PasswordToken) { + byte[] password = ((PasswordToken) token).getPassword(); + args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName}; + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } else if (token instanceof KerberosToken) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName}; + ClientConfiguration clientConfig = cluster.getClientConfig(); + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + throw new RuntimeException("Unknown token"); + } + + Future<Integer> ret = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + return control.exec(TestIngest.class, args); + } catch (Exception e) { + log.error("Error running TestIngest", e); + return -1; + } + } + }); + + control.stopAllServers(ServerType.MASTER); + + ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + cluster.start(); + assertEquals(0, ret.get().intValue()); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void killedTabletServer() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + OPTS.setTableName(tableName); + VOPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, OPTS, BWOPTS); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + cluster.getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + cluster.start(); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void killedTabletServer2() throws Exception { + final Connector c = getConnector(); + final String[] names = getUniqueNames(2); + final String tableName = names[0]; + final ClusterControl control = getCluster().getClusterControl(); + c.tableOperations().create(tableName); + // Original test started and then stopped a GC. Not sure why it did this. The GC was + // already running by default, and it would have nothing to do after only creating a table + control.stopAllServers(ServerType.TABLET_SERVER); + + cluster.start(); + c.tableOperations().create(names[1]); + } + + @Test + public void killedTabletServerDuringShutdown() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + OPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, OPTS, BWOPTS); + try { + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().adminStopAll(); + } finally { + getCluster().start(); + } + } + + @Test + public void shutdownDuringCompactingSplitting() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + VOPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); + String splitThreshold = null; + for (Entry<String,String> entry : c.tableOperations().getProperties(tableName)) { + if (entry.getKey().equals(Property.TABLE_SPLIT_THRESHOLD.getKey())) { + splitThreshold = entry.getValue(); + break; + } + } + Assert.assertNotNull(splitThreshold); + try { + c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "20K"); + TestIngest.Opts opts = new TestIngest.Opts(); + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, opts, BWOPTS); + c.tableOperations().flush(tableName, null, null, false); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + getCluster().stop(); + } finally { + if (getClusterType() == ClusterType.STANDALONE) { + getCluster().start(); + c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), splitThreshold); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java index f607d16,0000000..f58db38 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java @@@ -1,153 -1,0 +1,153 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class RestartStressIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(RestartStressIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> opts = cfg.getSiteConfig(); + opts.put(Property.TSERV_MAXMEM.getKey(), "100K"); + opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms"); + opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M"); - opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s"); ++ opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s"); + cfg.setSiteConfig(opts); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Override + protected int defaultTimeoutSeconds() { + return 10 * 60; + } + + private ExecutorService svc; + + @Before + public void setup() throws Exception { + svc = Executors.newFixedThreadPool(1); + } + + @After + public void teardown() throws Exception { + if (null == svc) { + return; + } + + if (!svc.isShutdown()) { + svc.shutdown(); + } + + while (!svc.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Waiting for threadpool to terminate"); + } + } + + private static final VerifyIngest.Opts VOPTS; + static { + VOPTS = new VerifyIngest.Opts(); + VOPTS.rows = 10 * 1000; + } + private static final ScannerOpts SOPTS = new ScannerOpts(); + + @Test + public void test() throws Exception { + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + final AuthenticationToken token = getAdminToken(); + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "500K"); + final ClusterControl control = getCluster().getClusterControl(); + final String[] args; + if (token instanceof PasswordToken) { + byte[] password = ((PasswordToken) token).getPassword(); + args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName}; + } else if (token instanceof KerberosToken) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName}; + } else { + throw new RuntimeException("Unrecognized token"); + } + + Future<Integer> retCode = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + return control.exec(TestIngest.class, args); + } catch (Exception e) { + log.error("Error running TestIngest", e); + return -1; + } + } + }); + + for (int i = 0; i < 2; i++) { + sleepUninterruptibly(10, TimeUnit.SECONDS); + control.stopAllServers(ServerType.TABLET_SERVER); + control.startAllServers(ServerType.TABLET_SERVER); + } + assertEquals(0, retCode.get().intValue()); + VOPTS.setTableName(tableName); + + if (token instanceof PasswordToken) { + VOPTS.setPrincipal(getAdminPrincipal()); + } else if (token instanceof KerberosToken) { + VOPTS.updateKerberosCredentials(cluster.getClientConfig()); + } else { + throw new RuntimeException("Unrecognized token"); + } + + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java index 36bdd7a,0000000..5f7ca88 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java @@@ -1,153 -1,0 +1,153 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +public class SessionDurabilityIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + } + + @Test(timeout = 3 * 60 * 1000) + public void nondurableTableHasDurableWrites() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default has no durability + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + // send durable writes + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setDurability(Durability.SYNC); + writeSome(tableName, 10, cfg); + assertEquals(10, count(tableName)); + // verify writes servive restart + restartTServer(); + assertEquals(10, count(tableName)); + } + + @Test(timeout = 3 * 60 * 1000) + public void durableTableLosesNonDurableWrites() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); + // write with no durability + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setDurability(Durability.NONE); + writeSome(tableName, 10, cfg); + // verify writes are lost on restart + restartTServer(); + assertTrue(10 > count(tableName)); + } + + private int count(String tableName) throws Exception { + return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator()); + } + + private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception { + Connector c = getConnector(); + BatchWriter bw = c.createBatchWriter(tableName, cfg); + for (int i = 0; i < n; i++) { + Mutation m = new Mutation(i + ""); + m.put("", "", ""); + bw.addMutation(m); + } + bw.close(); + } + + @Test(timeout = 3 * 60 * 1000) + public void testConditionDurability() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); + // write without durability + ConditionalWriterConfig cfg = new ConditionalWriterConfig(); + cfg.setDurability(Durability.NONE); + conditionWriteSome(tableName, 10, cfg); + // everything in there? + assertEquals(10, count(tableName)); + // restart the server and verify the updates are lost + restartTServer(); + assertEquals(0, count(tableName)); + } + + @Test(timeout = 3 * 60 * 1000) + public void testConditionDurability2() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + // write with durability + ConditionalWriterConfig cfg = new ConditionalWriterConfig(); + cfg.setDurability(Durability.SYNC); + conditionWriteSome(tableName, 10, cfg); + // everything in there? + assertEquals(10, count(tableName)); + // restart the server and verify the updates are still there + restartTServer(); + assertEquals(10, count(tableName)); + } + + private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception { + Connector c = getConnector(); + ConditionalWriter cw = c.createConditionalWriter(tableName, cfg); + for (int i = 0; i < n; i++) { + ConditionalMutation m = new ConditionalMutation((CharSequence) (i + ""), new Condition("", "")); + m.put("", "", "X"); + assertEquals(Status.ACCEPTED, cw.write(m).getStatus()); + } + } + + private void restartTServer() throws Exception { + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.start(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java index ab3c662,0000000..6f0bc7a mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java @@@ -1,235 -1,0 +1,235 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY; +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START; +import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT; +import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE; +import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION; +import static org.apache.accumulo.core.security.Authorizations.EMPTY; +import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR; +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.master.state.SetGoalState; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.log.WalStateManager; +import org.apache.accumulo.server.log.WalStateManager.WalState; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Iterators; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class WALSunnyDayIT extends ConfigurableMacBase { + + private static final Text CF = new Text(new byte[0]); + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(GC_CYCLE_DELAY, "1s"); + cfg.setProperty(GC_CYCLE_START, "0s"); + cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M"); + cfg.setProperty(TSERV_WAL_REPLICATION, "1"); - cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s"); ++ cfg.setProperty(INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setNumTservers(1); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + int countTrue(Collection<Boolean> bools) { + int result = 0; + for (Boolean b : bools) { + if (b.booleanValue()) + result++; + } + return result; + } + + @Test + public void test() throws Exception { + MiniAccumuloClusterImpl mac = getCluster(); + MiniAccumuloClusterControl control = mac.getClusterControl(); + control.stop(GARBAGE_COLLECTOR); + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + writeSomeData(c, tableName, 1, 1); + + // wal markers are added lazily + Map<String,Boolean> wals = getWals(c); + assertEquals(wals.toString(), 2, wals.size()); + for (Boolean b : wals.values()) { + assertTrue("logs should be in use", b.booleanValue()); + } + + // roll log, get a new next + writeSomeData(c, tableName, 1001, 50); + Map<String,Boolean> walsAfterRoll = getWals(c); + assertEquals("should have 3 WALs after roll", 3, walsAfterRoll.size()); + assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet())); + assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values())); + + // flush the tables + for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) { + c.tableOperations().flush(table, null, null, true); + } + sleepUninterruptibly(1, TimeUnit.SECONDS); + // rolled WAL is no longer in use, but needs to be GC'd + Map<String,Boolean> walsAfterflush = getWals(c); + assertEquals(walsAfterflush.toString(), 3, walsAfterflush.size()); + assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values())); + + // let the GC run for a little bit + control.start(GARBAGE_COLLECTOR); + sleepUninterruptibly(5, TimeUnit.SECONDS); + // make sure the unused WAL goes away + Map<String,Boolean> walsAfterGC = getWals(c); + assertEquals(walsAfterGC.toString(), 2, walsAfterGC.size()); + control.stop(GARBAGE_COLLECTOR); + // restart the tserver, but don't run recovery on all tablets + control.stop(TABLET_SERVER); + // this delays recovery on the normal tables + assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor()); + control.start(TABLET_SERVER); + + // wait for the metadata table to go back online + getRecoveryMarkers(c); + // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets + sleepUninterruptibly(5, TimeUnit.SECONDS); + Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c); + // log.debug("markers " + markers); + assertEquals("one tablet should have markers", 1, markers.keySet().size()); + assertEquals("tableId of the keyExtent should be 1", markers.keySet().iterator().next().getTableId(), new Text("1")); + + // put some data in the WAL + assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor()); + verifySomeData(c, tableName, 1001 * 50 + 1); + writeSomeData(c, tableName, 100, 100); + + Map<String,Boolean> walsAfterRestart = getWals(c); + // log.debug("wals after " + walsAfterRestart); + assertEquals("used WALs after restart should be 4", 4, countTrue(walsAfterRestart.values())); + control.start(GARBAGE_COLLECTOR); + sleepUninterruptibly(5, TimeUnit.SECONDS); + Map<String,Boolean> walsAfterRestartAndGC = getWals(c); + assertEquals("wals left should be 2", 2, walsAfterRestartAndGC.size()); + assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values())); + } + + private void verifySomeData(Connector c, String tableName, int expected) throws Exception { + Scanner scan = c.createScanner(tableName, EMPTY); + int result = Iterators.size(scan.iterator()); + scan.close(); + Assert.assertEquals(expected, result); + } + + private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception { + Random rand = new Random(); + BatchWriter bw = conn.createBatchWriter(tableName, null); + byte[] rowData = new byte[10]; + byte[] cq = new byte[10]; + byte[] value = new byte[10]; + + for (int r = 0; r < row; r++) { + rand.nextBytes(rowData); + Mutation m = new Mutation(rowData); + for (int c = 0; c < col; c++) { + rand.nextBytes(cq); + rand.nextBytes(value); + m.put(CF, new Text(cq), new Value(value)); + } + bw.addMutation(m); + if (r % 100 == 0) { + bw.flush(); + } + } + bw.close(); + } + + private Map<String,Boolean> getWals(Connector c) throws Exception { + Map<String,Boolean> result = new HashMap<>(); + Instance i = c.getInstance(); + ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), ""); + WalStateManager wals = new WalStateManager(c.getInstance(), zk); + for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) { + // WALs are in use if they are not unreferenced + result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED); + } + return result; + } + + private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception { + Map<KeyExtent,List<String>> result = new HashMap<>(); + Scanner root = c.createScanner(RootTable.NAME, EMPTY); + root.setRange(TabletsSection.getRange()); + root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME); + TabletColumnFamily.PREV_ROW_COLUMN.fetch(root); + + Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY); + meta.setRange(TabletsSection.getRange()); + meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME); + TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta); + + List<String> logs = new ArrayList<>(); + Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator()); + while (both.hasNext()) { + Entry<Key,Value> entry = both.next(); + Key key = entry.getKey(); + if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) { + logs.add(key.getColumnQualifier().toString()); + } + if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) { + KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue()); + result.put(extent, logs); + logs = new ArrayList<String>(); + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java index d877969,0000000..0074f5f mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java @@@ -1,79 -1,0 +1,79 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; + +public class WriteAheadLogIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); + cfg.setProperty(Property.GC_CYCLE_START, "1"); + cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "4s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Override + protected int defaultTimeoutSeconds() { + return 10 * 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "750K"); + TestIngest.Opts opts = new TestIngest.Opts(); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + opts.setTableName(tableName); + + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + vopts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(getAdminPrincipal()); + vopts.setPrincipal(getAdminPrincipal()); + } + + TestIngest.ingest(c, opts, new BatchWriterOpts()); + vopts.setTableName(tableName); + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java index b19ec2f,0000000..5ece1ac mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java @@@ -1,89 -1,0 +1,89 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class ZookeeperRestartIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteConfig = new HashMap<String,String>(); - siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s"); ++ siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + cfg.setSiteConfig(siteConfig); + } + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + c.tableOperations().create("test_ingest"); + BatchWriter bw = c.createBatchWriter("test_ingest", null); + Mutation m = new Mutation("row"); + m.put("cf", "cq", "value"); + bw.addMutation(m); + bw.close(); + + // kill zookeeper + for (ProcessReference proc : cluster.getProcesses().get(ServerType.ZOOKEEPER)) + cluster.killProcess(ServerType.ZOOKEEPER, proc); + + // give the servers time to react + sleepUninterruptibly(1, TimeUnit.SECONDS); + + // start zookeeper back up + cluster.start(); + + // use the tservers + Scanner s = c.createScanner("test_ingest", Authorizations.EMPTY); + Iterator<Entry<Key,Value>> i = s.iterator(); + assertTrue(i.hasNext()); + assertEquals("row", i.next().getKey().getRow().toString()); + assertFalse(i.hasNext()); + // use the master + c.tableOperations().delete("test_ingest"); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java index 7f61a08,0000000..133c09c mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java +++ b/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java @@@ -1,146 -1,0 +1,146 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.proxy.Proxy; +import org.apache.accumulo.proxy.thrift.AccumuloProxy.Client; +import org.apache.accumulo.proxy.thrift.Column; +import org.apache.accumulo.proxy.thrift.ColumnUpdate; +import org.apache.accumulo.proxy.thrift.Condition; +import org.apache.accumulo.proxy.thrift.ConditionalStatus; +import org.apache.accumulo.proxy.thrift.ConditionalUpdates; +import org.apache.accumulo.proxy.thrift.ConditionalWriterOptions; +import org.apache.accumulo.proxy.thrift.Durability; +import org.apache.accumulo.proxy.thrift.TimeType; +import org.apache.accumulo.proxy.thrift.WriterOptions; +import org.apache.accumulo.server.util.PortUtils; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.thrift.protocol.TJSONProtocol; +import org.apache.thrift.server.TServer; +import org.junit.Test; + +import com.google.common.collect.Iterators; +import com.google.common.net.HostAndPort; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class ProxyDurabilityIT extends ConfigurableMacBase { + + @Override + protected int defaultTimeoutSeconds() { - return 60; ++ return 120; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setNumTservers(1); + } + + private static ByteBuffer bytes(String value) { + return ByteBuffer.wrap(value.getBytes()); + } + + @Test + public void testDurability() throws Exception { + Connector c = getConnector(); + Properties props = new Properties(); + // Avoid issues with locally installed client configuration files with custom properties + File emptyFile = Files.createTempFile(null, null).toFile(); + emptyFile.deleteOnExit(); + props.put("instance", c.getInstance().getInstanceName()); + props.put("zookeepers", c.getInstance().getZooKeepers()); + props.put("tokenClass", PasswordToken.class.getName()); + props.put("clientConfigurationFile", emptyFile.toString()); + + TJSONProtocol.Factory protocol = new TJSONProtocol.Factory(); + + int proxyPort = PortUtils.getRandomFreePort(); + final TServer proxyServer = Proxy.createProxyServer(HostAndPort.fromParts("localhost", proxyPort), protocol, props).server; + while (!proxyServer.isServing()) + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + Client client = new TestProxyClient("localhost", proxyPort, protocol).proxy(); + Map<String,String> properties = new TreeMap<String,String>(); + properties.put("password", ROOT_PASSWORD); + ByteBuffer login = client.login("root", properties); + + String tableName = getUniqueNames(1)[0]; + client.createTable(login, tableName, true, TimeType.MILLIS); + assertTrue(c.tableOperations().exists(tableName)); + + WriterOptions options = new WriterOptions(); + options.setDurability(Durability.NONE); + String writer = client.createWriter(login, tableName, options); + Map<ByteBuffer,List<ColumnUpdate>> cells = new TreeMap<ByteBuffer,List<ColumnUpdate>>(); + ColumnUpdate column = new ColumnUpdate(bytes("cf"), bytes("cq")); + column.setValue("value".getBytes()); + cells.put(bytes("row"), Collections.singletonList(column)); + client.update(writer, cells); + client.closeWriter(writer); + assertEquals(1, count(tableName)); + restartTServer(); + assertEquals(0, count(tableName)); + + ConditionalWriterOptions cfg = new ConditionalWriterOptions(); + cfg.setDurability(Durability.SYNC); + String cwriter = client.createConditionalWriter(login, tableName, cfg); + ConditionalUpdates updates = new ConditionalUpdates(); + updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes("")))); + updates.addToUpdates(column); + Map<ByteBuffer,ConditionalStatus> status = client.updateRowsConditionally(cwriter, Collections.singletonMap(bytes("row"), updates)); + assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row"))); + assertEquals(1, count(tableName)); + restartTServer(); + assertEquals(1, count(tableName)); + + proxyServer.stop(); + } + + private void restartTServer() throws Exception { + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.start(); + } + + private int count(String tableName) throws Exception { + return Iterators.size((getConnector().createScanner(tableName, Authorizations.EMPTY)).iterator()); + } + +}