Merge branch '1.6.1-SNAPSHOT'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/28f97db1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/28f97db1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/28f97db1 Branch: refs/heads/master Commit: 28f97db1f7c62c4c4134da212a7a1a2da2193271 Parents: a4fce6f 934ca6f Author: Josh Elser <els...@apache.org> Authored: Thu Jun 26 16:09:09 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu Jun 26 16:09:09 2014 -0400 ---------------------------------------------------------------------- .../core/client/mapred/RangeInputSplitTest.java | 105 +++++++++++++++++++ .../client/mapred/AccumuloInputFormatTest.java | 55 ++++++++++ .../mapreduce/AccumuloInputFormatTest.java | 2 +- 3 files changed, 161 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/28f97db1/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --cc mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java index 9e6958a,0000000..9991206 mode 100644,000000..100644 --- a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java +++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java @@@ -1,285 -1,0 +1,340 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; ++import java.util.Collection; ++import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; ++import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Base64; ++import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; ++import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; ++import org.apache.log4j.Level; ++import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class AccumuloInputFormatTest { + + private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName(); + private static final String INSTANCE_NAME = PREFIX + "_mapred_instance"; + private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1"; + + private JobConf job; + + @BeforeClass + public static void setupClass() { + System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp"); + } + + @Before + public void createJob() { + job = new JobConf(); + } + + /** + * Check that the iterator configuration is getting stored in the Job conf correctly. + */ + @Test + public void testSetIterator() throws IOException { + IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"); + AccumuloInputFormat.addIterator(job, is); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + is.write(new DataOutputStream(baos)); + String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators"); + assertEquals(Base64.encodeBase64String(baos.toByteArray()), iterators); + } + + @Test + public void testAddIterator() throws IOException { + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); + AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); + IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); + iter.addOption("v1", "1"); + iter.addOption("junk", "\0omg:!\\xyzzy"); + AccumuloInputFormat.addIterator(job, iter); + + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); + + // Check the list size + assertTrue(list.size() == 3); + + // Walk the list and make sure our settings are correct + IteratorSetting setting = list.get(0); + assertEquals(1, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); + assertEquals("WholeRow", setting.getName()); + assertEquals(0, setting.getOptions().size()); + + setting = list.get(1); + assertEquals(2, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); + assertEquals("Versions", setting.getName()); + assertEquals(0, setting.getOptions().size()); + + setting = list.get(2); + assertEquals(3, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals("Count", setting.getName()); + assertEquals(2, setting.getOptions().size()); + assertEquals("1", setting.getOptions().get("v1")); + assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk")); + } + + /** + * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There + * should be no exceptions thrown when trying to parse these types of option entries. + * + * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected. + */ + @Test + public void testIteratorOptionEncoding() throws Throwable { + String key = "colon:delimited:key"; + String value = "comma,delimited,value"; + IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); + someSetting.addOption(key, value); + AccumuloInputFormat.addIterator(job, someSetting); + + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); + assertEquals(1, list.size()); + assertEquals(1, list.get(0).getOptions().size()); + assertEquals(list.get(0).getOptions().get(key), value); + + someSetting.addOption(key + "2", value); + someSetting.setPriority(2); + someSetting.setName("it2"); + AccumuloInputFormat.addIterator(job, someSetting); + list = AccumuloInputFormat.getIterators(job); + assertEquals(2, list.size()); + assertEquals(1, list.get(0).getOptions().size()); + assertEquals(list.get(0).getOptions().get(key), value); + assertEquals(2, list.get(1).getOptions().size()); + assertEquals(list.get(1).getOptions().get(key), value); + assertEquals(list.get(1).getOptions().get(key + "2"), value); + } + + /** + * Test getting iterator settings for multiple iterators set + */ + @Test + public void testGetIteratorSettings() throws IOException { + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); + + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); + + // Check the list size + assertTrue(list.size() == 3); + + // Walk the list and make sure our settings are correct + IteratorSetting setting = list.get(0); + assertEquals(1, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); + assertEquals("WholeRow", setting.getName()); + + setting = list.get(1); + assertEquals(2, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); + assertEquals("Versions", setting.getName()); + + setting = list.get(2); + assertEquals(3, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals("Count", setting.getName()); + + } + + @Test + public void testSetRegex() throws IOException { + String regex = ">\"*%<>\'\\"; + + IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); + RegExFilter.setRegexs(is, regex, null, null, null, false); + AccumuloInputFormat.addIterator(job, is); + + assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName())); + } + + private static AssertionError e1 = null; + private static AssertionError e2 = null; + + private static class MRTester extends Configured implements Tool { + private static class TestMapper implements Mapper<Key,Value,Key,Value> { + Key key = null; + int count = 0; + + @Override + public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException { + try { + if (key != null) + assertEquals(key.getRow().toString(), new String(v.get())); + assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(v.get()), String.format("%09x", count)); + } catch (AssertionError e) { + e1 = e; + } + key = new Key(k); + count++; + } + + @Override + public void configure(JobConf job) {} + + @Override + public void close() throws IOException { + try { + assertEquals(100, count); + } catch (AssertionError e) { + e2 = e; + } + } + + } + + @Override + public int run(String[] args) throws Exception { + + if (args.length != 3) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>"); + } + + String user = args[0]; + String pass = args[1]; + String table = args[2]; + + JobConf job = new JobConf(getConf()); + job.setJarByClass(this.getClass()); + + job.setInputFormat(AccumuloInputFormat.class); + + AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + AccumuloInputFormat.setInputTableName(job, table); + AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME); + + job.setMapperClass(TestMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputFormat(NullOutputFormat.class); + + job.setNumReduceTasks(0); + + return JobClient.runJob(job).isSuccessful() ? 0 : 1; + } + + public static void main(String... args) throws Exception { + assertEquals(0, ToolRunner.run(new Configuration(), new MRTester(), args)); + } + } + + @Test + public void testMap() throws Exception { + MockInstance mockInstance = new MockInstance(INSTANCE_NAME); + Connector c = mockInstance.getConnector("root", new PasswordToken("")); + c.tableOperations().create(TEST_TABLE_1); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + MRTester.main("root", "", TEST_TABLE_1); + assertNull(e1); + assertNull(e2); + } ++ ++ @Test ++ public void testCorrectRangeInputSplits() throws Exception { ++ JobConf job = new JobConf(); ++ ++ String username = "user", table = "table", instance = "mapred_testCorrectRangeInputSplits"; ++ PasswordToken password = new PasswordToken("password"); ++ Authorizations auths = new Authorizations("foo"); ++ Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar"))); ++ boolean isolated = true, localIters = true; ++ Level level = Level.WARN; ++ ++ Instance inst = new MockInstance(instance); ++ Connector connector = inst.getConnector(username, password); ++ connector.tableOperations().create(table); ++ ++ AccumuloInputFormat.setConnectorInfo(job, username, password); ++ AccumuloInputFormat.setInputTableName(job, table); ++ AccumuloInputFormat.setScanAuthorizations(job, auths); ++ AccumuloInputFormat.setMockInstance(job, instance); ++ AccumuloInputFormat.setScanIsolation(job, isolated); ++ AccumuloInputFormat.setLocalIterators(job, localIters); ++ AccumuloInputFormat.fetchColumns(job, fetchColumns); ++ AccumuloInputFormat.setLogLevel(job, level); ++ ++ AccumuloInputFormat aif = new AccumuloInputFormat(); ++ ++ InputSplit[] splits = aif.getSplits(job, 1); ++ ++ Assert.assertEquals(1, splits.length); ++ ++ InputSplit split = splits[0]; ++ ++ Assert.assertEquals(RangeInputSplit.class, split.getClass()); ++ ++ RangeInputSplit risplit = (RangeInputSplit) split; ++ ++ Assert.assertEquals(username, risplit.getPrincipal()); ++ Assert.assertEquals(table, risplit.getTableName()); ++ Assert.assertEquals(password, risplit.getToken()); ++ Assert.assertEquals(auths, risplit.getAuths()); ++ Assert.assertEquals(instance, risplit.getInstanceName()); ++ Assert.assertEquals(isolated, risplit.isIsolatedScan()); ++ Assert.assertEquals(localIters, risplit.usesLocalIterators()); ++ Assert.assertEquals(fetchColumns, risplit.getFetchedColumns()); ++ Assert.assertEquals(level, risplit.getLogLevel()); ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/28f97db1/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --cc mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index 3844cd9,0000000..869ae9d mode 100644,000000..100644 --- a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@@ -1,412 -1,0 +1,412 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Base64; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + +public class AccumuloInputFormatTest { + + private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName(); + + /** + * Check that the iterator configuration is getting stored in the Job conf correctly. + */ + @Test + public void testSetIterator() throws IOException { + @SuppressWarnings("deprecation") + Job job = new Job(); + + IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"); + AccumuloInputFormat.addIterator(job, is); + Configuration conf = job.getConfiguration(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + is.write(new DataOutputStream(baos)); + String iterators = conf.get("AccumuloInputFormat.ScanOpts.Iterators"); + assertEquals(Base64.encodeBase64String(baos.toByteArray()), iterators); + } + + @Test + public void testAddIterator() throws IOException { + @SuppressWarnings("deprecation") + Job job = new Job(); + + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); + AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); + IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); + iter.addOption("v1", "1"); + iter.addOption("junk", "\0omg:!\\xyzzy"); + AccumuloInputFormat.addIterator(job, iter); + + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); + + // Check the list size + assertTrue(list.size() == 3); + + // Walk the list and make sure our settings are correct + IteratorSetting setting = list.get(0); + assertEquals(1, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); + assertEquals("WholeRow", setting.getName()); + assertEquals(0, setting.getOptions().size()); + + setting = list.get(1); + assertEquals(2, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); + assertEquals("Versions", setting.getName()); + assertEquals(0, setting.getOptions().size()); + + setting = list.get(2); + assertEquals(3, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals("Count", setting.getName()); + assertEquals(2, setting.getOptions().size()); + assertEquals("1", setting.getOptions().get("v1")); + assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk")); + } + + /** + * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There + * should be no exceptions thrown when trying to parse these types of option entries. + * + * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected. + */ + @Test + public void testIteratorOptionEncoding() throws Throwable { + String key = "colon:delimited:key"; + String value = "comma,delimited,value"; + IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); + someSetting.addOption(key, value); + @SuppressWarnings("deprecation") + Job job = new Job(); + AccumuloInputFormat.addIterator(job, someSetting); + + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); + assertEquals(1, list.size()); + assertEquals(1, list.get(0).getOptions().size()); + assertEquals(list.get(0).getOptions().get(key), value); + + someSetting.addOption(key + "2", value); + someSetting.setPriority(2); + someSetting.setName("it2"); + AccumuloInputFormat.addIterator(job, someSetting); + list = AccumuloInputFormat.getIterators(job); + assertEquals(2, list.size()); + assertEquals(1, list.get(0).getOptions().size()); + assertEquals(list.get(0).getOptions().get(key), value); + assertEquals(2, list.get(1).getOptions().size()); + assertEquals(list.get(1).getOptions().get(key), value); + assertEquals(list.get(1).getOptions().get(key + "2"), value); + } + + /** + * Test getting iterator settings for multiple iterators set + */ + @Test + public void testGetIteratorSettings() throws IOException { + @SuppressWarnings("deprecation") + Job job = new Job(); + + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); + + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); + + // Check the list size + assertTrue(list.size() == 3); + + // Walk the list and make sure our settings are correct + IteratorSetting setting = list.get(0); + assertEquals(1, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); + assertEquals("WholeRow", setting.getName()); + + setting = list.get(1); + assertEquals(2, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); + assertEquals("Versions", setting.getName()); + + setting = list.get(2); + assertEquals(3, setting.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals("Count", setting.getName()); + + } + + @Test + public void testSetRegex() throws IOException { + @SuppressWarnings("deprecation") + Job job = new Job(); + + String regex = ">\"*%<>\'\\"; + + IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); + RegExFilter.setRegexs(is, regex, null, null, null, false); + AccumuloInputFormat.addIterator(job, is); + + assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName())); + } + + private static AssertionError e1 = null; + private static AssertionError e2 = null; + + private static class MRTester extends Configured implements Tool { + private static class TestMapper extends Mapper<Key,Value,Key,Value> { + Key key = null; + int count = 0; + + @Override + protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { + try { + if (key != null) + assertEquals(key.getRow().toString(), new String(v.get())); + assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(v.get()), String.format("%09x", count)); + } catch (AssertionError e) { + e1 = e; + } + key = new Key(k); + count++; + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + assertEquals(100, count); + } catch (AssertionError e) { + e2 = e; + } + } + } + + @Override + public int run(String[] args) throws Exception { + + if (args.length != 5) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <instanceName> <inputFormatClass>"); + } + + String user = args[0]; + String pass = args[1]; + String table = args[2]; + + String instanceName = args[3]; + String inputFormatClassName = args[4]; + @SuppressWarnings("unchecked") + Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends InputFormat<?,?>>) Class.forName(inputFormatClassName); + + @SuppressWarnings("deprecation") + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(inputFormatClass); + + AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + AccumuloInputFormat.setInputTableName(job, table); + AccumuloInputFormat.setMockInstance(job, instanceName); + + job.setMapperClass(TestMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputFormatClass(NullOutputFormat.class); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + + return job.isSuccessful() ? 0 : 1; + } + + public static int main(String[] args) throws Exception { + return ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args); + } + } + + @Test + public void testMap() throws Exception { + final String INSTANCE_NAME = PREFIX + "_mapreduce_instance"; + final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1"; + + MockInstance mockInstance = new MockInstance(INSTANCE_NAME); + Connector c = mockInstance.getConnector("root", new PasswordToken("")); + c.tableOperations().create(TEST_TABLE_1); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + Assert.assertEquals(0, MRTester.main(new String[] {"root", "", TEST_TABLE_1, INSTANCE_NAME, AccumuloInputFormat.class.getCanonicalName()})); + assertNull(e1); + assertNull(e2); + } + + @Test + public void testCorrectRangeInputSplits() throws Exception { + @SuppressWarnings("deprecation") + Job job = new Job(new Configuration(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + - String username = "user", table = "table", instance = "instance"; ++ String username = "user", table = "table", instance = "mapreduce_testCorrectRangeInputSplits"; + PasswordToken password = new PasswordToken("password"); + Authorizations auths = new Authorizations("foo"); + Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar"))); + boolean isolated = true, localIters = true; + Level level = Level.WARN; + + Instance inst = new MockInstance(instance); + Connector connector = inst.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloInputFormat.setConnectorInfo(job, username, password); + AccumuloInputFormat.setInputTableName(job, table); + AccumuloInputFormat.setScanAuthorizations(job, auths); + AccumuloInputFormat.setMockInstance(job, instance); + AccumuloInputFormat.setScanIsolation(job, isolated); + AccumuloInputFormat.setLocalIterators(job, localIters); + AccumuloInputFormat.fetchColumns(job, fetchColumns); + AccumuloInputFormat.setLogLevel(job, level); + + AccumuloInputFormat aif = new AccumuloInputFormat(); + + List<InputSplit> splits = aif.getSplits(job); + + Assert.assertEquals(1, splits.size()); + + InputSplit split = splits.get(0); + + Assert.assertEquals(RangeInputSplit.class, split.getClass()); + + RangeInputSplit risplit = (RangeInputSplit) split; + + Assert.assertEquals(username, risplit.getPrincipal()); + Assert.assertEquals(table, risplit.getTableName()); + Assert.assertEquals(password, risplit.getToken()); + Assert.assertEquals(auths, risplit.getAuths()); + Assert.assertEquals(instance, risplit.getInstanceName()); + Assert.assertEquals(isolated, risplit.isIsolatedScan()); + Assert.assertEquals(localIters, risplit.usesLocalIterators()); + Assert.assertEquals(fetchColumns, risplit.getFetchedColumns()); + Assert.assertEquals(level, risplit.getLogLevel()); + } + + @Test + public void testPartialInputSplitDelegationToConfiguration() throws Exception { + String user = "testPartialInputSplitUser"; + PasswordToken password = new PasswordToken(""); + + MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration"); + Connector c = mockInstance.getConnector(user, password); + c.tableOperations().create("testtable"); + BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + Assert.assertEquals( + 0, + MRTester.main(new String[] {user, "", "testtable", "testPartialInputSplitDelegationToConfiguration", + EmptySplitsAccumuloInputFormat.class.getCanonicalName()})); + assertNull(e1); + assertNull(e2); + } + + @Test + public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception { + String user = "testPartialFailedInputSplit"; + PasswordToken password = new PasswordToken(""); + + MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration"); + Connector c = mockInstance.getConnector(user, password); + c.tableOperations().create("testtable"); + BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + // We should fail before we even get into the Mapper because we can't make the RecordReader + Assert.assertEquals( + 1, + MRTester.main(new String[] {user, "", "testtable", "testPartialFailedInputSplitDelegationToConfiguration", + BadPasswordSplitsAccumuloInputFormat.class.getCanonicalName()})); + assertNull(e1); + assertNull(e2); + } + + @Test + public void testEmptyColumnFamily() throws IOException { + @SuppressWarnings("deprecation") + Job job = new Job(); + Set<Pair<Text,Text>> cols = new HashSet<Pair<Text,Text>>(); + cols.add(new Pair<Text,Text>(new Text(""), null)); + cols.add(new Pair<Text,Text>(new Text("foo"), new Text("bar"))); + cols.add(new Pair<Text,Text>(new Text(""), new Text("bar"))); + cols.add(new Pair<Text,Text>(new Text(""), new Text(""))); + cols.add(new Pair<Text,Text>(new Text("foo"), new Text(""))); + AccumuloInputFormat.fetchColumns(job, cols); + Set<Pair<Text,Text>> setCols = AccumuloInputFormat.getFetchedColumns(job); + assertEquals(cols, setCols); + } +}