ACCUMULO-286 introduced ContextFactory to assist with testing InputFormats and OutputFormats under different versions of hadoop
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/trunk@1229596 13f79535-47bb-0310-9956-ffa450edef68 (cherry picked from commit c0a0afdcceedba2947949761ca13ecc42ff8f9c1) Reason: Testing Author: Billie Rinaldi <bil...@apache.org> Ref: ACCUMULO-1792 Expands change to CoordinateRecoveryTask to remove previous addition of InterruptedException catching in CoordinateRecoveryTask to maintain compile compatibility with non-2.0.2-alpha versions. Differs from upstream by expanding test modifications to include depecrated ones removed in 1.5.x and tests only found in the 1.4.x branch. Author: Sean Busbey <bus...@cloudera.com> Signed-off-by: Eric Newton <eric.new...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a3264e4f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a3264e4f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a3264e4f Branch: refs/heads/1.4.5-SNAPSHOT Commit: a3264e4f1b188e0f4808120fa39d5c1695b6b01c Parents: 5d22af4 Author: Billie Rinaldi <bil...@apache.org> Authored: Tue Jan 10 15:27:57 2012 +0000 Committer: Eric Newton <eric.new...@gmail.com> Committed: Mon Nov 25 16:06:42 2013 -0500 ---------------------------------------------------------------------- .../accumulo/core/util/ContextFactory.java | 169 +++++++++++++++++++ .../mapreduce/AccumuloFileOutputFormatTest.java | 8 +- .../mapreduce/AccumuloInputFormatTest.java | 71 ++++---- .../mapreduce/AccumuloRowInputFormatTest.java | 8 +- .../lib/partition/RangePartitionerTest.java | 5 +- .../helloworld/InsertWithOutputFormat.java | 10 +- .../simple/filedata/ChunkInputFormatTest.java | 16 +- .../server/master/CoordinateRecoveryTask.java | 5 +- .../apache/accumulo/server/master/LogSort.java | 2 +- 9 files changed, 224 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java new file mode 100644 index 0000000..67819da --- /dev/null +++ b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.util; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +/** + * A factory to allow applications to deal with inconsistencies between MapReduce Context Objects API between hadoop-0.20 and later versions. This code is based + * on org.apache.hadoop.mapreduce.ContextFactory in hadoop-mapred-0.22.0. + */ +public class ContextFactory { + + private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR; + private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR; + private static final Constructor<?> TASK_ID_CONSTRUCTOR; + private static final Constructor<?> MAP_CONSTRUCTOR; + private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR; + private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR; + private static final Class<?> TASK_TYPE_CLASS; + private static final boolean useV21; + + static { + boolean v21 = true; + final String PACKAGE = "org.apache.hadoop.mapreduce"; + try { + Class.forName(PACKAGE + ".task.JobContextImpl"); + } catch (ClassNotFoundException cnfe) { + v21 = false; + } + useV21 = v21; + Class<?> jobContextCls; + Class<?> taskContextCls; + Class<?> mapCls; + Class<?> mapContextCls; + Class<?> innerMapContextCls; + try { + if (v21) { + jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl"); + taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl"); + TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType"); + mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl"); + mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper"); + innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context"); + } else { + jobContextCls = Class.forName(PACKAGE + ".JobContext"); + taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext"); + TASK_TYPE_CLASS = null; + mapContextCls = Class.forName(PACKAGE + ".MapContext"); + mapCls = Class.forName(PACKAGE + ".Mapper"); + innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context"); + } + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Can't find class", e); + } + try { + JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class); + JOB_CONTEXT_CONSTRUCTOR.setAccessible(true); + TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class); + TASK_CONTEXT_CONSTRUCTOR.setAccessible(true); + if (useV21) { + TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, TASK_TYPE_CLASS, int.class, int.class); + TASK_ID_CONSTRUCTOR.setAccessible(true); + MAP_CONSTRUCTOR = mapCls.getConstructor(); + MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, MapContext.class); + MAP_CONTEXT_IMPL_CONSTRUCTOR = mapContextCls.getDeclaredConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class, + OutputCommitter.class, StatusReporter.class, InputSplit.class); + MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true); + } else { + TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, boolean.class, int.class, int.class); + TASK_ID_CONSTRUCTOR.setAccessible(true); + MAP_CONSTRUCTOR = null; + MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class, + OutputCommitter.class, StatusReporter.class, InputSplit.class); + MAP_CONTEXT_IMPL_CONSTRUCTOR = null; + } + MAP_CONTEXT_CONSTRUCTOR.setAccessible(true); + } catch (SecurityException e) { + throw new IllegalArgumentException("Can't run constructor ", e); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Can't find constructor ", e); + } + } + + public static JobContext createJobContext() { + return createJobContext(new Configuration()); + } + + public static JobContext createJobContext(Configuration conf) { + try { + return (JobContext) JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, new JobID("local", 0)); + } catch (InstantiationException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't create object", e); + } + } + + public static TaskAttemptContext createTaskAttemptContext(JobContext job) { + return createTaskAttemptContext(job.getConfiguration()); + } + + public static TaskAttemptContext createTaskAttemptContext(Configuration conf) { + try { + if (useV21) + return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, + TASK_ID_CONSTRUCTOR.newInstance("local", 0, TASK_TYPE_CLASS.getEnumConstants()[0], 0, 0)); + else + return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, TASK_ID_CONSTRUCTOR.newInstance("local", 0, true, 0, 0)); + } catch (InstantiationException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't create object", e); + } + } + + @SuppressWarnings("unchecked") + public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader, + RecordWriter<K2,V2> writer, InputSplit split) { + try { + if (useV21) { + Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null, split); + return (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis); + } else { + return (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null, + split); + } + } catch (InstantiationException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't create object", e); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java index 342455f..84dce27 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java @@ -26,13 +26,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.conf.Configuration; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -44,13 +42,13 @@ public class AccumuloFileOutputFormatTest { @Before public void setup() { - job = new JobContext(new Configuration(), new JobID()); + job = ContextFactory.createJobContext(); Path file = new Path(System.getenv("ACCUMULO_HOME") + "/target/"); f = new Path(file, "_temporary"); job.getConfiguration().set("mapred.output.dir", file.toString()); - tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + tac = ContextFactory.createTaskAttemptContext(job); } @After http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index 7239b01..f2e2f2c 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@ -39,16 +39,15 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; @@ -67,7 +66,7 @@ public class AccumuloInputFormatTest { */ @Test public void testMaxVersions() throws IOException { - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1); int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration()); assertEquals(1, version); @@ -81,7 +80,7 @@ public class AccumuloInputFormatTest { */ @Test(expected = IOException.class) public void testMaxVersionsLessThan1() throws IOException { - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0); } @@ -90,7 +89,7 @@ public class AccumuloInputFormatTest { */ @Test public void testNoMaxVersion() { - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration())); } @@ -100,8 +99,8 @@ public class AccumuloInputFormatTest { @SuppressWarnings("deprecation") @Test public void testSetIterator() { - JobContext job = new JobContext(new Configuration(), new JobID()); - + JobContext job = ContextFactory.createJobContext(); + AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); Configuration conf = job.getConfiguration(); String iterators = conf.get("AccumuloInputFormat.iterators"); @@ -110,8 +109,8 @@ public class AccumuloInputFormatTest { @Test public void testAddIterator() { - JobContext job = new JobContext(new Configuration(), new JobID()); - + JobContext job = ContextFactory.createJobContext(); + AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); @@ -192,8 +191,8 @@ public class AccumuloInputFormatTest { @SuppressWarnings("deprecation") @Test public void testGetIteratorSettings() { - JobContext job = new JobContext(new Configuration(), new JobID()); - + JobContext job = ContextFactory.createJobContext(); + AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); AccumuloInputFormat.setIterator(job, 2, "org.apache.accumulo.core.iterators.VersioningIterator", "Versions"); AccumuloInputFormat.setIterator(job, 3, "org.apache.accumulo.core.iterators.CountingIterator", "Count"); @@ -227,7 +226,7 @@ public class AccumuloInputFormatTest { @SuppressWarnings("deprecation") @Test public void testSetIteratorOption() { - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue"); Configuration conf = job.getConfiguration(); @@ -241,8 +240,8 @@ public class AccumuloInputFormatTest { @SuppressWarnings("deprecation") @Test public void testGetIteratorOption() { - JobContext job = new JobContext(new Configuration(), new JobID()); - + JobContext job = ContextFactory.createJobContext(); + AccumuloInputFormat.setIteratorOption(job, "iterator1", "key1", "value1"); AccumuloInputFormat.setIteratorOption(job, "iterator2", "key2", "value2"); AccumuloInputFormat.setIteratorOption(job, "iterator3", "key3", "value3"); @@ -272,8 +271,8 @@ public class AccumuloInputFormatTest { @SuppressWarnings("deprecation") @Test public void testSetRegex() { - JobContext job = new JobContext(new Configuration(), new JobID()); - + JobContext job = ContextFactory.createJobContext(); + String regex = ">\"*%<>\'\\"; AccumuloInputFormat.setRegex(job, RegexType.ROW, regex); @@ -329,10 +328,9 @@ public class AccumuloInputFormatTest { Assert.assertEquals(new Authorizations(), risplit.getAuths()); Assert.assertEquals("testmapinstance", risplit.getInstanceName()); - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); - RecordReader<Key,Value> reader = input.createRecordReader(split, attempt); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); + RecordReader<Key,Value> reader = input.createRecordReader(split, tac); + Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, split); reader.initialize(split, context); mapper.run(context); } @@ -350,20 +348,21 @@ public class AccumuloInputFormatTest { bw.addMutation(m); } bw.close(); - - JobContext job = new JobContext(new Configuration(), new JobID()); + + JobContext job = ContextFactory.createJobContext(); AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations()); AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); AccumuloInputFormat input = new AccumuloInputFormat(); RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); RecordReader<Key,Value> rr = input.createRecordReader(ris, tac); rr.initialize(ris, tac); TestMapper mapper = new TestMapper(); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris); + Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, rr, null, ris); + rr.initialize(ris, tac); while (rr.nextKeyValue()) { - mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context); + mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), (TestMapper.Context) context); } } @@ -380,15 +379,15 @@ public class AccumuloInputFormatTest { bw.addMutation(m); } bw.close(); - - JobContext job = new JobContext(new Configuration(), new JobID()); + + JobContext job = ContextFactory.createJobContext(); AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable3", new Authorizations()); AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); final String regex = ".*1.*"; AccumuloInputFormat.setRegex(job, RegexType.ROW, regex); AccumuloInputFormat input = new AccumuloInputFormat(); RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); RecordReader<Key,Value> rr = input.createRecordReader(ris, tac); rr.initialize(ris, tac); @@ -401,7 +400,7 @@ public class AccumuloInputFormatTest { @SuppressWarnings("deprecation") @Test public void testCorrectRangeInputSplits() throws Exception { - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); String username = "user", table = "table", rowRegex = "row.*", colfRegex = "colf.*", colqRegex = "colq.*"; String valRegex = "val.*", instance = "instance"; @@ -485,10 +484,9 @@ public class AccumuloInputFormatTest { RangeInputSplit emptySplit = new RangeInputSplit(); // Using an empty split should fall back to the information in the Job's Configuration - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); - RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); + RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, tac); + Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, emptySplit); reader.initialize(emptySplit, context); mapper.run(context); } @@ -524,10 +522,9 @@ public class AccumuloInputFormatTest { emptySplit.setPassword("anythingelse".getBytes()); // Using an empty split should fall back to the information in the Job's Configuration - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id); - RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt); - Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); + RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, tac); + Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, emptySplit); reader.initialize(emptySplit, context); mapper.run(context); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java index d9f9da0..5199352 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java @@ -34,14 +34,12 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.accumulo.core.util.PeekingIterator; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.junit.Test; public class AccumuloRowInputFormatTest { @@ -99,12 +97,12 @@ public class AccumuloRowInputFormatTest { insertList(bw, row3); bw.close(); - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); AccumuloRowInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations()); AccumuloRowInputFormat.setMockInstance(job.getConfiguration(), "instance1"); AccumuloRowInputFormat crif = new AccumuloRowInputFormat(); RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); RecordReader<Text,PeekingIterator<Entry<Key,Value>>> rr = crif.createRecordReader(ris, tac); rr.initialize(ris, tac); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java index 8b95c57..87059c1 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java @@ -18,10 +18,9 @@ package org.apache.accumulo.core.client.mapreduce.lib.partition; import static org.junit.Assert.assertTrue; -import org.apache.hadoop.conf.Configuration; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; import org.junit.Test; public class RangePartitionerTest { @@ -53,7 +52,7 @@ public class RangePartitionerTest { } private RangePartitioner prepPartitioner(int numSubBins) { - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); RangePartitioner.setNumSubBins(job, numSubBins); RangePartitioner rp = new RangePartitioner(); rp.setConf(job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java index 7583494..af03470 100644 --- a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java +++ b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java @@ -20,13 +20,12 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.hadoop.conf.Configuration; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -46,16 +45,15 @@ public class InsertWithOutputFormat extends Configured implements Tool { } Text tableName = new Text(args[4]); Job job = new Job(getConf()); - Configuration conf = job.getConfiguration(); - AccumuloOutputFormat.setZooKeeperInstance(conf, args[0], args[1]); - AccumuloOutputFormat.setOutputInfo(conf, args[2], args[3].getBytes(), true, null); + AccumuloOutputFormat.setZooKeeperInstance(job, args[0], args[1]); + AccumuloOutputFormat.setOutputInfo(job, args[3], args[4].getBytes(), true, null); job.setOutputFormatClass(AccumuloOutputFormat.class); // when running a mapreduce, you won't need to instantiate the output // format and record writer // mapreduce will do that for you, and you will just use // output.collect(tableName, mutation) - TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID()); + TaskAttemptContext context = ContextFactory.createTaskAttemptContext(job); RecordWriter<Text,Mutation> rw = new AccumuloOutputFormat().getRecordWriter(context); Text colf = new Text("colfam"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java ---------------------------------------------------------------------- diff --git a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java index af12302..8937048 100644 --- a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java +++ b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java @@ -37,12 +37,10 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.conf.Configuration; +import org.apache.accumulo.core.util.ContextFactory; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.log4j.Logger; public class ChunkInputFormatTest extends TestCase { @@ -87,12 +85,12 @@ public class ChunkInputFormatTest extends TestCase { } bw.close(); - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D")); ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1"); ChunkInputFormat cif = new ChunkInputFormat(); RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); RecordReader<List<Entry<Key,Value>>,InputStream> rr = cif.createRecordReader(ris, tac); rr.initialize(ris, tac); @@ -138,12 +136,12 @@ public class ChunkInputFormatTest extends TestCase { } bw.close(); - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D")); ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2"); ChunkInputFormat cif = new ChunkInputFormat(); RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac); crr.initialize(ris, tac); @@ -177,12 +175,12 @@ public class ChunkInputFormatTest extends TestCase { } bw.close(); - JobContext job = new JobContext(new Configuration(), new JobID()); + JobContext job = ContextFactory.createJobContext(); ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D")); ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3"); ChunkInputFormat cif = new ChunkInputFormat(); RangeInputSplit ris = new RangeInputSplit(); - TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job); RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac); crr.initialize(ris, tac); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java b/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java index 64ed42e..b98f29d 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java +++ b/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java @@ -257,10 +257,7 @@ public class CoordinateRecoveryTask implements Runnable { return new RecoveryStatus(logFile.server, logFile.file, (sortJob == null ? 0. : sortJob.mapProgress()), (sortJob == null ? 0. : sortJob.reduceProgress()), (int) (System.currentTimeMillis() - copyStartTime), (sortJob != null) ? 1. : (copySize == 0 ? 0 : copiedSoFar() / (double) copySize)); - } catch (InterruptedException ie) { - // Hadoop 2.0.2-alpha's Job.mapProgress throws Interrupted Exception. 1.x and 2.0.4 do not. - return new RecoveryStatus(logFile.server, logFile.file, 1.0, 1.0, (int) (System.currentTimeMillis() - copyStartTime), 1.0); - } catch (NullPointerException npe) { + } catch (Exception e) { return new RecoveryStatus(logFile.server, logFile.file, 1.0, 1.0, (int) (System.currentTimeMillis() - copyStartTime), 1.0); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java index 006d06e..4666331 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java +++ b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java @@ -31,9 +31,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapreduce.Job;