ACCUMULO-286 added ContextFactory methods for building map contexts with an explicit OutputCommitter and StatusReporter

git-svn-id: https://svn.apache.org/repos/asf/accumulo/trunk@1328104 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit 0680b04bf03e2d6ad19ae3c368f6cb23f4e30056)
Reason: Testing
Author: Billie Rinaldi <bil...@apache.org>
Ref: ACCUMULO-1792
Signed-off-by: Eric Newton <eric.new...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7fa0085a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7fa0085a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7fa0085a

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 7fa0085a48248e0fcb06ee38064f712026ed9609
Parents: 6bba940
Author: Billie Rinaldi <bil...@apache.org>
Authored: Thu Apr 19 20:22:16 2012 +0000
Committer: Eric Newton <eric.new...@gmail.com>
Committed: Mon Nov 25 16:06:42 2013 -0500

----------------------------------------------------------------------
 .../accumulo/core/util/ContextFactory.java      | 12 +++++++---
 .../helloworld/InsertWithOutputFormat.java      |  4 ++--
 .../simple/filedata/ChunkInputFormatTest.java   |  6 ++---
 .../wikisearch/ingest/WikipediaMapperTest.java  |  9 +++-----
 .../reader/AggregatingRecordReaderTest.java     | 24 +++++++++++---------
 .../wikisearch/logic/TestQueryLogic.java        |  9 ++++----
 6 files changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7fa0085a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
index 61e853d..5a1c2ef 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
@@ -147,15 +147,21 @@ public class ContextFactory {
     }
   }
 
-  @SuppressWarnings({"unchecked", "rawtypes"})
   public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
       RecordWriter<K2,V2> writer, InputSplit split) {
+    return createMapContext(m, tac, reader, writer, null, null, split);
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
+      RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
     try {
       if (useV21) {
-        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null, split);
+        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
         return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
       } else {
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null, split);
+        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
+            split);
       }
     } catch (InstantiationException e) {
       throw new IllegalArgumentException("Can't create object", e);
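Side note for anyone reading along: the sketch below is not part of this commit; it is a minimal, self-contained illustration of how a test can drive a Mapper through the factory above. EchoMapper, ListReader, and ListWriter are hypothetical in-memory stand-ins; only createTaskAttemptContext and the createMapContext overloads come from the ContextFactory being patched.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.accumulo.core.util.ContextFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class ContextFactorySketch {

      // Inherits Mapper's identity map(), so input records pass through unchanged.
      static class EchoMapper extends Mapper<LongWritable,Text,LongWritable,Text> {}

      // Serves a fixed list of lines from memory instead of reading a file.
      static class ListReader extends RecordReader<LongWritable,Text> {
        private final List<String> lines;
        private int i = -1;

        ListReader(List<String> lines) {
          this.lines = lines;
        }

        public void initialize(InputSplit split, TaskAttemptContext ctx) {}

        public boolean nextKeyValue() {
          return ++i < lines.size();
        }

        public LongWritable getCurrentKey() {
          return new LongWritable(i);
        }

        public Text getCurrentValue() {
          return new Text(lines.get(i));
        }

        public float getProgress() {
          return lines.isEmpty() ? 1.0f : (float) (i + 1) / lines.size();
        }

        public void close() {}
      }

      // Collects output records in memory so a test can assert on them.
      static class ListWriter extends RecordWriter<LongWritable,Text> {
        final List<String> records = new ArrayList<String>();

        public void write(LongWritable key, Text value) {
          records.add(key + "=" + value);
        }

        public void close(TaskAttemptContext ctx) {}
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The factory hides the Hadoop 0.20 vs 0.21+ context differences.
        TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(conf);

        EchoMapper mapper = new EchoMapper();
        ListReader reader = new ListReader(Arrays.asList("a", "b"));
        ListWriter writer = new ListWriter();
        // Dummy split; the reader above never consults it.
        InputSplit split = new FileSplit(new Path("/tmp/ignored"), 0, 0, null);

        // Five-arg overload: committer and reporter default to null in the factory.
        Mapper<LongWritable,Text,LongWritable,Text>.Context con =
            ContextFactory.createMapContext(mapper, tac, reader, writer, split);
        mapper.run(con);

        System.out.println(writer.records); // prints [0=a, 1=b]
      }
    }

Note the five-argument overload keeps its old behavior by delegating with null committer and reporter, so existing callers are untouched; tests that need a real OutputCommitter or StatusReporter use the new seven-argument form.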
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7fa0085a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
index af03470..5c37d6c 100644
--- a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
+++ b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
@@ -45,8 +45,8 @@ public class InsertWithOutputFormat extends Configured implements Tool {
     }
     Text tableName = new Text(args[4]);
     Job job = new Job(getConf());
-    AccumuloOutputFormat.setZooKeeperInstance(job, args[0], args[1]);
-    AccumuloOutputFormat.setOutputInfo(job, args[3], args[4].getBytes(), true, null);
+    AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(), args[0], args[1]);
+    AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), args[2], args[3].getBytes(), true, null);
     job.setOutputFormatClass(AccumuloOutputFormat.class);
 
     // when running a mapreduce, you won't need to instantiate the output

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7fa0085a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
index 8937048..53cadac 100644
--- a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
+++ b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
@@ -90,7 +90,7 @@ public class ChunkInputFormatTest extends TestCase {
     ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1");
     ChunkInputFormat cif = new ChunkInputFormat();
     RangeInputSplit ris = new RangeInputSplit();
-    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job.getConfiguration());
     RecordReader<List<Entry<Key,Value>>,InputStream> rr = cif.createRecordReader(ris, tac);
     rr.initialize(ris, tac);
 
@@ -141,7 +141,7 @@ public class ChunkInputFormatTest extends TestCase {
     ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2");
     ChunkInputFormat cif = new ChunkInputFormat();
     RangeInputSplit ris = new RangeInputSplit();
-    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job.getConfiguration());
     RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
     crr.initialize(ris, tac);
 
@@ -180,7 +180,7 @@ public class ChunkInputFormatTest extends TestCase {
     ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3");
     ChunkInputFormat cif = new ChunkInputFormat();
     RangeInputSplit ris = new RangeInputSplit();
-    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job.getConfiguration());
     RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
     crr.initialize(ris, tac);
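Why the churn in these tests: TaskAttemptContext is a concrete class in Hadoop 0.20 but an interface from 0.21 on, which is what the factory's useV21 branch is papering over, and createTaskAttemptContext takes a Configuration rather than a Job, hence the job.getConfiguration() fix above. In sketch form (not taken from the patch):

    // 0.20-only; stops compiling once TaskAttemptContext becomes an interface:
    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());

    // Portable; ContextFactory reflectively picks the right implementation:
    TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(conf);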
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7fa0085a/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java b/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
index a924aee..c659ec4 100644
--- a/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
+++ b/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
@@ -34,8 +34,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.core.util.ContextFactory;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -47,7 +46,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.junit.Before;
@@ -118,8 +116,7 @@ public class WikipediaMapperTest {
     writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1));
     writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1));
 
-    TaskAttemptID id = new TaskAttemptID();
-    TaskAttemptContext context = new TaskAttemptContext(conf, id);
+    TaskAttemptContext context = ContextFactory.createTaskAttemptContext(conf);
 
     RawLocalFileSystem fs = new RawLocalFileSystem();
     fs.setConf(conf);
@@ -141,7 +138,7 @@ public class WikipediaMapperTest {
     WikipediaMapper mapper = new WikipediaMapper();
 
     // Load data into Mock Accumulo
-    Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
+    Mapper<LongWritable,Text,Text,Mutation>.Context con = ContextFactory.createMapContext(mapper, context, rr, rw, oc, sr, split);
 
     mapper.run(con);
 
     // Flush and close record writers.
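The mapper-side analogue of the same problem: mapper.new Context(conf, id, rr, rw, oc, sr, split) constructs Mapper.Context directly and only compiles where Context still exposes that public constructor. The seven-argument createMapContext added above takes the committer and reporter explicitly, so tests like this one keep their FileOutputCommitter and custom StatusReporter. The migrated call shape, where rr, rw, oc, sr, and split are whatever the test already built:

    TaskAttemptContext context = ContextFactory.createTaskAttemptContext(conf);
    Mapper<LongWritable,Text,Text,Mutation>.Context con =
        ContextFactory.createMapContext(mapper, context, rr, rw, oc, sr, split);
    mapper.run(con);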
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7fa0085a/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java b/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
index c1cb263..c842da7 100644
--- a/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
+++ b/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.accumulo.examples.wikisearch.reader;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -28,13 +31,12 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathFactory;
 
+import org.apache.accumulo.core.util.ContextFactory;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
-import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.junit.Before;
 import org.junit.Test;
@@ -95,7 +97,7 @@ public class AggregatingRecordReaderTest {
     conf.set(AggregatingRecordReader.START_TOKEN, "<doc");
     conf.set(AggregatingRecordReader.END_TOKEN, "</doc>");
     conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(true));
-    ctx = new TaskAttemptContext(conf, new TaskAttemptID());
+    ctx = ContextFactory.createTaskAttemptContext(conf);
     XPath xp = xpFactory.newXPath();
     EXPR_A = xp.compile("/doc/a");
     EXPR_B = xp.compile("/doc/b");
@@ -141,7 +143,7 @@ public class AggregatingRecordReaderTest {
 
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
     AggregatingRecordReader reader = new AggregatingRecordReader();
     try {
       // Clear the values for BEGIN and STOP TOKEN
@@ -163,7 +165,7 @@ public class AggregatingRecordReaderTest {
 
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
 
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -184,7 +186,7 @@ public class AggregatingRecordReaderTest {
 
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
 
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -202,7 +204,7 @@ public class AggregatingRecordReaderTest {
 
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
 
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -220,7 +222,7 @@ public class AggregatingRecordReaderTest {
 
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
 
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -245,7 +247,7 @@ public class AggregatingRecordReaderTest {
 
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
 
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -264,7 +266,7 @@ public class AggregatingRecordReaderTest {
     File f = createFile(xml5);
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
 
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7fa0085a/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java b/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
index 4b7aaee..938f01b 100644
--- a/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
+++ b/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ContextFactory;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
@@ -53,7 +54,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.log4j.Level;
@@ -125,8 +125,7 @@ public class TestQueryLogic {
       writerMap.put(new Text(table), c.createBatchWriter(table, 1000L, 1000L, 1));
     }
 
-    TaskAttemptID id = new TaskAttemptID();
-    TaskAttemptContext context = new TaskAttemptContext(conf, id);
+    TaskAttemptContext context = ContextFactory.createTaskAttemptContext(conf);
 
     RawLocalFileSystem fs = new RawLocalFileSystem();
     fs.setConf(conf);
@@ -137,7 +136,7 @@ public class TestQueryLogic {
     Path tmpFile = new Path(data.getAbsolutePath());
 
     // Setup the Mapper
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null), 0);
     AggregatingRecordReader rr = new AggregatingRecordReader();
     Path ocPath = new Path(tmpFile, "oc");
     OutputCommitter oc = new FileOutputCommitter(ocPath, context);
@@ -148,7 +147,7 @@ public class TestQueryLogic {
     WikipediaMapper mapper = new WikipediaMapper();
 
     // Load data into Mock Accumulo
-    Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
+    Mapper<LongWritable,Text,Text,Mutation>.Context con = ContextFactory.createMapContext(mapper, context, rr, rw, oc, sr, split);
 
     mapper.run(con);
 
     // Flush and close record writers.