http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java index bf0b618,0000000..f243562 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java @@@ -1,144 -1,0 +1,144 @@@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.test.functional;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.cli.ClientOpts.Password;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Verifies that when many files are bulk imported into a table that has a single tablet, and that tablet subsequently splits, the child tablets do not each
 * end up referencing all of the map files.
 */
public class BulkSplitOptimizationIT extends AccumuloClusterHarness {

  /** Value of {@link Property#TSERV_MAJC_DELAY} that this test runs with. */
  private static final String REQUIRED_MAJC_DELAY = "1s";

  static final int ROWS = 100000;
  static final int SPLITS = 99;

  /** System value of {@link Property#TSERV_MAJC_DELAY} read in {@link #alterConfig()}; {@code null} until that read has succeeded. */
  private String majcDelay;

  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
    cfg.setProperty(Property.TSERV_MAJC_DELAY, REQUIRED_MAJC_DELAY);
  }

  @Override
  protected int defaultTimeoutSeconds() {
    return 2 * 60;
  }

  /**
   * Records the current system value of {@link Property#TSERV_MAJC_DELAY} and, if it differs from the value this test needs, overrides it and restarts the
   * tablet servers.
   */
  @Before
  public void alterConfig() throws Exception {
    Connector conn = getConnector();
    majcDelay = conn.instanceOperations().getSystemConfiguration().get(Property.TSERV_MAJC_DELAY.getKey());
    if (!REQUIRED_MAJC_DELAY.equals(majcDelay)) {
      conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), REQUIRED_MAJC_DELAY);
      restartTabletServers();
    }
  }

  /**
   * Restores the value recorded by {@link #alterConfig()}.
   *
   * <p>
   * Nothing is done when {@link #alterConfig()} never recorded a value or found the property already at the required value: in that case no override was
   * written and no restart happened, so there is nothing to undo. Rewriting the property and bouncing every tablet server anyway would only add a system-level
   * override that was not there before and slow the test down.
   */
  @After
  public void resetConfig() throws Exception {
    if (null != majcDelay && !REQUIRED_MAJC_DELAY.equals(majcDelay)) {
      Connector conn = getConnector();
      conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
      restartTabletServers();
    }
  }

  /** Stops and then starts all tablet servers through the cluster control. */
  private void restartTabletServers() throws Exception {
    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
    getClusterControl().startAllServers(ServerType.TABLET_SERVER);
  }

  /**
   * Bulk imports generated RFiles into a fresh table, lowers the split threshold so the table splits, then verifies the ingested data and the number of files
   * referenced per tablet.
   */
  @Test
  public void testBulkSplitOptimization() throws Exception {
    final Connector c = getConnector();
    final String tableName = getUniqueNames(1)[0];
    c.tableOperations().create(tableName);
    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");

    FileSystem fs = cluster.getFileSystem();
    Path testDir = new Path(getUsableDir(), "testmf");
    FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
    FileStatus[] stats = fs.listStatus(testDir);

    System.out.println("Number of generated files: " + stats.length);

    FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString());
    // NOTE(review): presumably "no splits yet" and "one tablet holding 100 files" -- confirm against FunctionalTestUtils
    FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);

    // initiate splits
    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");

    sleepUninterruptibly(2, TimeUnit.SECONDS);

    // wait until over split threshold -- should be 78 splits
    // (the loop itself is unbounded; it relies on the harness timeout from defaultTimeoutSeconds())
    while (c.tableOperations().listSplits(tableName).size() < 75) {
      sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
    }

    FunctionalTestUtils.checkSplits(c, tableName, 50, 100);

    VerifyIngest.Opts opts = new VerifyIngest.Opts();
    opts.timestamp = 1;
    opts.dataSize = 50;
    opts.random = 56;
    // must match the row count handed to createRFiles above
    opts.rows = ROWS;
    opts.startRow = 0;
    opts.cols = 1;
    opts.setTableName(tableName);

    AuthenticationToken adminToken = getAdminToken();
    if (adminToken instanceof PasswordToken) {
      PasswordToken token = (PasswordToken) adminToken;
      opts.setPassword(new Password(new String(token.getPassword(), UTF_8)));
      opts.setPrincipal(getAdminPrincipal());
    } else if (adminToken instanceof KerberosToken) {
      ClientConfiguration clientConf = cluster.getClientConfig();
      opts.updateKerberosCredentials(clientConf);
    } else {
      Assert.fail("Unknown token type");
    }

    VerifyIngest.verifyIngest(c, opts, new ScannerOpts());

    // ensure each tablet does not have all map files, should be ~2.5 files per tablet
    FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4);
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java index 79c4e60,0000000..189f68f mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java @@@ -1,106 -1,0 +1,105 @@@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.test.functional;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.accumulo.cluster.AccumuloCluster;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.cli.ClientOpts.Password;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.TestRandomDeletes;
import org.apache.accumulo.test.VerifyIngest;
import org.junit.Test;

/**
 * Integration test that ingests data, runs {@link TestRandomDeletes} against it through the cluster control, ingests again and verifies the result.
 */
public class DeleteIT extends AccumuloClusterHarness {

  @Override
  protected int defaultTimeoutSeconds() {
    return 2 * 60;
  }

  /**
   * Creates a fresh table and runs {@link #deleteTest} with whichever credential matches the admin token: a keytab path for a {@link KerberosToken}, the
   * UTF-8 decoded password for a {@link PasswordToken}. Any other token type fails the test instead of letting it pass without exercising anything.
   */
  @Test
  public void test() throws Exception {
    Connector c = getConnector();
    String tableName = getUniqueNames(1)[0];
    c.tableOperations().create(tableName);
    AuthenticationToken token = getAdminToken();
    if (token instanceof KerberosToken) {
      deleteTest(c, getCluster(), getAdminPrincipal(), null, tableName, getAdminUser().getKeytab().getAbsolutePath());
    } else {
      // Previously an unrecognized token fell through both branches and the test succeeded vacuously.
      assertTrue("Unknown token type: " + token, token instanceof PasswordToken);
      PasswordToken passwdToken = (PasswordToken) token;
      deleteTest(c, getCluster(), getAdminPrincipal(), new String(passwdToken.getPassword(), UTF_8), tableName, null);
    }
  }

  /**
   * Ingests 1000 rows of one column into {@code tableName}, executes {@link TestRandomDeletes} via the cluster control (expecting exit code 0), ingests the
   * same data again and verifies it.
   *
   * @param c
   *          connector used for ingest and verification
   * @param cluster
   *          cluster supplying the instance name, ZooKeepers, client configuration and cluster control
   * @param user
   *          principal passed as {@code -u}; with a password it is also set on the ingest/verify options
   * @param password
   *          password for {@code user}, or {@code null} when a keytab is given
   * @param tableName
   *          existing table to operate on
   * @param keytab
   *          path to a keytab, or {@code null} when a password is given
   * @throws AssertionError
   *           if neither or both of {@code password} and {@code keytab} are supplied, or if {@link TestRandomDeletes} exits non-zero
   */
  public static void deleteTest(Connector c, AccumuloCluster cluster, String user, String password, String tableName, String keytab) throws Exception {
    // Exactly one credential must be supplied; check that once, up front.
    assertTrue("Expected one of password or keytab", null != password || null != keytab);
    final boolean usePassword = null != password;
    if (usePassword) {
      assertNull("Given password, expected null keytab", keytab);
    }

    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
    TestIngest.Opts opts = new TestIngest.Opts();
    vopts.setTableName(tableName);
    opts.setTableName(tableName);
    vopts.rows = opts.rows = 1000;
    vopts.cols = opts.cols = 1;
    vopts.random = opts.random = 56;

    // Configure the in-process options and the command-line arguments for TestRandomDeletes from the same credential.
    final String[] args;
    if (usePassword) {
      Password passwd = new Password(password);
      opts.setPassword(passwd);
      opts.setPrincipal(user);
      vopts.setPassword(passwd);
      vopts.setPrincipal(user);
      args = new String[] {"-u", user, "-p", password, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "--table", tableName};
    } else {
      ClientConfiguration clientConfig = cluster.getClientConfig();
      opts.updateKerberosCredentials(clientConfig);
      vopts.updateKerberosCredentials(clientConfig);
      args = new String[] {"-u", user, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "--table", tableName, "--keytab", keytab};
    }

    BatchWriterOpts bwOpts = new BatchWriterOpts();
    TestIngest.ingest(c, opts, bwOpts);

    assertEquals(0, cluster.getClusterControl().exec(TestRandomDeletes.class, args));

    // Ingest the same data again, then verify it.
    TestIngest.ingest(c, opts, bwOpts);
    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java ---------------------------------------------------------------------- diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java index 826907c,0000000..279e517 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java @@@ -1,669 -1,0 +1,669 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + - import static com.google.common.base.Charsets.UTF_8; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster; +import org.apache.accumulo.cluster.standalone.StandaloneClusterControl; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.AgeOffFilter; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.examples.simple.client.Flush; +import 
org.apache.accumulo.examples.simple.client.RandomBatchScanner; +import org.apache.accumulo.examples.simple.client.RandomBatchWriter; +import org.apache.accumulo.examples.simple.client.ReadWriteExample; +import org.apache.accumulo.examples.simple.client.RowOperations; +import org.apache.accumulo.examples.simple.client.SequentialBatchWriter; +import org.apache.accumulo.examples.simple.client.TraceDumpExample; +import org.apache.accumulo.examples.simple.client.TracingExample; +import org.apache.accumulo.examples.simple.combiner.StatsCombiner; +import org.apache.accumulo.examples.simple.constraints.MaxMutationSize; +import org.apache.accumulo.examples.simple.dirlist.Ingest; +import org.apache.accumulo.examples.simple.dirlist.QueryUtil; +import org.apache.accumulo.examples.simple.helloworld.InsertWithBatchWriter; +import org.apache.accumulo.examples.simple.helloworld.ReadData; +import org.apache.accumulo.examples.simple.isolation.InterferenceTest; +import org.apache.accumulo.examples.simple.mapreduce.RegexExample; +import org.apache.accumulo.examples.simple.mapreduce.RowHash; +import org.apache.accumulo.examples.simple.mapreduce.TableToFile; +import org.apache.accumulo.examples.simple.mapreduce.TeraSortIngest; +import org.apache.accumulo.examples.simple.mapreduce.WordCount; +import org.apache.accumulo.examples.simple.mapreduce.bulk.BulkIngestExample; +import org.apache.accumulo.examples.simple.mapreduce.bulk.GenerateTestData; +import org.apache.accumulo.examples.simple.mapreduce.bulk.SetupTable; +import org.apache.accumulo.examples.simple.mapreduce.bulk.VerifyIngest; +import org.apache.accumulo.examples.simple.shard.ContinuousQuery; +import org.apache.accumulo.examples.simple.shard.Index; +import org.apache.accumulo.examples.simple.shard.Query; +import org.apache.accumulo.examples.simple.shard.Reverse; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.MemoryUnit; +import 
org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.start.Main; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.tracer.TraceServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.Tool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +public class ExamplesIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(ExamplesIT.class); + private static final BatchWriterOpts bwOpts = new BatchWriterOpts(); + private static final BatchWriterConfig bwc = new BatchWriterConfig(); + private static final String visibility = "A|B"; + private static final String auths = "A,B"; + + Connector c; + String instance; + String keepers; + String user; + String passwd; + String keytab; + BatchWriter bw; + IteratorSetting is; + String dir; + FileSystem fs; + Authorizations origAuths; + boolean saslEnabled; + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopConf) { + // 128MB * 3 + cfg.setDefaultMemory(cfg.getDefaultMemory() * 3, MemoryUnit.BYTE); + } + + @Before + public void getClusterInfo() throws Exception { + c = getConnector(); + user = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + if (token instanceof KerberosToken) { + keytab = getAdminUser().getKeytab().getAbsolutePath(); + saslEnabled = true; + } else if (token instanceof PasswordToken) { + passwd = new String(((PasswordToken) getAdminToken()).getPassword(), UTF_8); + saslEnabled = false; + } else { + 
Assert.fail("Unknown token type: " + token); + } + fs = getCluster().getFileSystem(); + instance = c.getInstance().getInstanceName(); + keepers = c.getInstance().getZooKeepers(); + dir = new Path(cluster.getTemporaryPath(), getClass().getName()).toString(); + + origAuths = c.securityOperations().getUserAuthorizations(user); + c.securityOperations().changeUserAuthorizations(user, new Authorizations(auths.split(","))); + } + + @After + public void resetAuths() throws Exception { + if (null != origAuths) { + getConnector().securityOperations().changeUserAuthorizations(getAdminPrincipal(), origAuths); + } + } + + @Override + public int defaultTimeoutSeconds() { + return 6 * 60; + } + + @Test + public void testTrace() throws Exception { + Process trace = null; + if (ClusterType.MINI == getClusterType()) { + MiniAccumuloClusterImpl impl = (MiniAccumuloClusterImpl) cluster; + trace = impl.exec(TraceServer.class); + while (!c.tableOperations().exists("trace")) + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + } + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-C", "-D", "-c"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-C", "-D", "-c"}; + } + Entry<Integer,String> pair = cluster.getClusterControl().execWithStdout(TracingExample.class, args); + Assert.assertEquals("Expected return code of zero. 
STDOUT=" + pair.getValue(), 0, pair.getKey().intValue()); + String result = pair.getValue(); + Pattern pattern = Pattern.compile("TraceID: ([0-9a-f]+)"); + Matcher matcher = pattern.matcher(result); + int count = 0; + while (matcher.find()) { + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--traceid", matcher.group(1)}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--traceid", matcher.group(1)}; + } + pair = cluster.getClusterControl().execWithStdout(TraceDumpExample.class, args); + count++; + } + assertTrue(count > 0); + assertTrue("Output did not contain myApp@myHost", pair.getValue().contains("myApp@myHost")); + if (ClusterType.MINI == getClusterType() && null != trace) { + trace.destroy(); + } + } + + @Test + public void testClasspath() throws Exception { + Entry<Integer,String> entry = getCluster().getClusterControl().execWithStdout(Main.class, new String[] {"classpath"}); + assertEquals(0, entry.getKey().intValue()); + String result = entry.getValue(); + int level1 = result.indexOf("Level 1"); + int level2 = result.indexOf("Level 2"); + int level3 = result.indexOf("Level 3"); + int level4 = result.indexOf("Level 4"); + assertTrue("Level 1 classloader not present.", level1 >= 0); + assertTrue("Level 2 classloader not present.", level2 > 0); + assertTrue("Level 3 classloader not present.", level3 > 0); + assertTrue("Level 4 classloader not present.", level4 > 0); + assertTrue(level1 < level2); + assertTrue(level2 < level3); + assertTrue(level3 < level4); + } + + @Test + public void testDirList() throws Exception { + String[] names = getUniqueNames(3); + String dirTable = names[0], indexTable = names[1], dataTable = names[2]; + String[] args; + String dirListDirectory; + switch (getClusterType()) { + case MINI: + dirListDirectory = ((MiniAccumuloClusterImpl) getCluster()).getConfig().getDir().getAbsolutePath(); + break; + case STANDALONE: + dirListDirectory 
= ((StandaloneAccumuloCluster) getCluster()).getAccumuloHome(); + break; + default: + throw new RuntimeException("Unknown cluster type"); + } + assumeTrue(new File(dirListDirectory).exists()); + // Index a directory listing on /tmp. If this is running against a standalone cluster, we can't guarantee Accumulo source will be there. + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--dirTable", dirTable, "--indexTable", indexTable, "--dataTable", + dataTable, "--vis", visibility, "--chunkSize", Integer.toString(10000), dirListDirectory}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--dirTable", dirTable, "--indexTable", indexTable, "--dataTable", + dataTable, "--vis", visibility, "--chunkSize", Integer.toString(10000), dirListDirectory}; + } + Entry<Integer,String> entry = getClusterControl().execWithStdout(Ingest.class, args); + assertEquals("Got non-zero return code. Stdout=" + entry.getValue(), 0, entry.getKey().intValue()); + + String expectedFile; + switch (getClusterType()) { + case MINI: + // Should be present in a minicluster dir + expectedFile = "accumulo-site.xml"; + break; + case STANDALONE: + // Should be in place on standalone installs (not having to follow symlinks) + expectedFile = "LICENSE"; + break; + default: + throw new RuntimeException("Unknown cluster type"); + } + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "--keytab", keytab, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", + expectedFile}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-p", passwd, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", expectedFile}; + } + entry = getClusterControl().execWithStdout(QueryUtil.class, args); + if (ClusterType.MINI == getClusterType()) { + MiniAccumuloClusterImpl impl = (MiniAccumuloClusterImpl) cluster; + for (LogWriter writer : impl.getLogWriters()) { + 
writer.flush(); + } + } + + log.info("result " + entry.getValue()); + assertEquals(0, entry.getKey().intValue()); + assertTrue(entry.getValue().contains(expectedFile)); + } + + @Test + public void testAgeoffFilter() throws Exception { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + is = new IteratorSetting(10, AgeOffFilter.class); + AgeOffFilter.setTTL(is, 1000L); + c.tableOperations().attachIterator(tableName, is); + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // let zookeeper updates propagate. + bw = c.createBatchWriter(tableName, bwc); + Mutation m = new Mutation("foo"); + m.put("a", "b", "c"); + bw.addMutation(m); + bw.close(); + sleepUninterruptibly(1, TimeUnit.SECONDS); + assertEquals(0, Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator())); + } + + @Test + public void testStatsCombiner() throws Exception { + String table = getUniqueNames(1)[0]; + c.tableOperations().create(table); + is = new IteratorSetting(10, StatsCombiner.class); + StatsCombiner.setCombineAllColumns(is, true); + + c.tableOperations().attachIterator(table, is); + bw = c.createBatchWriter(table, bwc); + // Write two mutations otherwise the NativeMap would dedupe them into a single update + Mutation m = new Mutation("foo"); + m.put("a", "b", "1"); + bw.addMutation(m); + m = new Mutation("foo"); + m.put("a", "b", "3"); + bw.addMutation(m); + bw.flush(); + + Iterator<Entry<Key,Value>> iter = c.createScanner(table, Authorizations.EMPTY).iterator(); + assertTrue("Iterator had no results", iter.hasNext()); + Entry<Key,Value> e = iter.next(); + assertEquals("Results ", "1,3,4,2", e.getValue().toString()); + assertFalse("Iterator had additional results", iter.hasNext()); + + m = new Mutation("foo"); + m.put("a", "b", "0,20,20,2"); + bw.addMutation(m); + bw.close(); + + iter = c.createScanner(table, Authorizations.EMPTY).iterator(); + assertTrue("Iterator had no results", iter.hasNext()); + e = iter.next(); + assertEquals("Results 
", "0,20,24,4", e.getValue().toString()); + assertFalse("Iterator had additional results", iter.hasNext()); + } + + @Test + public void testBloomFilters() throws Exception { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_BLOOM_ENABLED.getKey(), "true"); + String[] args; + if (saslEnabled) { + args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--num", "100000", "--min", "0", "--max", + "1000000000", "--size", "50", "--batchMemory", "2M", "--batchLatency", "60s", "--batchThreads", "3", "-t", tableName}; + } else { + args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--num", "100000", "--min", "0", "--max", "1000000000", + "--size", "50", "--batchMemory", "2M", "--batchLatency", "60s", "--batchThreads", "3", "-t", tableName}; + } + goodExec(RandomBatchWriter.class, args); + c.tableOperations().flush(tableName, null, null, true); + long diff = 0, diff2 = 0; + // try the speed test a couple times in case the system is loaded with other tests + for (int i = 0; i < 2; i++) { + long now = System.currentTimeMillis(); + if (saslEnabled) { + args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--num", "10000", "--min", "0", "--max", + "1000000000", "--size", "50", "--scanThreads", "4", "-t", tableName}; + } else { + args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--num", "10000", "--min", "0", "--max", "1000000000", + "--size", "50", "--scanThreads", "4", "-t", tableName}; + } + goodExec(RandomBatchScanner.class, args); + diff = System.currentTimeMillis() - now; + now = System.currentTimeMillis(); + if (saslEnabled) { + args = new String[] {"--seed", "8", "-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--num", "10000", "--min", "0", "--max", + "1000000000", "--size", "50", 
"--scanThreads", "4", "-t", tableName}; + } else { + args = new String[] {"--seed", "8", "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--num", "10000", "--min", "0", "--max", "1000000000", + "--size", "50", "--scanThreads", "4", "-t", tableName}; + } + int retCode = getClusterControl().exec(RandomBatchScanner.class, args); + assertEquals(1, retCode); + diff2 = System.currentTimeMillis() - now; + if (diff2 < diff) + break; + } + assertTrue(diff2 < diff); + } + + @Test + public void testShardedIndex() throws Exception { + File src = new File(System.getProperty("user.dir") + "/src"); + assumeTrue(src.exists()); + String[] names = getUniqueNames(3); + final String shard = names[0], index = names[1]; + c.tableOperations().create(shard); + c.tableOperations().create(index); + bw = c.createBatchWriter(shard, bwc); + Index.index(30, src, "\\W+", bw); + bw.close(); + BatchScanner bs = c.createBatchScanner(shard, Authorizations.EMPTY, 4); + List<String> found = Query.query(bs, Arrays.asList("foo", "bar"), null); + bs.close(); + // should find ourselves + boolean thisFile = false; + for (String file : found) { + if (file.endsWith("/ExamplesIT.java")) + thisFile = true; + } + assertTrue(thisFile); + + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", user, "--keytab", keytab}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", getAdminPrincipal(), "-p", passwd}; + } + // create a reverse index + goodExec(Reverse.class, args); + + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", user, "--keytab", keytab, "--terms", "5", + "--count", "1000"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", user, "-p", passwd, "--terms", "5", "--count", + "1000"}; + } + // run some queries + 
goodExec(ContinuousQuery.class, args); + } + + @Test + public void testMaxMutationConstraint() throws Exception { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().addConstraint(tableName, MaxMutationSize.class.getName()); + TestIngest.Opts opts = new TestIngest.Opts(); + opts.rows = 1; + opts.cols = 1000; + opts.setTableName(tableName); + if (saslEnabled) { + opts.updateKerberosCredentials(cluster.getClientConfig()); + } else { + opts.setPrincipal(getAdminPrincipal()); + } + try { + TestIngest.ingest(c, opts, bwOpts); + } catch (MutationsRejectedException ex) { + assertEquals(1, ex.getConstraintViolationSummaries().size()); + } + } + + @Test + public void testBulkIngest() throws Exception { + // TODO Figure out a way to run M/R with Kerberos + assumeTrue(getAdminToken() instanceof PasswordToken); + String tableName = getUniqueNames(1)[0]; + FileSystem fs = getFileSystem(); + Path p = new Path(dir, "tmp"); + if (fs.exists(p)) { + fs.delete(p, true); + } + goodExec(GenerateTestData.class, "--start-row", "0", "--count", "10000", "--output", dir + "/tmp/input/data"); + + List<String> commonArgs = new ArrayList<>(Arrays.asList(new String[] {"-i", instance, "-z", keepers, "-u", user, "--table", tableName})); + if (saslEnabled) { + commonArgs.add("--keytab"); + commonArgs.add(keytab); + } else { + commonArgs.add("-p"); + commonArgs.add(passwd); + } + + List<String> args = new ArrayList<>(commonArgs); + goodExec(SetupTable.class, args.toArray(new String[0])); + + args = new ArrayList<>(commonArgs); + args.addAll(Arrays.asList(new String[] {"--inputDir", dir + "/tmp/input", "--workDir", dir + "/tmp"})); + goodExec(BulkIngestExample.class, args.toArray(new String[0])); + + args = new ArrayList<>(commonArgs); + args.addAll(Arrays.asList(new String[] {"--start-row", "0", "--count", "10000"})); + goodExec(VerifyIngest.class, args.toArray(new String[0])); + } + + @Test + public void testTeraSortAndRead() throws Exception { 
+ // TODO Figure out a way to run M/R with Kerberos + assumeTrue(getAdminToken() instanceof PasswordToken); + String tableName = getUniqueNames(1)[0]; + String[] args; + if (saslEnabled) { + args = new String[] {"--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv", "10", "-xv", "10", "-t", tableName, "-i", instance, "-z", keepers, + "-u", user, "--keytab", keytab, "--splits", "4"}; + } else { + args = new String[] {"--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv", "10", "-xv", "10", "-t", tableName, "-i", instance, "-z", keepers, + "-u", user, "-p", passwd, "--splits", "4"}; + } + goodExec(TeraSortIngest.class, args); + Path output = new Path(dir, "tmp/nines"); + if (fs.exists(output)) { + fs.delete(output, true); + } + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--rowRegex", ".*999.*", "--output", + output.toString()}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--rowRegex", ".*999.*", "--output", output.toString()}; + } + goodExec(RegexExample.class, args); + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--column", "c:"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--column", "c:"}; + } + goodExec(RowHash.class, args); + output = new Path(dir, "tmp/tableFile"); + if (fs.exists(output)) { + fs.delete(output, true); + } + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--output", output.toString()}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--output", output.toString()}; + } + goodExec(TableToFile.class, args); + } + + @Test + public void testWordCount() throws Exception { + // TODO Figure out a way to run M/R with 
Kerberos + assumeTrue(getAdminToken() instanceof PasswordToken); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + is = new IteratorSetting(10, SummingCombiner.class); + SummingCombiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column(new Text("count")))); + SummingCombiner.setEncodingType(is, SummingCombiner.Type.STRING); + c.tableOperations().attachIterator(tableName, is); + Path readme = new Path(new Path(System.getProperty("user.dir")).getParent(), "README.md"); + if (!new File(readme.toString()).exists()) { + log.info("Not running test: README.md does not exist)"); + return; + } + fs.copyFromLocalFile(readme, new Path(dir + "/tmp/wc/README.md")); + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-u", user, "--keytab", keytab, "-z", keepers, "--input", dir + "/tmp/wc", "-t", tableName}; + } else { + args = new String[] {"-i", instance, "-u", user, "-p", passwd, "-z", keepers, "--input", dir + "/tmp/wc", "-t", tableName}; + } + goodExec(WordCount.class, args); + } + + @Test + public void testInsertWithBatchWriterAndReadData() throws Exception { + String tableName = getUniqueNames(1)[0]; + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName}; + } + goodExec(InsertWithBatchWriter.class, args); + goodExec(ReadData.class, args); + } + + @Test + public void testIsolatedScansWithInterference() throws Exception { + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", getUniqueNames(1)[0], "--iterations", "100000", "--isolated"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", getUniqueNames(1)[0], "--iterations", "100000", "--isolated"}; + } + 
goodExec(InterferenceTest.class, args); + } + + @Test + public void testScansWithInterference() throws Exception { + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", getUniqueNames(1)[0], "--iterations", "100000"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", getUniqueNames(1)[0], "--iterations", "100000"}; + } + goodExec(InterferenceTest.class, args); + } + + @Test + public void testRowOperations() throws Exception { + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd}; + } + goodExec(RowOperations.class, args); + } + + @Test + public void testBatchWriter() throws Exception { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--start", "0", "--num", "100000", "--size", "50", + "--batchMemory", "10000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--start", "0", "--num", "100000", "--size", "50", + "--batchMemory", "10000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility}; + } + goodExec(SequentialBatchWriter.class, args); + + } + + @Test + public void testReadWriteAndDelete() throws Exception { + String tableName = getUniqueNames(1)[0]; + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--auths", auths, "--table", tableName, "--createtable", "-c", + "--debug"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--auths", auths, 
"--table", tableName, "--createtable", "-c", "--debug"}; + } + goodExec(ReadWriteExample.class, args); + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--auths", auths, "--table", tableName, "-d", "--debug"}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--auths", auths, "--table", tableName, "-d", "--debug"}; + } + goodExec(ReadWriteExample.class, args); + + } + + @Test + public void testRandomBatchesAndFlush() throws Exception { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + String[] args; + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--table", tableName, "--num", "100000", "--min", "0", "--max", + "100000", "--size", "100", "--batchMemory", "1000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--table", tableName, "--num", "100000", "--min", "0", "--max", "100000", + "--size", "100", "--batchMemory", "1000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility}; + } + goodExec(RandomBatchWriter.class, args); + + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--table", tableName, "--num", "10000", "--min", "0", "--max", + "100000", "--size", "100", "--scanThreads", "4", "--auths", auths}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--table", tableName, "--num", "10000", "--min", "0", "--max", "100000", + "--size", "100", "--scanThreads", "4", "--auths", auths}; + } + goodExec(RandomBatchScanner.class, args); + + if (saslEnabled) { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--table", tableName}; + } else { + args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", 
passwd, "--table", tableName}; + } + goodExec(Flush.class, args); + } + + private void goodExec(Class<?> theClass, String... args) throws InterruptedException, IOException { + Entry<Integer,String> pair; + if (Tool.class.isAssignableFrom(theClass) && ClusterType.STANDALONE == getClusterType()) { + StandaloneClusterControl control = (StandaloneClusterControl) getClusterControl(); + pair = control.execMapreduceWithStdout(theClass, args); + } else { + // We're already slurping stdout into memory (not redirecting to file). Might as well add it to error message. + pair = getClusterControl().execWithStdout(theClass, args); + } + Assert.assertEquals("stdout=" + pair.getValue(), 0, pair.getKey().intValue()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 7b37a9e,0000000..edf73eb mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@@ -1,510 -1,0 +1,510 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import 
org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.PrintInfo; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.MonitorUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestMultiTableIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import com.google.common.base.Charsets; +import com.google.common.collect.Iterators; + +public class ReadWriteIT extends AccumuloClusterHarness { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + 
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + } + + private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class); + + static final int ROWS = 100000; + static final int COLS = 1; + static final String COLF = "colf"; + + @Override + protected int defaultTimeoutSeconds() { + return 6 * 60; + } + + @Test(expected = RuntimeException.class) + public void invalidInstanceName() throws Exception { + final Connector conn = getConnector(); + new ZooKeeperInstance("fake_instance_name", conn.getInstance().getZooKeepers()); + } + + @Test + public void sunnyDay() throws Exception { + // Start accumulo, create a table, insert some data, verify we can read it out. + // Shutdown cleanly. + log.debug("Starting Monitor"); + cluster.getClusterControl().startAllServers(ServerType.MONITOR); + Connector connector = getConnector(); + String tableName = getUniqueNames(1)[0]; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName); + String monitorLocation = null; + while (null == monitorLocation) { + monitorLocation = MonitorUtil.getLocation(getConnector().getInstance()); + if (null == monitorLocation) { + log.debug("Could not fetch monitor HTTP address from zookeeper"); + Thread.sleep(2000); + } + } + String scheme = "http://"; + if (getCluster() instanceof StandaloneAccumuloCluster) { + StandaloneAccumuloCluster standaloneCluster = (StandaloneAccumuloCluster) getCluster(); + File accumuloSite = new File(standaloneCluster.getServerAccumuloConfDir(), "accumulo-site.xml"); + if (accumuloSite.isFile()) { + Configuration conf = new Configuration(false); + conf.addResource(new Path(accumuloSite.toURI())); + String monitorSslKeystore = conf.get(Property.MONITOR_SSL_KEYSTORE.getKey()); + if (null != monitorSslKeystore) { + log.info("Setting scheme to HTTPS since monitor ssl keystore configuration was observed in {}", 
accumuloSite); + scheme = "https://"; + SSLContext ctx = SSLContext.getInstance("SSL"); + TrustManager[] tm = new TrustManager[] {new TestTrustManager()}; + ctx.init(new KeyManager[0], tm, new SecureRandom()); + SSLContext.setDefault(ctx); + HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); + HttpsURLConnection.setDefaultHostnameVerifier(new TestHostnameVerifier()); + } + } else { + log.info("{} is not a normal file, not checking for monitor running with SSL", accumuloSite); + } + } + URL url = new URL(scheme + monitorLocation); + log.debug("Fetching web page " + url); + String result = FunctionalTestUtils.readAll(url.openStream()); + assertTrue(result.length() > 100); + log.debug("Stopping accumulo cluster"); + ClusterControl control = cluster.getClusterControl(); + control.adminStopAll(); + ZooReader zreader = new ZooReader(connector.getInstance().getZooKeepers(), connector.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(connector.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + control.stopAllServers(ServerType.GARBAGE_COLLECTOR); + control.stopAllServers(ServerType.MONITOR); + control.stopAllServers(ServerType.TRACER); + log.debug("success!"); + // Restarting everything + cluster.start(); + } + + public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName) + throws Exception { + ingest(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName); + } + + public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf, + String tableName) throws Exception { + 
TestIngest.Opts opts = new TestIngest.Opts(); + opts.rows = rows; + opts.cols = cols; + opts.dataSize = width; + opts.startRow = offset; + opts.columnFamily = colf; + opts.createTable = true; + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(principal); + } + + TestIngest.ingest(connector, opts, new BatchWriterOpts()); + } + + public static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName) + throws Exception { + verify(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName); + } + + private static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf, + String tableName) throws Exception { + ScannerOpts scannerOpts = new ScannerOpts(); + VerifyIngest.Opts opts = new VerifyIngest.Opts(); + opts.rows = rows; + opts.cols = cols; + opts.dataSize = width; + opts.startRow = offset; + opts.columnFamily = colf; + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(principal); + } + + VerifyIngest.verifyIngest(connector, opts, scannerOpts); + } + + public static String[] args(String... 
args) { + return args; + } + + @Test + public void multiTableTest() throws Exception { + // Write to multiple tables + final String instance = cluster.getInstanceName(); + final String keepers = cluster.getZooKeepers(); + final ClusterControl control = cluster.getClusterControl(); + final String prefix = getClass().getSimpleName() + "_" + testName.getMethodName(); + ExecutorService svc = Executors.newFixedThreadPool(2); + Future<Integer> p1 = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + ClientConfiguration clientConf = cluster.getClientConfig(); + // Invocation is different for SASL. We're only logged in via this processes memory (not via some credentials cache on disk) + // Need to pass along the keytab because of that. + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + String principal = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken); + KerberosToken kt = (KerberosToken) token; + assertNotNull("Expected keytab in token", kt.getKeytab()); + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab().getAbsolutePath(), + "-u", principal)); + } + + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String( - ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix)); ++ ((PasswordToken) getAdminToken()).getPassword(), UTF_8), "--tablePrefix", prefix)); + } catch (IOException e) { + log.error("Error running MultiTableIngest", e); + return -1; + } + } + }); + Future<Integer> p2 = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + ClientConfiguration clientConf = 
cluster.getClientConfig(); + // Invocation is different for SASL. We're only logged in via this processes memory (not via some credentials cache on disk) + // Need to pass along the keytab because of that. + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + String principal = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken); + KerberosToken kt = (KerberosToken) token; + assertNotNull("Expected keytab in token", kt.getKeytab()); + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "--readonly", "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab() + .getAbsolutePath(), "-u", principal)); + } + + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "--readonly", "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String( - ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix)); ++ ((PasswordToken) getAdminToken()).getPassword(), UTF_8), "--tablePrefix", prefix)); + } catch (IOException e) { + log.error("Error running MultiTableIngest", e); + return -1; + } + } + }); + svc.shutdown(); + while (!svc.isTerminated()) { + svc.awaitTermination(15, TimeUnit.SECONDS); + } + assertEquals(0, p1.get().intValue()); + assertEquals(0, p2.get().intValue()); + } + + @Test + public void largeTest() throws Exception { + // write a few large values + Connector connector = getConnector(); + String table = getUniqueNames(1)[0]; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table); + } + + @Test + public void interleaved() throws Exception { + // read and write concurrently + final Connector connector = getConnector(); + final String 
tableName = getUniqueNames(1)[0]; + interleaveTest(connector, tableName); + } + + static void interleaveTest(final Connector connector, final String tableName) throws Exception { + final AtomicBoolean fail = new AtomicBoolean(false); + final int CHUNKSIZE = ROWS / 10; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, 0, tableName); + int i; + for (i = 0; i < ROWS; i += CHUNKSIZE) { + final int start = i; + Thread verify = new Thread() { + @Override + public void run() { + try { + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, start, tableName); + } catch (Exception ex) { + fail.set(true); + } + } + }; + verify.start(); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i + CHUNKSIZE, tableName); + verify.join(); + assertFalse(fail.get()); + } + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i, tableName); + } + + public static Text t(String s) { + return new Text(s); + } + + public static Mutation m(String row, String cf, String cq, String value) { + Mutation m = new Mutation(t(row)); + m.put(t(cf), t(cq), new Value(value.getBytes())); + return m; + } + + @Test + public void localityGroupPerf() throws Exception { + // verify that locality groups can make look-ups faster + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + connector.tableOperations().setProperty(tableName, "table.group.g1", "colf"); + connector.tableOperations().setProperty(tableName, "table.groups.enabled", "g1"); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + connector.tableOperations().compact(tableName, null, null, true, true); + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + bw.addMutation(m("zzzzzzzzzzz", "colf2", "cq", "value")); + bw.close(); + 
long now = System.currentTimeMillis(); + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("colf")); + Iterators.size(scanner.iterator()); + long diff = System.currentTimeMillis() - now; + now = System.currentTimeMillis(); + scanner = connector.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("colf2")); + Iterators.size(scanner.iterator()); + bw.close(); + long diff2 = System.currentTimeMillis() - now; + assertTrue(diff2 < diff); + } + + @Test + public void sunnyLG() throws Exception { + // create a locality group, write to it and ensure it exists in the RFiles that result + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>(); + groups.put("g1", Collections.singleton(t("colf"))); + connector.tableOperations().setLocalityGroups(tableName, groups); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + connector.tableOperations().flush(tableName, null, null, true); + BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1); + String tableId = connector.tableOperations().tableIdMap().get(tableName); + bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<")))); + bscanner.fetchColumnFamily(DataFileColumnFamily.NAME); + boolean foundFile = false; + for (Entry<Key,Value> entry : bscanner) { + foundFile = true; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream newOut = new PrintStream(baos); + PrintStream oldOut = System.out; + try { + System.setOut(newOut); + List<String> args = new ArrayList<>(); + args.add(entry.getKey().getColumnQualifier().toString()); + if 
(ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + args.add("--config"); + StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster; + String hadoopConfDir = sac.getHadoopConfDir(); + args.add(new Path(hadoopConfDir, "core-site.xml").toString()); + args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString()); + } + log.info("Invoking PrintInfo with " + args); + PrintInfo.main(args.toArray(new String[args.size()])); + newOut.flush(); + String stdout = baos.toString(); + assertTrue(stdout.contains("Locality group : g1")); + assertTrue(stdout.contains("families : [colf]")); + } finally { + newOut.close(); + System.setOut(oldOut); + } + } + bscanner.close(); + assertTrue(foundFile); + } + + @Test + public void localityGroupChange() throws Exception { + // Make changes to locality groups and ensure nothing is lost + final Connector connector = getConnector(); + String table = getUniqueNames(1)[0]; + TableOperations to = connector.tableOperations(); + to.create(table); + String[] config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf,xyz;lg2:c1,c2"}; + int i = 0; + for (String cfg : config) { + to.setLocalityGroups(table, getGroups(cfg)); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * (i + 1), 1, 50, ROWS * i, table); + to.flush(table, null, null, true); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 0, 1, 50, ROWS * (i + 1), table); + i++; + } + to.delete(table); + to.create(table); + config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf;lg2:colf",}; + i = 1; + for (String cfg : config) { + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table); + to.setLocalityGroups(table, getGroups(cfg)); + to.flush(table, null, null, true); + 
verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table); + i++; + } + } + + private Map<String,Set<Text>> getGroups(String cfg) { + Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>(); + if (cfg != null) { + for (String group : cfg.split(";")) { + String[] parts = group.split(":"); + Set<Text> cols = new HashSet<Text>(); + for (String col : parts[1].split(",")) { + cols.add(t(col)); + } + groups.put(parts[1], cols); + } + } + return groups; + } + + private static class TestTrustManager implements X509TrustManager { + @Override + public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {} + + @Override + public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + } + + private static class TestHostnameVerifier implements HostnameVerifier { + @Override + public boolean verify(String hostname, SSLSession session) { + return true; + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java index 4ef0cab,0000000..38d388d mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java @@@ -1,367 -1,0 +1,366 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import 
org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import com.google.common.base.Charsets; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +public class RestartIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(RestartIT.class); + + @Override + public int defaultTimeoutSeconds() { + return 10 * 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + private static final ScannerOpts SOPTS = new ScannerOpts(); + private static final VerifyIngest.Opts VOPTS = new VerifyIngest.Opts(); + private static final TestIngest.Opts OPTS = new TestIngest.Opts(); + private static final BatchWriterOpts BWOPTS = new BatchWriterOpts(); + static { + OPTS.rows = VOPTS.rows = 10 * 1000; + } + + private ExecutorService svc; + + @Before + public void setup() throws Exception { + svc = Executors.newFixedThreadPool(1); + } + + @After + public void teardown() throws Exception { + if (null == svc) { + return; + } + + if (!svc.isShutdown()) { + svc.shutdown(); + } + + while (!svc.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Waiting for 
threadpool to terminate"); + } + } + + @Test + public void restartMaster() throws Exception { + Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + OPTS.setTableName(tableName); + VOPTS.setTableName(tableName); + c.tableOperations().create(tableName); + final AuthenticationToken token = getAdminToken(); + final ClusterControl control = getCluster().getClusterControl(); + + final String[] args; + if (token instanceof PasswordToken) { + byte[] password = ((PasswordToken) token).getPassword(); - args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z", - cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName}; ++ args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, UTF_8), "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), ++ "--rows", "" + OPTS.rows, "--table", tableName}; + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } else if (token instanceof KerberosToken) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName}; + ClientConfiguration clientConfig = cluster.getClientConfig(); + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + throw new RuntimeException("Unknown token"); + } + + Future<Integer> ret = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + return control.exec(TestIngest.class, args); + } catch (IOException e) { + log.error("Error running TestIngest", e); + return -1; + } + } + }); + + control.stopAllServers(ServerType.MASTER); + control.startAllServers(ServerType.MASTER); + assertEquals(0, ret.get().intValue()); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + 
public void restartMasterRecovery() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + OPTS.setTableName(tableName); + VOPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, OPTS, BWOPTS); + ClusterControl control = getCluster().getClusterControl(); + + // TODO implement a kill all too? + // cluster.stop() would also stop ZooKeeper + control.stopAllServers(ServerType.MASTER); + control.stopAllServers(ServerType.TRACER); + control.stopAllServers(ServerType.TABLET_SERVER); + control.stopAllServers(ServerType.GARBAGE_COLLECTOR); + control.stopAllServers(ServerType.MONITOR); + + ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + cluster.start(); + sleepUninterruptibly(5, TimeUnit.MILLISECONDS); + control.stopAllServers(ServerType.MASTER); + + masterLockData = new byte[0]; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + cluster.start(); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void restartMasterSplit() throws Exception 
{ + Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + final AuthenticationToken token = getAdminToken(); + final ClusterControl control = getCluster().getClusterControl(); + VOPTS.setTableName(tableName); + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); + + final String[] args; + if (token instanceof PasswordToken) { + byte[] password = ((PasswordToken) token).getPassword(); - args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z", - cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName}; ++ args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, UTF_8), "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), ++ "--rows", Integer.toString(VOPTS.rows), "--table", tableName}; + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } else if (token instanceof KerberosToken) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName}; + ClientConfiguration clientConfig = cluster.getClientConfig(); + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + throw new RuntimeException("Unknown token"); + } + + Future<Integer> ret = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + return control.exec(TestIngest.class, args); + } catch (Exception e) { + log.error("Error running TestIngest", e); + return -1; + } + } + }); + + control.stopAllServers(ServerType.MASTER); + + ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = 
new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + cluster.start(); + assertEquals(0, ret.get().intValue()); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void killedTabletServer() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + OPTS.setTableName(tableName); + VOPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, OPTS, BWOPTS); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + cluster.getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + cluster.start(); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + + @Test + public void killedTabletServer2() throws Exception { + final Connector c = getConnector(); + final String[] names = getUniqueNames(2); + final String tableName = names[0]; + final ClusterControl control = getCluster().getClusterControl(); + c.tableOperations().create(tableName); + // Original test started and then stopped a GC. Not sure why it did this. 
The GC was + // already running by default, and it would have nothing to do after only creating a table + control.stopAllServers(ServerType.TABLET_SERVER); + + cluster.start(); + c.tableOperations().create(names[1]); + } + + @Test + public void killedTabletServerDuringShutdown() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + OPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, OPTS, BWOPTS); + try { + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().adminStopAll(); + } finally { + getCluster().start(); + } + } + + @Test + public void shutdownDuringCompactingSplitting() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + VOPTS.setTableName(tableName); + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + OPTS.updateKerberosCredentials(clientConfig); + VOPTS.updateKerberosCredentials(clientConfig); + } else { + OPTS.setPrincipal(getAdminPrincipal()); + VOPTS.setPrincipal(getAdminPrincipal()); + } + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); + String splitThreshold = null; + for (Entry<String,String> entry : c.tableOperations().getProperties(tableName)) { + if (entry.getKey().equals(Property.TABLE_SPLIT_THRESHOLD.getKey())) { + splitThreshold = entry.getValue(); + break; + } + } + Assert.assertNotNull(splitThreshold); + try { + c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "20K"); + 
TestIngest.Opts opts = new TestIngest.Opts(); + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, opts, BWOPTS); + c.tableOperations().flush(tableName, null, null, false); + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + getCluster().stop(); + } finally { + if (getClusterType() == ClusterType.STANDALONE) { + getCluster().start(); + c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), splitThreshold); + } + } + } +}