ACCUMULO-1854 Get the mapreduce unit tests working again
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e08736d7 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e08736d7 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e08736d7 Branch: refs/heads/1.5.1-SNAPSHOT Commit: e08736d7900b22d83a0e28c93696f6ef20086942 Parents: f4e4c39 Author: Josh Elser <els...@apache.org> Authored: Thu Nov 21 21:26:09 2013 -0500 Committer: Josh Elser <els...@apache.org> Committed: Thu Nov 21 21:26:09 2013 -0500 ---------------------------------------------------------------------- .../core/client/mapreduce/InputFormatBase.java | 38 +- .../core/client/mapreduce/RangeInputSplit.java | 69 --- .../mapreduce/AccumuloInputFormatTest.java | 159 ++++++ .../mapreduce/AccumuloInputFormatTest1.java | 534 ------------------- .../client/mapreduce/RangeInputSplitTest.java | 19 +- 5 files changed, 201 insertions(+), 618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index e17b46d..4e5b5a8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -814,6 +814,24 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { String tableName = getInputTableName(context); boolean autoAdjust = getAutoAdjustRanges(context); List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context); + Instance instance = getInstance(context); + boolean offline = isOfflineScan(context); + boolean isolated = isIsolated(context); + boolean localIterators = usesLocalIterators(context); + boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass())); + Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(context); + Authorizations auths = getScanAuthorizations(context); + String principal = getPrincipal(context); + + AuthenticationToken token; + try { + token = CredentialHelper.extractToken(getTokenClass(context), getToken(context)); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + + List<IteratorSetting> iterators = getIterators(context); + Level logLevel = getLogLevel(context); if (ranges.isEmpty()) { ranges = new ArrayList<Range>(1); @@ -832,7 +850,6 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { binnedRanges = binOfflineTable(context, tableName, ranges); } } else { - Instance instance = getInstance(context); String tableId = null; tl = getTabletLocator(context); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it @@ -896,6 +913,25 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { if (!autoAdjust) for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0]))); + + for (InputSplit inputSplit : splits) { + RangeInputSplit split = (RangeInputSplit) inputSplit; + + split.setTable(tableName); + split.setOffline(offline); + split.setIsolatedScan(isolated); + split.setUsesLocalIterators(localIterators); + split.setMockInstance(mockInstance); + split.setFetchedColumns(fetchedColumns); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(iterators); + split.setLogLevel(logLevel); + } + return splits; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java index 67b839b..6decdc6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java @@ -53,7 +53,6 @@ public class RangeInputSplit extends InputSplit implements Writable { private Range range; private String[] locations; private String table, instanceName, zooKeepers, principal; - private String rowRegex, colfamRegex, colqualRegex, valueRegex; private AuthenticationToken token; private Boolean offline, mockInstance, isolatedScan, localIterators; private Authorizations auths; @@ -164,22 +163,6 @@ public class RangeInputSplit extends InputSplit implements Writable { } if (in.readBoolean()) { - rowRegex = in.readUTF(); - } - - if (in.readBoolean()) { - colfamRegex = in.readUTF(); - } - - if (in.readBoolean()) { - colqualRegex = in.readUTF(); - } - - if (in.readBoolean()) { - valueRegex = in.readUTF(); - } - - if (in.readBoolean()) { int numColumns = in.readInt(); List<String> columns = new ArrayList<String>(numColumns); for (int i = 0; i < numColumns; i++) { @@ -248,26 +231,6 @@ public class RangeInputSplit extends InputSplit implements Writable { out.writeBoolean(mockInstance); } - out.writeBoolean(null != rowRegex); - if (null != rowRegex) { - out.writeUTF(rowRegex); - } - - out.writeBoolean(null != colfamRegex); - if (null != colfamRegex) { - out.writeUTF(colfamRegex); - } - - out.writeBoolean(null != colqualRegex); - if (null != colqualRegex) { - out.writeUTF(colqualRegex); - } - - out.writeBoolean(null != valueRegex); - if (null != valueRegex) { - out.writeUTF(valueRegex); - } - out.writeBoolean(null != fetchedColumns); if (null != fetchedColumns) { String[] cols = InputConfigurator.serializeColumns(fetchedColumns); @@ -391,38 +354,6 @@ public class RangeInputSplit extends InputSplit implements Writable { this.locations = locations; } - public String getRowRegex() { - return rowRegex; - } - - public void setRowRegex(String rowRegex) { - this.rowRegex = rowRegex; - } - - public String getColfamRegex() { - return colfamRegex; - } - - public void setColfamRegex(String colfamRegex) { - this.colfamRegex = colfamRegex; - } - - public String getColqualRegex() { - return colqualRegex; - } - - public void setColqualRegex(String colqualRegex) { - this.colqualRegex = colqualRegex; - } - - public String getValueRegex() { - return valueRegex; - } - - public void setValueRegex(String valueRegex) { - this.valueRegex = valueRegex; - } - public Boolean isMockInstance() { return mockInstance; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index 0ee03a2..f9ccdf1 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@ -23,11 +23,14 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; @@ -36,16 +39,24 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.RegExFilter; import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.Pair; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Level; +import org.junit.Assert; import org.junit.Test; public class AccumuloInputFormatTest { @@ -318,4 +329,152 @@ public class AccumuloInputFormatTest { assertNull(e1); assertNull(e2); } + + @SuppressWarnings("deprecation") + @Test + public void testCorrectRangeInputSplits() throws Exception { + Job job = new Job(new Configuration(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + + String username = "user", table = "table", instance = "instance"; + PasswordToken password = new PasswordToken("password"); + Authorizations auths = new Authorizations("foo"); + Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar"))); + boolean isolated = true, localIters = true; + Level level = Level.WARN; + + Instance inst = new MockInstance(instance); + Connector connector = inst.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloInputFormat.setConnectorInfo(job, username, password); + AccumuloInputFormat.setInputTableName(job, table); + AccumuloInputFormat.setScanAuthorizations(job, auths); + AccumuloInputFormat.setMockInstance(job, instance); + AccumuloInputFormat.setScanIsolation(job, isolated); + AccumuloInputFormat.setLocalIterators(job, localIters); + AccumuloInputFormat.fetchColumns(job, fetchColumns); + AccumuloInputFormat.setLogLevel(job, level); + + AccumuloInputFormat aif = new AccumuloInputFormat(); + + List<InputSplit> splits = aif.getSplits(job); + + Assert.assertEquals(1, splits.size()); + + InputSplit split = splits.get(0); + + Assert.assertEquals(RangeInputSplit.class, split.getClass()); + + RangeInputSplit risplit = (RangeInputSplit) split; + + Assert.assertEquals(username, risplit.getPrincipal()); + Assert.assertEquals(table, risplit.getTable()); + Assert.assertEquals(password, risplit.getToken()); + Assert.assertEquals(auths, risplit.getAuths()); + Assert.assertEquals(instance, risplit.getInstanceName()); + Assert.assertEquals(isolated, risplit.isIsolatedScan()); + Assert.assertEquals(localIters, risplit.usesLocalIterators()); + Assert.assertEquals(fetchColumns, risplit.getFetchedColumns()); + Assert.assertEquals(level, risplit.getLogLevel()); + } + + static class TestMapper extends Mapper<Key,Value,Key,Value> { + Key key = null; + int count = 0; + + @Override + protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { + if (key != null) + assertEquals(key.getRow().toString(), new String(v.get())); + assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(v.get()), String.format("%09x", count)); + key = new Key(k); + count++; + } + } + + @Test + public void testPartialInputSplitDelegationToConfiguration() throws Exception { + String user = "testPartialInputSplitUser"; + PasswordToken password = new PasswordToken(""); + + MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration"); + Connector c = mockInstance.getConnector(user, password); + c.tableOperations().create("testtable"); + BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + Job job = new Job(new Configuration()); + job.setInputFormatClass(AccumuloInputFormat.class); + job.setMapperClass(TestMapper.class); + job.setNumReduceTasks(0); + AccumuloInputFormat.setConnectorInfo(job, user, password); + AccumuloInputFormat.setInputTableName(job, "testtable"); + AccumuloInputFormat.setScanAuthorizations(job, new Authorizations()); + AccumuloInputFormat.setMockInstance(job, "testPartialInputSplitDelegationToConfiguration"); + + AccumuloInputFormat input = new AccumuloInputFormat(); + List<InputSplit> splits = input.getSplits(job); + assertEquals(splits.size(), 1); + + TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); + + RangeInputSplit emptySplit = new RangeInputSplit(); + + // Using an empty split should fall back to the information in the Job's Configuration + TaskAttemptID id = new TaskAttemptID(); + TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); + RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt); + Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); + reader.initialize(emptySplit, context); + mapper.run(context); + } + + @Test(expected = IOException.class) + public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception { + String user = "testPartialFailedInputSplit"; + PasswordToken password = new PasswordToken(""); + + MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration"); + Connector c = mockInstance.getConnector(user, password); + c.tableOperations().create("testtable"); + BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + Job job = new Job(new Configuration()); + job.setInputFormatClass(AccumuloInputFormat.class); + job.setMapperClass(TestMapper.class); + job.setNumReduceTasks(0); + AccumuloInputFormat.setConnectorInfo(job, user, password); + AccumuloInputFormat.setInputTableName(job, "testtable"); + AccumuloInputFormat.setMockInstance(job, "testPartialFailedInputSplitDelegationToConfiguration"); + + AccumuloInputFormat input = new AccumuloInputFormat(); + List<InputSplit> splits = input.getSplits(job); + assertEquals(splits.size(), 1); + + TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); + + RangeInputSplit emptySplit = new RangeInputSplit(); + emptySplit.setPrincipal("root"); + emptySplit.setToken(new PasswordToken("anythingelse")); + + // Using an empty split should fall back to the information in the Job's Configuration + TaskAttemptID id = new TaskAttemptID(); + TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); + RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt); + Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); + reader.initialize(emptySplit, context); + mapper.run(context); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java deleted file mode 100644 index 7239b01..0000000 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java +++ /dev/null @@ -1,534 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.regex.Pattern; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RegexType; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.log4j.Level; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; - -public class AccumuloInputFormatTest { - - @After - public void tearDown() throws Exception {} - - /** - * Test basic setting & getting of max versions. - * - * @throws IOException - * Signals that an I/O exception has occurred. - */ - @Test - public void testMaxVersions() throws IOException { - JobContext job = new JobContext(new Configuration(), new JobID()); - AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1); - int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration()); - assertEquals(1, version); - } - - /** - * Test max versions with an invalid value. - * - * @throws IOException - * Signals that an I/O exception has occurred. - */ - @Test(expected = IOException.class) - public void testMaxVersionsLessThan1() throws IOException { - JobContext job = new JobContext(new Configuration(), new JobID()); - AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0); - } - - /** - * Test no max version configured. - */ - @Test - public void testNoMaxVersion() { - JobContext job = new JobContext(new Configuration(), new JobID()); - assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration())); - } - - /** - * Check that the iterator configuration is getting stored in the Job conf correctly. - */ - @SuppressWarnings("deprecation") - @Test - public void testSetIterator() { - JobContext job = new JobContext(new Configuration(), new JobID()); - - AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); - Configuration conf = job.getConfiguration(); - String iterators = conf.get("AccumuloInputFormat.iterators"); - assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators); - } - - @Test - public void testAddIterator() { - JobContext job = new JobContext(new Configuration(), new JobID()); - - AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); - AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); - IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); - iter.addOption("v1", "1"); - iter.addOption("junk", "\0omg:!\\xyzzy"); - AccumuloInputFormat.addIterator(job.getConfiguration(), iter); - - List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConfiguration()); - - // Check the list size - assertTrue(list.size() == 3); - - // Walk the list and make sure our settings are correct - AccumuloIterator setting = list.get(0); - assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); - assertEquals("WholeRow", setting.getIteratorName()); - - setting = list.get(1); - assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); - assertEquals("Versions", setting.getIteratorName()); - - setting = list.get(2); - assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); - assertEquals("Count", setting.getIteratorName()); - - List<AccumuloIteratorOption> iteratorOptions = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); - assertEquals(2, iteratorOptions.size()); - assertEquals("Count", iteratorOptions.get(0).getIteratorName()); - assertEquals("Count", iteratorOptions.get(1).getIteratorName()); - assertEquals("v1", iteratorOptions.get(0).getKey()); - assertEquals("1", iteratorOptions.get(0).getValue()); - assertEquals("junk", iteratorOptions.get(1).getKey()); - assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue()); - } - - /** - * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There - * should be no exceptions thrown when trying to parse these types of option entries. - * - * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected. - */ - @Test - public void testIteratorOptionEncoding() throws Throwable { - String key = "colon:delimited:key"; - String value = "comma,delimited,value"; - IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); - someSetting.addOption(key, value); - Job job = new Job(); - AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting); - - final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString(); - - assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options")); - - List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); - assertEquals(1, opts.size()); - assertEquals(opts.get(0).getKey(), key); - assertEquals(opts.get(0).getValue(), value); - - someSetting.addOption(key + "2", value); - someSetting.setPriority(2); - someSetting.setName("it2"); - AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting); - opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); - assertEquals(3, opts.size()); - for (AccumuloIteratorOption opt : opts) { - assertEquals(opt.getKey().substring(0, key.length()), key); - assertEquals(opt.getValue(), value); - } - } - - /** - * Test getting iterator settings for multiple iterators set - */ - @SuppressWarnings("deprecation") - @Test - public void testGetIteratorSettings() { - JobContext job = new JobContext(new Configuration(), new JobID()); - - AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); - AccumuloInputFormat.setIterator(job, 2, "org.apache.accumulo.core.iterators.VersioningIterator", "Versions"); - AccumuloInputFormat.setIterator(job, 3, "org.apache.accumulo.core.iterators.CountingIterator", "Count"); - - List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job); - - // Check the list size - assertTrue(list.size() == 3); - - // Walk the list and make sure our settings are correct - AccumuloIterator setting = list.get(0); - assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); - assertEquals("WholeRow", setting.getIteratorName()); - - setting = list.get(1); - assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); - assertEquals("Versions", setting.getIteratorName()); - - setting = list.get(2); - assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); - assertEquals("Count", setting.getIteratorName()); - - } - - /** - * Check that the iterator options are getting stored in the Job conf correctly. - */ - @SuppressWarnings("deprecation") - @Test - public void testSetIteratorOption() { - JobContext job = new JobContext(new Configuration(), new JobID()); - AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue"); - - Configuration conf = job.getConfiguration(); - String options = conf.get("AccumuloInputFormat.iterators.options"); - assertEquals(new String("someIterator:aKey:aValue"), options); - } - - /** - * Test getting iterator options for multiple options set - */ - @SuppressWarnings("deprecation") - @Test - public void testGetIteratorOption() { - JobContext job = new JobContext(new Configuration(), new JobID()); - - AccumuloInputFormat.setIteratorOption(job, "iterator1", "key1", "value1"); - AccumuloInputFormat.setIteratorOption(job, "iterator2", "key2", "value2"); - AccumuloInputFormat.setIteratorOption(job, "iterator3", "key3", "value3"); - - List<AccumuloIteratorOption> list = AccumuloInputFormat.getIteratorOptions(job); - - // Check the list size - assertEquals(3, list.size()); - - // Walk the list and make sure all the options are correct - AccumuloIteratorOption option = list.get(0); - assertEquals("iterator1", option.getIteratorName()); - assertEquals("key1", option.getKey()); - assertEquals("value1", option.getValue()); - - option = list.get(1); - assertEquals("iterator2", option.getIteratorName()); - assertEquals("key2", option.getKey()); - assertEquals("value2", option.getValue()); - - option = list.get(2); - assertEquals("iterator3", option.getIteratorName()); - assertEquals("key3", option.getKey()); - assertEquals("value3", option.getValue()); - } - - @SuppressWarnings("deprecation") - @Test - public void testSetRegex() { - JobContext job = new JobContext(new Configuration(), new JobID()); - - String regex = ">\"*%<>\'\\"; - - AccumuloInputFormat.setRegex(job, RegexType.ROW, regex); - - assertTrue(regex.equals(AccumuloInputFormat.getRegex(job, RegexType.ROW))); - } - - static class TestMapper extends Mapper<Key,Value,Key,Value> { - Key key = null; - int count = 0; - - @Override - protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { - if (key != null) - assertEquals(key.getRow().toString(), new String(v.get())); - assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); - assertEquals(new String(v.get()), String.format("%09x", count)); - key = new Key(k); - count++; - } - } - - @Test - public void testMap() throws Exception { - MockInstance mockInstance = new MockInstance("testmapinstance"); - Connector c = mockInstance.getConnector("root", new byte[] {}); - c.tableOperations().create("testtable"); - BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - Job job = new Job(new Configuration()); - job.setInputFormatClass(AccumuloInputFormat.class); - job.setMapperClass(TestMapper.class); - job.setNumReduceTasks(0); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); - AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); - - AccumuloInputFormat input = new AccumuloInputFormat(); - List<InputSplit> splits = input.getSplits(job); - assertEquals(splits.size(), 1); - - TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); - for (InputSplit split : splits) { - RangeInputSplit risplit = (RangeInputSplit) split; - Assert.assertEquals("root", risplit.getUsername()); - Assert.assertArrayEquals(new byte[0], risplit.getPassword()); - Assert.assertEquals("testtable", risplit.getTable()); - Assert.assertEquals(new Authorizations(), risplit.getAuths()); - Assert.assertEquals("testmapinstance", risplit.getInstanceName()); - - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); - RecordReader<Key,Value> reader = input.createRecordReader(split, attempt); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split); - reader.initialize(split, context); - mapper.run(context); - } - } - - @Test - public void testSimple() throws Exception { - MockInstance mockInstance = new MockInstance("testmapinstance"); - Connector c = mockInstance.getConnector("root", new byte[] {}); - c.tableOperations().create("testtable2"); - BatchWriter bw = c.createBatchWriter("testtable2", 10000L, 1000L, 4); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - JobContext job = new JobContext(new Configuration(), new JobID()); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations()); - AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); - AccumuloInputFormat input = new AccumuloInputFormat(); - RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); - RecordReader<Key,Value> rr = input.createRecordReader(ris, tac); - rr.initialize(ris, tac); - - TestMapper mapper = new TestMapper(); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris); - while (rr.nextKeyValue()) { - mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context); - } - } - - @SuppressWarnings("deprecation") - @Test - public void testRegex() throws Exception { - MockInstance mockInstance = new MockInstance("testmapinstance"); - Connector c = mockInstance.getConnector("root", new byte[] {}); - c.tableOperations().create("testtable3"); - BatchWriter bw = c.createBatchWriter("testtable3", 10000L, 1000L, 4); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - JobContext job = new JobContext(new Configuration(), new JobID()); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable3", new Authorizations()); - AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); - final String regex = ".*1.*"; - AccumuloInputFormat.setRegex(job, RegexType.ROW, regex); - AccumuloInputFormat input = new AccumuloInputFormat(); - RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); - RecordReader<Key,Value> rr = input.createRecordReader(ris, tac); - rr.initialize(ris, tac); - - Pattern p = Pattern.compile(regex); - while (rr.nextKeyValue()) { - Assert.assertTrue(p.matcher(rr.getCurrentKey().getRow().toString()).matches()); - } - } - - @SuppressWarnings("deprecation") - @Test - public void testCorrectRangeInputSplits() throws Exception { - JobContext job = new JobContext(new Configuration(), new JobID()); - - String username = "user", table = "table", rowRegex = "row.*", colfRegex = "colf.*", colqRegex = "colq.*"; - String valRegex = "val.*", instance = "instance"; - byte[] password = "password".getBytes(); - Authorizations auths = new Authorizations("foo"); - Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar"))); - boolean isolated = true, localIters = true; - int maxVersions = 5; - Level level = Level.WARN; - - Instance inst = new MockInstance(instance); - Connector connector = inst.getConnector(username, password); - connector.tableOperations().create(table); - - AccumuloInputFormat.setInputInfo(job, username, password, table, auths); - AccumuloInputFormat.setMockInstance(job, instance); - AccumuloInputFormat.setRegex(job, RegexType.ROW, rowRegex); - AccumuloInputFormat.setRegex(job, RegexType.COLUMN_FAMILY, colfRegex); - AccumuloInputFormat.setRegex(job, RegexType.COLUMN_QUALIFIER, colqRegex); - AccumuloInputFormat.setRegex(job, RegexType.VALUE, valRegex); - AccumuloInputFormat.setIsolated(job, isolated); - AccumuloInputFormat.setLocalIterators(job, localIters); - AccumuloInputFormat.setMaxVersions(job, maxVersions); - AccumuloInputFormat.fetchColumns(job, fetchColumns); - AccumuloInputFormat.setLogLevel(job, level); - - AccumuloInputFormat aif = new AccumuloInputFormat(); - - List<InputSplit> splits = aif.getSplits(job); - - Assert.assertEquals(1, splits.size()); - - InputSplit split = splits.get(0); - - Assert.assertEquals(RangeInputSplit.class, split.getClass()); - - RangeInputSplit risplit = (RangeInputSplit) split; - - Assert.assertEquals(username, risplit.getUsername()); - Assert.assertEquals(table, risplit.getTable()); - Assert.assertArrayEquals(password, risplit.getPassword()); - Assert.assertEquals(auths, risplit.getAuths()); - Assert.assertEquals(instance, risplit.getInstanceName()); - Assert.assertEquals(rowRegex, risplit.getRowRegex()); - Assert.assertEquals(colfRegex, risplit.getColfamRegex()); - Assert.assertEquals(colqRegex, risplit.getColqualRegex()); - Assert.assertEquals(valRegex, risplit.getValueRegex()); - Assert.assertEquals(isolated, risplit.isIsolatedScan()); - Assert.assertEquals(localIters, risplit.usesLocalIterators()); - Assert.assertEquals(maxVersions, risplit.getMaxVersions().intValue()); - Assert.assertEquals(fetchColumns, risplit.getFetchedColumns()); - Assert.assertEquals(level, risplit.getLogLevel()); - } - - @Test - public void testPartialInputSplitDelegationToConfiguration() throws Exception { - MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration"); - Connector c = mockInstance.getConnector("root", new byte[] {}); - c.tableOperations().create("testtable"); - BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - Job job = new Job(new Configuration()); - job.setInputFormatClass(AccumuloInputFormat.class); - job.setMapperClass(TestMapper.class); - job.setNumReduceTasks(0); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); - AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialInputSplitDelegationToConfiguration"); - - AccumuloInputFormat input = new AccumuloInputFormat(); - List<InputSplit> splits = input.getSplits(job); - assertEquals(splits.size(), 1); - - TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); - - RangeInputSplit emptySplit = new RangeInputSplit(); - - // Using an empty split should fall back to the information in the Job's Configuration - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); - RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); - reader.initialize(emptySplit, context); - mapper.run(context); - } - - @Test(expected = IOException.class) - public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception { - MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration"); - Connector c = mockInstance.getConnector("root", new byte[] {}); - c.tableOperations().create("testtable"); - BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - Job job = new Job(new Configuration()); - job.setInputFormatClass(AccumuloInputFormat.class); - job.setMapperClass(TestMapper.class); - job.setNumReduceTasks(0); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); - AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialFailedInputSplitDelegationToConfiguration"); - - AccumuloInputFormat input = new AccumuloInputFormat(); - List<InputSplit> splits = input.getSplits(job); - assertEquals(splits.size(), 1); - - TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); - - RangeInputSplit emptySplit = new RangeInputSplit(); - emptySplit.setUsername("root"); - emptySplit.setPassword("anythingelse".getBytes()); - - // Using an empty split should fall back to the information in the Job's Configuration - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); - RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); - reader.initialize(emptySplit, context); - mapper.run(context); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java index 22fb6e1..f6c604f 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java @@ -9,6 +9,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; @@ -53,14 +54,9 @@ public class RangeInputSplitTest { split.setOffline(true); split.setIsolatedScan(true); split.setUsesLocalIterators(true); - split.setMaxVersions(5); - split.setRowRegex("row"); - split.setColfamRegex("colf"); - split.setColqualRegex("colq"); - split.setValueRegex("value"); split.setFetchedColumns(fetchedColumns); - split.setPassword("password".getBytes()); - split.setUsername("root"); + split.setToken(new PasswordToken("password")); + split.setPrincipal("root"); split.setInstanceName("instance"); split.setMockInstance(true); split.setZooKeepers("localhost"); @@ -83,14 +79,9 @@ public class RangeInputSplitTest { Assert.assertEquals(split.isOffline(), newSplit.isOffline()); Assert.assertEquals(split.isIsolatedScan(), newSplit.isOffline()); Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators()); - Assert.assertEquals(split.getMaxVersions(), newSplit.getMaxVersions()); - Assert.assertEquals(split.getRowRegex(), newSplit.getRowRegex()); - Assert.assertEquals(split.getColfamRegex(), newSplit.getColfamRegex()); - Assert.assertEquals(split.getColqualRegex(), newSplit.getColqualRegex()); - Assert.assertEquals(split.getValueRegex(), newSplit.getValueRegex()); Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns()); - Assert.assertEquals(new String(split.getPassword()), new String(newSplit.getPassword())); - Assert.assertEquals(split.getUsername(), newSplit.getUsername()); + Assert.assertEquals(split.getToken(), newSplit.getToken()); + Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal()); Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName()); Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance()); Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());