http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java deleted file mode 100644 index fbbe542..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor; -import org.apache.accumulo.core.file.keyfunctor.ColumnQualifierFunctor; -import org.apache.accumulo.core.file.keyfunctor.RowFunctor; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.fate.util.UtilWaitThread; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.MemoryUnit; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BloomFilterIT extends AccumuloClusterHarness { - private static final Logger log = LoggerFactory.getLogger(BloomFilterIT.class); - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setDefaultMemory(1, MemoryUnit.GIGABYTE); - cfg.setNumTservers(1); - Map<String,String> siteConfig = cfg.getSiteConfig(); - siteConfig.put(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "10M"); - cfg.setSiteConfig(siteConfig); - } - - @Override - protected int defaultTimeoutSeconds() { - return 6 * 60; - } - - @Test - public void test() throws Exception { - Connector c = getConnector(); - final String readAhead = c.instanceOperations().getSystemConfiguration().get(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey()); - c.instanceOperations().setProperty(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "1"); - try { - Thread.sleep(1000); - final String[] tables = getUniqueNames(4); - for (String table : tables) { - TableOperations tops = c.tableOperations(); - tops.create(table); - tops.setProperty(table, Property.TABLE_INDEXCACHE_ENABLED.getKey(), "false"); - tops.setProperty(table, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "false"); - tops.setProperty(table, Property.TABLE_BLOOM_SIZE.getKey(), "2000000"); - tops.setProperty(table, Property.TABLE_BLOOM_ERRORRATE.getKey(), "1%"); - tops.setProperty(table, Property.TABLE_BLOOM_LOAD_THRESHOLD.getKey(), "0"); - tops.setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K"); - } - log.info("Writing"); - write(c, tables[0], 1, 0, 2000000000, 500); - write(c, tables[1], 2, 0, 2000000000, 500); - write(c, tables[2], 3, 0, 2000000000, 500); - log.info("Writing complete"); - - // test inserting an empty key - BatchWriter bw = c.createBatchWriter(tables[3], new BatchWriterConfig()); - Mutation m = new Mutation(new Text("")); - m.put(new Text(""), new Text(""), new Value("foo1".getBytes())); - bw.addMutation(m); - bw.close(); - c.tableOperations().flush(tables[3], null, null, true); - - for (String table : Arrays.asList(tables[0], tables[1], tables[2])) { - c.tableOperations().compact(table, null, null, true, true); - } - - // ensure compactions are finished - for (String table : tables) { - FunctionalTestUtils.checkRFiles(c, table, 1, 1, 1, 1); - } - - // these queries should only run quickly if bloom filters are working, so lets get a base - log.info("Base query"); - long t1 = query(c, tables[0], 1, 0, 2000000000, 5000, 500); - long t2 = query(c, tables[1], 2, 0, 2000000000, 5000, 500); - long t3 = query(c, tables[2], 3, 0, 2000000000, 5000, 500); - log.info("Base query complete"); - - log.info("Rewriting with bloom filters"); - c.tableOperations().setProperty(tables[0], Property.TABLE_BLOOM_ENABLED.getKey(), "true"); - c.tableOperations().setProperty(tables[0], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName()); - - c.tableOperations().setProperty(tables[1], Property.TABLE_BLOOM_ENABLED.getKey(), "true"); - c.tableOperations().setProperty(tables[1], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), ColumnFamilyFunctor.class.getName()); - - c.tableOperations().setProperty(tables[2], Property.TABLE_BLOOM_ENABLED.getKey(), "true"); - c.tableOperations().setProperty(tables[2], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), ColumnQualifierFunctor.class.getName()); - - c.tableOperations().setProperty(tables[3], Property.TABLE_BLOOM_ENABLED.getKey(), "true"); - c.tableOperations().setProperty(tables[3], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName()); - - // ensure the updates to zookeeper propagate - UtilWaitThread.sleep(500); - - c.tableOperations().compact(tables[3], null, null, false, true); - c.tableOperations().compact(tables[0], null, null, false, true); - c.tableOperations().compact(tables[1], null, null, false, true); - c.tableOperations().compact(tables[2], null, null, false, true); - log.info("Rewriting with bloom filters complete"); - - // these queries should only run quickly if bloom - // filters are working - log.info("Bloom query"); - long tb1 = query(c, tables[0], 1, 0, 2000000000, 5000, 500); - long tb2 = query(c, tables[1], 2, 0, 2000000000, 5000, 500); - long tb3 = query(c, tables[2], 3, 0, 2000000000, 5000, 500); - log.info("Bloom query complete"); - timeCheck(t1 + t2 + t3, tb1 + tb2 + tb3); - - // test querying for empty key - Scanner scanner = c.createScanner(tables[3], Authorizations.EMPTY); - scanner.setRange(new Range(new Text(""))); - - if (!scanner.iterator().next().getValue().toString().equals("foo1")) { - throw new Exception("Did not see foo1"); - } - } finally { - c.instanceOperations().setProperty(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), readAhead); - } - } - - private void timeCheck(long t1, long t2) throws Exception { - double improvement = (t1 - t2) * 1.0 / t1; - if (improvement < .1) { - throw new Exception("Queries had less than 10% improvement (old: " + t1 + " new: " + t2 + " improvement: " + (improvement * 100) + "%)"); - } - log.info(String.format("Improvement: %.2f%% (%d vs %d)", (improvement * 100), t1, t2)); - } - - private long query(Connector c, String table, int depth, long start, long end, int num, int step) throws Exception { - Random r = new Random(42); - - HashSet<Long> expected = new HashSet<Long>(); - List<Range> ranges = new ArrayList<Range>(num); - Text key = new Text(); - Text row = new Text("row"), cq = new Text("cq"), cf = new Text("cf"); - - for (int i = 0; i < num; ++i) { - Long k = ((r.nextLong() & 0x7fffffffffffffffl) % (end - start)) + start; - key.set(String.format("k_%010d", k)); - Range range = null; - Key acuKey; - - if (k % (start + step) == 0) { - expected.add(k); - } - - switch (depth) { - case 1: - range = new Range(new Text(key)); - break; - case 2: - acuKey = new Key(row, key, cq); - range = new Range(acuKey, true, acuKey.followingKey(PartialKey.ROW_COLFAM), false); - break; - case 3: - acuKey = new Key(row, cf, key); - range = new Range(acuKey, true, acuKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false); - break; - } - - ranges.add(range); - } - - BatchScanner bs = c.createBatchScanner(table, Authorizations.EMPTY, 1); - bs.setRanges(ranges); - - long t1 = System.currentTimeMillis(); - for (Entry<Key,Value> entry : bs) { - long v = Long.parseLong(entry.getValue().toString()); - if (!expected.remove(v)) { - throw new Exception("Got unexpected return " + entry.getKey() + " " + entry.getValue()); - } - } - long t2 = System.currentTimeMillis(); - - if (expected.size() > 0) { - throw new Exception("Did not get all expected values " + expected.size()); - } - - bs.close(); - - return t2 - t1; - } - - private void write(Connector c, String table, int depth, long start, long end, int step) throws Exception { - - BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig()); - - for (long i = start; i < end; i += step) { - String key = String.format("k_%010d", i); - - Mutation m = null; - - switch (depth) { - case 1: - m = new Mutation(new Text(key)); - m.put(new Text("cf"), new Text("cq"), new Value(("" + i).getBytes())); - break; - case 2: - m = new Mutation(new Text("row")); - m.put(new Text(key), new Text("cq"), new Value(("" + i).getBytes())); - break; - case 3: - m = new Mutation(new Text("row")); - m.put(new Text("cf"), new Text(key), new Value(("" + i).getBytes())); - break; - } - - bw.addMutation(m); - } - - bw.close(); - - c.tableOperations().flush(table, null, null, true); - } -}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java deleted file mode 100644 index 1abafeb..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.rfile.RFile; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.MemoryUnit; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.server.conf.ServerConfigurationFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -public class BulkFileIT extends AccumuloClusterHarness { - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) { - cfg.setMemory(ServerType.TABLET_SERVER, 128 * 4, MemoryUnit.MEGABYTE); - } - - @Override - protected int defaultTimeoutSeconds() { - return 4 * 60; - } - - @Test - public void testBulkFile() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - SortedSet<Text> splits = new TreeSet<Text>(); - for (String split : "0333 0666 0999 1333 1666".split(" ")) - splits.add(new Text(split)); - c.tableOperations().addSplits(tableName, splits); - Configuration conf = new Configuration(); - AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance()).getConfiguration(); - FileSystem fs = getCluster().getFileSystem(); - - String rootPath = cluster.getTemporaryPath().toString(); - - String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0]; - - fs.delete(new Path(dir), true); - - FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf); - writer1.startDefaultLocalityGroup(); - writeData(writer1, 0, 333); - writer1.close(); - - FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf); - writer2.startDefaultLocalityGroup(); - writeData(writer2, 334, 999); - writer2.close(); - - FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf); - writer3.startDefaultLocalityGroup(); - writeData(writer3, 1000, 1999); - writer3.close(); - - FunctionalTestUtils.bulkImport(c, fs, tableName, dir); - - FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1); - - verifyData(tableName, 0, 1999); - - } - - private void verifyData(String table, int s, int e) throws Exception { - Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY); - - Iterator<Entry<Key,Value>> iter = scanner.iterator(); - - for (int i = s; i <= e; i++) { - if (!iter.hasNext()) - throw new Exception("row " + i + " not found"); - - Entry<Key,Value> entry = iter.next(); - - String row = String.format("%04d", i); - - if (!entry.getKey().getRow().equals(new Text(row))) - throw new Exception("unexpected row " + entry.getKey() + " " + i); - - if (Integer.parseInt(entry.getValue().toString()) != i) - throw new Exception("unexpected value " + entry + " " + i); - } - - if (iter.hasNext()) - throw new Exception("found more than expected " + iter.next()); - } - - private void writeData(FileSKVWriter w, int s, int e) throws Exception { - for (int i = s; i <= e; i++) { - w.append(new Key(new Text(String.format("%04d", i))), new Value(Integer.toString(i).getBytes(UTF_8))); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/BulkIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BulkIT.java deleted file mode 100644 index f60724e..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkIT.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.test.TestIngest; -import org.apache.accumulo.test.TestIngest.Opts; -import org.apache.accumulo.test.VerifyIngest; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsShell; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class BulkIT extends AccumuloClusterHarness { - - private static final int N = 100000; - private static final int COUNT = 5; - private static final BatchWriterOpts BWOPTS = new BatchWriterOpts(); - private static final ScannerOpts SOPTS = new ScannerOpts(); - - @Override - protected int defaultTimeoutSeconds() { - return 4 * 60; - } - - private Configuration origConf; - - @Before - public void saveConf() { - origConf = CachedConfiguration.getInstance(); - } - - @After - public void restoreConf() { - if (null != origConf) { - CachedConfiguration.setInstance(origConf); - } - } - - @Test - public void test() throws Exception { - runTest(getConnector(), getCluster().getFileSystem(), getCluster().getTemporaryPath(), getAdminPrincipal(), getUniqueNames(1)[0], - this.getClass().getName(), testName.getMethodName()); - } - - static void runTest(Connector c, FileSystem fs, Path basePath, String principal, String tableName, String filePrefix, String dirSuffix) throws Exception { - c.tableOperations().create(tableName); - CachedConfiguration.setInstance(fs.getConf()); - - Path base = new Path(basePath, "testBulkFail_" + dirSuffix); - fs.delete(base, true); - fs.mkdirs(base); - Path bulkFailures = new Path(base, "failures"); - Path files = new Path(base, "files"); - fs.mkdirs(bulkFailures); - fs.mkdirs(files); - - Opts opts = new Opts(); - opts.timestamp = 1; - opts.random = 56; - opts.rows = N; - opts.instance = c.getInstance().getInstanceName(); - opts.cols = 1; - opts.setTableName(tableName); - opts.conf = CachedConfiguration.getInstance(); - opts.fs = fs; - String fileFormat = filePrefix + "rf%02d"; - for (int i = 0; i < COUNT; i++) { - opts.outputFile = new Path(files, String.format(fileFormat, i)).toString(); - opts.startRow = N * i; - TestIngest.ingest(c, opts, BWOPTS); - } - opts.outputFile = base + String.format(fileFormat, N); - opts.startRow = N; - opts.rows = 1; - // create an rfile with one entry, there was a bug with this: - TestIngest.ingest(c, opts, BWOPTS); - - // Make sure the server can modify the files - FsShell fsShell = new FsShell(fs.getConf()); - Assert.assertEquals("Failed to chmod " + base.toString(), 0, fsShell.run(new String[] {"-chmod", "-R", "777", base.toString()})); - - c.tableOperations().importDirectory(tableName, files.toString(), bulkFailures.toString(), false); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.setTableName(tableName); - vopts.random = 56; - vopts.setPrincipal(principal); - for (int i = 0; i < COUNT; i++) { - vopts.startRow = i * N; - vopts.rows = N; - VerifyIngest.verifyIngest(c, vopts, SOPTS); - } - vopts.startRow = N; - vopts.rows = 1; - VerifyIngest.verifyIngest(c, vopts, SOPTS); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java deleted file mode 100644 index 74d3e96..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static com.google.common.base.Charsets.UTF_8; - -import org.apache.accumulo.core.cli.ClientOpts.Password; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.KerberosToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.VerifyIngest; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * This test verifies that when a lot of files are bulk imported into a table with one tablet and then splits that not all map files go to the children tablets. - */ - -public class BulkSplitOptimizationIT extends AccumuloClusterHarness { - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.TSERV_MAJC_DELAY, "1s"); - } - - @Override - protected int defaultTimeoutSeconds() { - return 2 * 60; - } - - private String majcDelay; - - @Before - public void alterConfig() throws Exception { - Connector conn = getConnector(); - majcDelay = conn.instanceOperations().getSystemConfiguration().get(Property.TSERV_MAJC_DELAY.getKey()); - if (!"1s".equals(majcDelay)) { - conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1s"); - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getClusterControl().startAllServers(ServerType.TABLET_SERVER); - } - } - - @After - public void resetConfig() throws Exception { - if (null != majcDelay) { - Connector conn = getConnector(); - conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay); - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getClusterControl().startAllServers(ServerType.TABLET_SERVER); - } - } - - static final int ROWS = 100000; - static final int SPLITS = 99; - - @Test - public void testBulkSplitOptimization() throws Exception { - final Connector c = getConnector(); - final String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000"); - c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000"); - c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G"); - - FileSystem fs = getFileSystem(); - Path testDir = new Path(getUsableDir(), "testmf"); - FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8); - FileStatus[] stats = fs.listStatus(testDir); - - System.out.println("Number of generated files: " + stats.length); - FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString()); - FunctionalTestUtils.checkSplits(c, tableName, 0, 0); - FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100); - - // initiate splits - getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K"); - - UtilWaitThread.sleep(2000); - - // wait until over split threshold -- should be 78 splits - while (getConnector().tableOperations().listSplits(tableName).size() < 75) { - UtilWaitThread.sleep(500); - } - - FunctionalTestUtils.checkSplits(c, tableName, 50, 100); - VerifyIngest.Opts opts = new VerifyIngest.Opts(); - opts.timestamp = 1; - opts.dataSize = 50; - opts.random = 56; - opts.rows = 100000; - opts.startRow = 0; - opts.cols = 1; - opts.setTableName(tableName); - - AuthenticationToken adminToken = getAdminToken(); - if (adminToken instanceof PasswordToken) { - PasswordToken token = (PasswordToken) getAdminToken(); - opts.setPassword(new Password(new String(token.getPassword(), UTF_8))); - opts.setPrincipal(getAdminPrincipal()); - } else if (adminToken instanceof KerberosToken) { - ClientConfiguration clientConf = cluster.getClientConfig(); - opts.updateKerberosCredentials(clientConf); - } else { - Assert.fail("Unknown token type"); - } - - VerifyIngest.verifyIngest(c, opts, new ScannerOpts()); - - // ensure each tablet does not have all map files, should be ~2.5 files per tablet - FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java deleted file mode 100644 index 4055c3a..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.server.master.balancer.ChaoticLoadBalancer; -import org.apache.accumulo.test.TestIngest; -import org.apache.accumulo.test.VerifyIngest; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -public class ChaoticBalancerIT extends AccumuloClusterHarness { - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - Map<String,String> siteConfig = cfg.getSiteConfig(); - siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K"); - siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0"); - cfg.setSiteConfig(siteConfig); - } - - @Override - protected int defaultTimeoutSeconds() { - return 4 * 60; - } - - @Test - public void test() throws Exception { - Connector c = getConnector(); - String[] names = getUniqueNames(2); - String tableName = names[0], unused = names[1]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_LOAD_BALANCER.getKey(), ChaoticLoadBalancer.class.getName()); - c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); - SortedSet<Text> splits = new TreeSet<Text>(); - for (int i = 0; i < 100; i++) { - splits.add(new Text(String.format("%03d", i))); - } - c.tableOperations().create(unused); - c.tableOperations().addSplits(unused, splits); - TestIngest.Opts opts = new TestIngest.Opts(); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.rows = opts.rows = 20000; - opts.setTableName(tableName); - vopts.setTableName(tableName); - ClientConfiguration clientConfig = getCluster().getClientConfig(); - if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { - opts.updateKerberosCredentials(clientConfig); - vopts.updateKerberosCredentials(clientConfig); - } else { - opts.setPrincipal(getAdminPrincipal()); - vopts.setPrincipal(getAdminPrincipal()); - } - TestIngest.ingest(c, opts, new BatchWriterOpts()); - c.tableOperations().flush(tableName, null, null, true); - VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/ClassLoaderIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ClassLoaderIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ClassLoaderIT.java deleted file mode 100644 index c06feed..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/ClassLoaderIT.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Collections; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.Combiner; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.hamcrest.CoreMatchers; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; - -public class ClassLoaderIT extends AccumuloClusterHarness { - - private static final long ZOOKEEPER_PROPAGATION_TIME = 10 * 1000; - - @Override - protected int defaultTimeoutSeconds() { - return 2 * 60; - } - - private String rootPath; - - @Before - public void checkCluster() { - Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI)); - MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster(); - rootPath = mac.getConfig().getDir().getAbsolutePath(); - } - - @Test - public void test() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); - Mutation m = new Mutation("row1"); - m.put("cf", "col1", "Test"); - bw.addMutation(m); - bw.close(); - scanCheck(c, tableName, "Test"); - FileSystem fs = FileSystem.get(CachedConfiguration.getInstance()); - Path jarPath = new Path(rootPath + "/lib/ext/Test.jar"); - fs.copyFromLocalFile(new Path(System.getProperty("user.dir") + "/src/test/resources/TestCombinerX.jar"), jarPath); - UtilWaitThread.sleep(1000); - IteratorSetting is = new IteratorSetting(10, "TestCombiner", "org.apache.accumulo.test.functional.TestCombiner"); - Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("cf"))); - c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.scan)); - UtilWaitThread.sleep(ZOOKEEPER_PROPAGATION_TIME); - scanCheck(c, tableName, "TestX"); - fs.delete(jarPath, true); - fs.copyFromLocalFile(new Path(System.getProperty("user.dir") + "/src/test/resources/TestCombinerY.jar"), jarPath); - UtilWaitThread.sleep(5000); - scanCheck(c, tableName, "TestY"); - fs.delete(jarPath, true); - } - - private void scanCheck(Connector c, String tableName, String expected) throws Exception { - Scanner bs = c.createScanner(tableName, Authorizations.EMPTY); - Iterator<Entry<Key,Value>> iterator = bs.iterator(); - assertTrue(iterator.hasNext()); - Entry<Key,Value> next = iterator.next(); - assertFalse(iterator.hasNext()); - assertEquals(expected, next.getValue().toString()); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java deleted file mode 100644 index 779b407..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; - -public class CleanTmpIT extends ConfigurableMacBase { - private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class); - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); - cfg.setNumTservers(1); - // use raw local file system so walogs sync and flush will work - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - @Override - protected int defaultTimeoutSeconds() { - return 4 * 60; - } - - @Test - public void test() throws Exception { - Connector c = getConnector(); - // make a table - String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - // write to it - BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); - Mutation m = new Mutation("row"); - m.put("cf", "cq", "value"); - bw.addMutation(m); - bw.flush(); - - // Compact memory to make a file - c.tableOperations().compact(tableName, null, null, true, true); - - // Make sure that we'll have a WAL - m = new Mutation("row2"); - m.put("cf", "cq", "value"); - bw.addMutation(m); - bw.close(); - - // create a fake _tmp file in its directory - String id = c.tableOperations().tableIdMap().get(tableName); - Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(Range.prefix(id)); - s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); - Entry<Key,Value> entry = Iterables.getOnlyElement(s); - Path file = new Path(entry.getKey().getColumnQualifier().toString()); - - FileSystem fs = getCluster().getFileSystem(); - assertTrue("Could not find file: " + file, fs.exists(file)); - Path tabletDir = file.getParent(); - assertNotNull("Tablet dir should not be null", tabletDir); - Path tmp = new Path(tabletDir, "junk.rf_tmp"); - // Make the file - fs.create(tmp).close(); - log.info("Created tmp file {}", tmp.toString()); - getCluster().stop(); - getCluster().start(); - - Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); - assertEquals(2, Iterators.size(scanner.iterator())); - // If we performed log recovery, we should have cleaned up any stray files - assertFalse("File still exists: " + tmp, fs.exists(tmp)); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java deleted file mode 100644 index 1f6d1a0..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.CleanUp; -import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Ensures that all threads spawned for ZooKeeper and Thrift connectivity are reaped after calling CleanUp.shutdown(). - * - * Because this is destructive across the current context classloader, the normal teardown methods will fail (because they attempt to create a Connector). Until - * the ZooKeeperInstance and Connector are self-contained WRT resource management, we can't leverage the AccumuloClusterBase. - */ -public class CleanUpIT extends SharedMiniClusterBase { - private static final Logger log = LoggerFactory.getLogger(CleanUpIT.class); - - @Override - protected int defaultTimeoutSeconds() { - return 30; - } - - @Test - public void run() throws Exception { - - String tableName = getUniqueNames(1)[0]; - getConnector().tableOperations().create(tableName); - - BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); - - Mutation m1 = new Mutation("r1"); - m1.put("cf1", "cq1", 1, "5"); - - bw.addMutation(m1); - - bw.flush(); - - Scanner scanner = getConnector().createScanner(tableName, new Authorizations()); - - int count = 0; - for (Entry<Key,Value> entry : scanner) { - count++; - if (!entry.getValue().toString().equals("5")) { - Assert.fail("Unexpected value " + entry.getValue()); - } - } - - Assert.assertEquals("Unexpected count", 1, count); - - int threadCount = countThreads(); - if (threadCount < 2) { - printThreadNames(); - Assert.fail("Not seeing expected threads. Saw " + threadCount); - } - - CleanUp.shutdownNow(); - - Mutation m2 = new Mutation("r2"); - m2.put("cf1", "cq1", 1, "6"); - - try { - bw.addMutation(m1); - bw.flush(); - Assert.fail("batch writer did not fail"); - } catch (Exception e) { - - } - - try { - // expect this to fail also, want to clean up batch writer threads - bw.close(); - Assert.fail("batch writer close not fail"); - } catch (Exception e) { - - } - - try { - count = 0; - Iterator<Entry<Key,Value>> iter = scanner.iterator(); - while (iter.hasNext()) { - iter.next(); - count++; - } - Assert.fail("scanner did not fail"); - } catch (Exception e) { - - } - - threadCount = countThreads(); - if (threadCount > 0) { - printThreadNames(); - Assert.fail("Threads did not go away. Saw " + threadCount); - } - } - - private void printThreadNames() { - Set<Thread> threads = Thread.getAllStackTraces().keySet(); - Exception e = new Exception(); - for (Thread thread : threads) { - e.setStackTrace(thread.getStackTrace()); - log.info("thread name: " + thread.getName(), e); - } - } - - /** - * count threads that should be cleaned up - * - */ - private int countThreads() { - int count = 0; - Set<Thread> threads = Thread.getAllStackTraces().keySet(); - for (Thread thread : threads) { - - if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread")) - count++; - - if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool")) - count++; - } - - return count; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/CloneTestIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CloneTestIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CloneTestIT.java deleted file mode 100644 index b3d0ab5..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/CloneTestIT.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.accumulo.cluster.AccumuloCluster; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.DiskUsage; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; -import org.apache.accumulo.server.ServerConstants; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Test; - -/** - * - */ -public class CloneTestIT extends AccumuloClusterHarness { - - @Override - protected int defaultTimeoutSeconds() { - return 2 * 60; - } - - @Test - public void testProps() throws Exception { - String[] tableNames = getUniqueNames(2); - String table1 = tableNames[0]; - String table2 = tableNames[1]; - - Connector c = getConnector(); - - c.tableOperations().create(table1); - - c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1M"); - c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "2M"); - c.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "23"); - - BatchWriter bw = writeData(table1, c); - - Map<String,String> props = new HashMap<String,String>(); - props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K"); - - Set<String> exclude = new HashSet<String>(); - exclude.add(Property.TABLE_FILE_MAX.getKey()); - - c.tableOperations().clone(table1, table2, true, props, exclude); - - Mutation m3 = new Mutation("009"); - m3.put("data", "x", "1"); - m3.put("data", "y", "2"); - bw.addMutation(m3); - bw.close(); - - checkData(table2, c); - - checkMetadata(table2, c); - - HashMap<String,String> tableProps = new HashMap<String,String>(); - for (Entry<String,String> prop : c.tableOperations().getProperties(table2)) { - tableProps.put(prop.getKey(), prop.getValue()); - } - - Assert.assertEquals("500K", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey())); - Assert.assertEquals(Property.TABLE_FILE_MAX.getDefaultValue(), tableProps.get(Property.TABLE_FILE_MAX.getKey())); - Assert.assertEquals("2M", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey())); - - c.tableOperations().delete(table1); - c.tableOperations().delete(table2); - - } - - private void checkData(String table2, Connector c) throws TableNotFoundException { - Scanner scanner = c.createScanner(table2, Authorizations.EMPTY); - - HashMap<String,String> expected = new HashMap<String,String>(); - expected.put("001:x", "9"); - expected.put("001:y", "7"); - expected.put("008:x", "3"); - expected.put("008:y", "4"); - - HashMap<String,String> actual = new HashMap<String,String>(); - - for (Entry<Key,Value> entry : scanner) - actual.put(entry.getKey().getRowData().toString() + ":" + entry.getKey().getColumnQualifierData().toString(), entry.getValue().toString()); - - Assert.assertEquals(expected, actual); - } - - private void checkMetadata(String table, Connector conn) throws Exception { - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - - s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); - MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(s); - String tableId = conn.tableOperations().tableIdMap().get(table); - - Assert.assertNotNull("Could not get table id for " + table, tableId); - - s.setRange(Range.prefix(tableId)); - - Key k; - Text cf = new Text(), cq = new Text(); - int itemsInspected = 0; - for (Entry<Key,Value> entry : s) { - itemsInspected++; - k = entry.getKey(); - k.getColumnFamily(cf); - k.getColumnQualifier(cq); - - if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) { - Path p = new Path(cq.toString()); - FileSystem fs = cluster.getFileSystem(); - Assert.assertTrue("File does not exist: " + p, fs.exists(p)); - } else if (cf.equals(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily())) { - Assert.assertEquals("Saw unexpected cq", MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), cq); - Path tabletDir = new Path(entry.getValue().toString()); - Path tableDir = tabletDir.getParent(); - Path tablesDir = tableDir.getParent(); - - Assert.assertEquals(ServerConstants.TABLE_DIR, tablesDir.getName()); - } else { - Assert.fail("Got unexpected key-value: " + entry); - throw new RuntimeException(); - } - } - - Assert.assertTrue("Expected to find metadata entries", itemsInspected > 0); - } - - private BatchWriter writeData(String table1, Connector c) throws TableNotFoundException, MutationsRejectedException { - BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig()); - - Mutation m1 = new Mutation("001"); - m1.put("data", "x", "9"); - m1.put("data", "y", "7"); - - Mutation m2 = new Mutation("008"); - m2.put("data", "x", "3"); - m2.put("data", "y", "4"); - - bw.addMutation(m1); - bw.addMutation(m2); - - bw.flush(); - return bw; - } - - @Test - public void testDeleteClone() throws Exception { - String[] tableNames = getUniqueNames(3); - String table1 = tableNames[0]; - String table2 = tableNames[1]; - String table3 = tableNames[2]; - - Connector c = getConnector(); - AccumuloCluster cluster = getCluster(); - Assume.assumeTrue(cluster instanceof MiniAccumuloClusterImpl); - MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster; - String rootPath = mac.getConfig().getDir().getAbsolutePath(); - - // verify that deleting a new table removes the files - c.tableOperations().create(table3); - writeData(table3, c).close(); - c.tableOperations().flush(table3, null, null, true); - // check for files - FileSystem fs = getCluster().getFileSystem(); - String id = c.tableOperations().tableIdMap().get(table3); - FileStatus[] status = fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)); - assertTrue(status.length > 0); - // verify disk usage - List<DiskUsage> diskUsage = c.tableOperations().getDiskUsage(Collections.singleton(table3)); - assertEquals(1, diskUsage.size()); - assertTrue(diskUsage.get(0).getUsage() > 100); - // delete the table - c.tableOperations().delete(table3); - // verify its gone from the file system - Path tablePath = new Path(rootPath + "/accumulo/tables/" + id); - if (fs.exists(tablePath)) { - status = fs.listStatus(tablePath); - assertTrue(status == null || status.length == 0); - } - - c.tableOperations().create(table1); - - BatchWriter bw = writeData(table1, c); - - Map<String,String> props = new HashMap<String,String>(); - props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K"); - - Set<String> exclude = new HashSet<String>(); - exclude.add(Property.TABLE_FILE_MAX.getKey()); - - c.tableOperations().clone(table1, table2, true, props, exclude); - - Mutation m3 = new Mutation("009"); - m3.put("data", "x", "1"); - m3.put("data", "y", "2"); - bw.addMutation(m3); - bw.close(); - - // delete source table, should not affect clone - c.tableOperations().delete(table1); - - checkData(table2, c); - - c.tableOperations().compact(table2, null, null, true, true); - - checkData(table2, c); - - c.tableOperations().delete(table2); - - } - - @Test - public void testCloneWithSplits() throws Exception { - Connector conn = getConnector(); - - List<Mutation> mutations = new ArrayList<Mutation>(); - TreeSet<Text> splits = new TreeSet<Text>(); - for (int i = 0; i < 10; i++) { - splits.add(new Text(Integer.toString(i))); - Mutation m = new Mutation(Integer.toString(i)); - m.put("", "", ""); - mutations.add(m); - } - - String[] tables = getUniqueNames(2); - - conn.tableOperations().create(tables[0]); - - conn.tableOperations().addSplits(tables[0], splits); - - BatchWriter bw = conn.createBatchWriter(tables[0], new BatchWriterConfig()); - bw.addMutations(mutations); - bw.close(); - - conn.tableOperations().clone(tables[0], tables[1], true, null, null); - - conn.tableOperations().deleteRows(tables[1], new Text("4"), new Text("8")); - - List<String> rows = Arrays.asList("0", "1", "2", "3", "4", "9"); - List<String> actualRows = new ArrayList<String>(); - for (Entry<Key,Value> entry : conn.createScanner(tables[1], Authorizations.EMPTY)) { - actualRows.add(entry.getKey().getRow().toString()); - } - - Assert.assertEquals(rows, actualRows); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/CombinerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CombinerIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CombinerIT.java deleted file mode 100644 index d4ef18e..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/CombinerIT.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Collections; -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.LongCombiner.Type; -import org.apache.accumulo.core.iterators.user.SummingCombiner; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.junit.Test; - -public class CombinerIT extends AccumuloClusterHarness { - - @Override - protected int defaultTimeoutSeconds() { - return 60; - } - - private void checkSum(String tableName, Connector c) throws Exception { - Scanner s = c.createScanner(tableName, Authorizations.EMPTY); - Iterator<Entry<Key,Value>> i = s.iterator(); - assertTrue(i.hasNext()); - Entry<Key,Value> entry = i.next(); - assertEquals("45", entry.getValue().toString()); - assertFalse(i.hasNext()); - } - - @Test - public void aggregationTest() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class); - SummingCombiner.setEncodingType(setting, Type.STRING); - SummingCombiner.setColumns(setting, Collections.singletonList(new IteratorSetting.Column("cf"))); - c.tableOperations().attachIterator(tableName, setting); - BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); - for (int i = 0; i < 10; i++) { - Mutation m = new Mutation("row1"); - m.put("cf".getBytes(), "col1".getBytes(), ("" + i).getBytes()); - bw.addMutation(m); - } - bw.close(); - checkSum(tableName, c); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java deleted file mode 100644 index 862365f..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.accumulo.core.cli.ClientOpts.Password; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.InstanceOperations; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.VerifyIngest; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Iterators; - -public class CompactionIT extends AccumuloClusterHarness { - private static final Logger log = LoggerFactory.getLogger(CompactionIT.class); - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4"); - cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); - cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1"); - // use raw local file system so walogs sync and flush will work - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - @Override - protected int defaultTimeoutSeconds() { - return 4 * 60; - } - - private String majcThreadMaxOpen, majcDelay, majcMaxConcurrent; - - @Before - public void alterConfig() throws Exception { - if (ClusterType.STANDALONE == getClusterType()) { - InstanceOperations iops = getConnector().instanceOperations(); - Map<String,String> config = iops.getSystemConfiguration(); - majcThreadMaxOpen = config.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()); - majcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey()); - majcMaxConcurrent = config.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey()); - - iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4"); - iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1"); - iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1"); - - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getClusterControl().startAllServers(ServerType.TABLET_SERVER); - } - } - - @After - public void resetConfig() throws Exception { - // We set the values.. - if (null != majcThreadMaxOpen) { - InstanceOperations iops = getConnector().instanceOperations(); - - iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen); - iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay); - iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcMaxConcurrent); - - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getClusterControl().startAllServers(ServerType.TABLET_SERVER); - } - } - - @Test - public void test() throws Exception { - final Connector c = getConnector(); - final String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0"); - FileSystem fs = getFileSystem(); - Path root = new Path(cluster.getTemporaryPath(), getClass().getName()); - Path testrf = new Path(root, "testrf"); - FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4); - - FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString()); - int beforeCount = countFiles(c); - - final AtomicBoolean fail = new AtomicBoolean(false); - final ClientConfiguration clientConf = cluster.getClientConfig(); - for (int count = 0; count < 5; count++) { - List<Thread> threads = new ArrayList<Thread>(); - final int span = 500000 / 59; - for (int i = 0; i < 500000; i += 500000 / 59) { - final int finalI = i; - Thread t = new Thread() { - @Override - public void run() { - try { - VerifyIngest.Opts opts = new VerifyIngest.Opts(); - opts.startRow = finalI; - opts.rows = span; - opts.random = 56; - opts.dataSize = 50; - opts.cols = 1; - opts.setTableName(tableName); - if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { - opts.updateKerberosCredentials(clientConf); - } else { - opts.setPrincipal(getAdminPrincipal()); - PasswordToken passwordToken = (PasswordToken) getAdminToken(); - opts.setPassword(new Password(new String(passwordToken.getPassword(), UTF_8))); - } - VerifyIngest.verifyIngest(c, opts, new ScannerOpts()); - } catch (Exception ex) { - log.warn("Got exception verifying data", ex); - fail.set(true); - } - } - }; - t.start(); - threads.add(t); - } - for (Thread t : threads) - t.join(); - assertFalse("Failed to successfully run all threads, Check the test output for error", fail.get()); - } - - int finalCount = countFiles(c); - assertTrue(finalCount < beforeCount); - try { - getClusterControl().adminStopAll(); - } finally { - // Make sure the internal state in the cluster is reset (e.g. processes in MAC) - getCluster().stop(); - if (ClusterType.STANDALONE == getClusterType()) { - // Then restart things for the next test if it's a standalone - getCluster().start(); - } - } - } - - private int countFiles(Connector c) throws Exception { - Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME); - s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); - return Iterators.size(s.iterator()); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java deleted file mode 100644 index 75eecfd..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -import com.google.common.collect.Iterators; - -public class ConcurrencyIT extends AccumuloClusterHarness { - - static class ScanTask extends Thread { - - int count = 0; - Scanner scanner; - - ScanTask(Connector conn, String tableName, long time) throws Exception { - scanner = conn.createScanner(tableName, Authorizations.EMPTY); - IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class); - SlowIterator.setSleepTime(slow, time); - scanner.addScanIterator(slow); - } - - @Override - public void run() { - count = Iterators.size(scanner.iterator()); - } - - } - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - Map<String,String> siteConfig = cfg.getSiteConfig(); - siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1"); - cfg.setSiteConfig(siteConfig); - } - - @Override - protected int defaultTimeoutSeconds() { - return 2 * 60; - } - - // @formatter:off - // Below is a diagram of the operations in this test over time. - // - // Scan 0 |------------------------------| - // Scan 1 |----------| - // Minc 1 |-----| - // Scan 2 |----------| - // Scan 3 |---------------| - // Minc 2 |-----| - // Majc 1 |-----| - // @formatter:on - @Test - public void run() throws Exception { - Connector c = getConnector(); - runTest(c, getUniqueNames(1)[0]); - } - - static void runTest(Connector c, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, - MutationsRejectedException, Exception, InterruptedException { - c.tableOperations().create(tableName); - IteratorSetting is = new IteratorSetting(10, SlowIterator.class); - SlowIterator.setSleepTime(is, 50); - c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc, IteratorScope.majc)); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0"); - - BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); - for (int i = 0; i < 50; i++) { - Mutation m = new Mutation(new Text(String.format("%06d", i))); - m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); - bw.addMutation(m); - } - bw.flush(); - - ScanTask st0 = new ScanTask(c, tableName, 300); - st0.start(); - - ScanTask st1 = new ScanTask(c, tableName, 100); - st1.start(); - - UtilWaitThread.sleep(50); - c.tableOperations().flush(tableName, null, null, true); - - for (int i = 0; i < 50; i++) { - Mutation m = new Mutation(new Text(String.format("%06d", i))); - m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); - bw.addMutation(m); - } - - bw.flush(); - - ScanTask st2 = new ScanTask(c, tableName, 100); - st2.start(); - - st1.join(); - st2.join(); - if (st1.count != 50) - throw new Exception("Thread 1 did not see 50, saw " + st1.count); - - if (st2.count != 50) - throw new Exception("Thread 2 did not see 50, saw " + st2.count); - - ScanTask st3 = new ScanTask(c, tableName, 150); - st3.start(); - - UtilWaitThread.sleep(50); - c.tableOperations().flush(tableName, null, null, false); - - st3.join(); - if (st3.count != 50) - throw new Exception("Thread 3 did not see 50, saw " + st3.count); - - st0.join(); - if (st0.count != 50) - throw new Exception("Thread 0 did not see 50, saw " + st0.count); - - bw.close(); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java deleted file mode 100644 index 66695e0..0000000 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.functional; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.Random; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.fate.util.UtilWaitThread; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.tserver.compaction.CompactionPlan; -import org.apache.accumulo.tserver.compaction.CompactionStrategy; -import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.collect.Iterators; - -public class ConfigurableCompactionIT extends ConfigurableMacBase { - - @Override - public int defaultTimeoutSeconds() { - return 2 * 60; - } - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "1s")); - } - - public static class SimpleCompactionStrategy extends CompactionStrategy { - - @Override - public void init(Map<String,String> options) { - String countString = options.get("count"); - if (countString != null) - count = Integer.parseInt(countString); - } - - int count = 3; - - @Override - public boolean shouldCompact(MajorCompactionRequest request) throws IOException { - return request.getFiles().size() == count; - - } - - @Override - public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { - CompactionPlan result = new CompactionPlan(); - result.inputFiles.addAll(request.getFiles().keySet()); - return result; - } - - } - - @Test - public void test() throws Exception { - final Connector c = getConnector(); - final String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_COMPACTION_STRATEGY.getKey(), SimpleCompactionStrategy.class.getName()); - runTest(c, tableName, 3); - c.tableOperations().setProperty(tableName, Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey() + "count", "" + 5); - runTest(c, tableName, 5); - } - - @Test - public void testPerTableClasspath() throws Exception { - final Connector c = getConnector(); - final String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", - System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar"); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "10"); - c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1"); - // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted. - c.tableOperations().setProperty(tableName, Property.TABLE_COMPACTION_STRATEGY.getKey(), "org.apache.accumulo.test.EfgCompactionStrat"); - - c.tableOperations().addSplits(tableName, new TreeSet<Text>(Arrays.asList(new Text("efg")))); - - for (char ch = 'a'; ch < 'l'; ch++) - writeFlush(c, tableName, ch + ""); - - while (countFiles(c, tableName) != 7) { - UtilWaitThread.sleep(200); - } - } - - private void writeFlush(Connector conn, String tablename, String row) throws Exception { - BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig()); - Mutation m = new Mutation(row); - m.put("", "", ""); - bw.addMutation(m); - bw.close(); - conn.tableOperations().flush(tablename, null, null, true); - } - - final static Random r = new Random(); - - private void makeFile(Connector conn, String tablename) throws Exception { - BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig()); - byte[] empty = {}; - byte[] row = new byte[10]; - r.nextBytes(row); - Mutation m = new Mutation(row, 0, 10); - m.put(empty, empty, empty); - bw.addMutation(m); - bw.flush(); - bw.close(); - conn.tableOperations().flush(tablename, null, null, true); - } - - private void runTest(final Connector c, final String tableName, final int n) throws Exception { - for (int i = countFiles(c, tableName); i < n - 1; i++) - makeFile(c, tableName); - Assert.assertEquals(n - 1, countFiles(c, tableName)); - makeFile(c, tableName); - for (int i = 0; i < 10; i++) { - int count = countFiles(c, tableName); - assertTrue(count == 1 || count == n); - if (count == 1) - break; - UtilWaitThread.sleep(1000); - } - } - - private int countFiles(Connector c, String tableName) throws Exception { - Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); - return Iterators.size(s.iterator()); - } - -}