ACCUMULO-375 git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1241206 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/ddab37e8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/ddab37e8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/ddab37e8 Branch: refs/heads/1.4.5-SNAPSHOT Commit: ddab37e843d68c41a80a39457758cbb009498ca3 Parents: 6a3b419 Author: Adam Fuchs <afu...@apache.org> Authored: Mon Feb 6 22:01:48 2012 +0000 Committer: Adam Fuchs <afu...@apache.org> Committed: Mon Feb 6 22:01:48 2012 +0000 ---------------------------------------------------------------------- .../ingest/WikipediaConfiguration.java | 7 ++ .../wikisearch/ingest/WikipediaInputFormat.java | 87 ++++++++++++++++++++ .../wikisearch/ingest/WikipediaMapper.java | 13 ++- .../reader/AggregatingRecordReader.java | 3 +- .../wikisearch/ingest/WikipediaMapperTest.java | 1 + .../reader/AggregatingRecordReaderTest.java | 15 ++-- .../wikisearch/logic/TestQueryLogic.java | 5 +- 7 files changed, 121 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java index 2f0ea61..d76d713 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java @@ -45,6 +45,9 @@ public class WikipediaConfiguration { public final static String ANALYZER = "wikipedia.index.analyzer"; public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions"; + + public final static String NUM_GROUPS = "wikipedia.ingest.groups"; + public static String getUser(Configuration conf) { return conf.get(USER); @@ -110,6 +113,10 @@ public class WikipediaConfiguration { return conf.getInt(NUM_PARTITIONS, 25); } + public static int getNumGroups(Configuration conf) { + return conf.getInt(NUM_GROUPS, 1); + } + /** * Helper method to get properties from Hadoop configuration * http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java index 8a5ed66..4c6a3b8 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java @@ -16,20 +16,107 @@ */ package org.apache.accumulo.examples.wikisearch.ingest; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class WikipediaInputFormat extends TextInputFormat { + + public static class WikipediaInputSplit extends InputSplit implements Writable { + + public WikipediaInputSplit(){} + + public WikipediaInputSplit(FileSplit fileSplit, int partition) + { + this.fileSplit = fileSplit; + this.partition = partition; + } + + private FileSplit fileSplit = null; + private int partition = -1; + + public int getPartition() + { + return partition; + } + + public FileSplit getFileSplit() + { + return fileSplit; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return fileSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return fileSplit.getLocations(); + } + + @Override + public void readFields(DataInput in) throws IOException { + Path file = new Path(in.readUTF()); + long start = in.readLong(); + long length = in.readLong(); + int numHosts = in.readInt(); + String[] hosts = new String[numHosts]; + for(int i = 0; i < numHosts; i++) + hosts[i] = in.readUTF(); + fileSplit = new FileSplit(file, start, length, hosts); + partition = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(fileSplit.getPath().toString()); + out.writeLong(fileSplit.getStart()); + out.writeLong(fileSplit.getLength()); + String [] hosts = fileSplit.getLocations(); + out.writeInt(hosts.length); + for(String host:hosts) + out.writeUTF(host); + fileSplit.write(out); + out.writeInt(partition); + } + + } @Override + public List<InputSplit> getSplits(JobContext job) throws IOException { + List<InputSplit> superSplits = super.getSplits(job); + List<WikipediaInputSplit> splits = new ArrayList<WikipediaInputSplit>(); + + int numGroups = WikipediaConfiguration.getNumGroups(job.getConfiguration()); + + for(InputSplit split:superSplits) + { + FileSplit fileSplit = (FileSplit)split; + for(int group = 0; group < numGroups; group++) + { + splits.add(new WikipediaInputSplit(fileSplit,group)); + } + } + return super.getSplits(job); + } + + @Override public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) { return new AggregatingRecordReader(); } http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java index c343f52..1ec531b 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.protobuf.Uid; import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder; @@ -71,6 +72,9 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> { private String language; private int numPartitions = 0; private ColumnVisibility cv = null; + + private int myGroup = -1; + private int numGroups = -1; private Text tablename = null; private Text indexTableName = null; @@ -85,7 +89,11 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> { reverseIndexTableName = new Text(tablename + "ReverseIndex"); metadataTableName = new Text(tablename + "Metadata"); - FileSplit split = (FileSplit) context.getInputSplit(); + WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit(); + myGroup = wiSplit.getPartition(); + numGroups = WikipediaConfiguration.getNumGroups(conf); + + FileSplit split = wiSplit.getFileSplit(); String fileName = split.getPath().getName(); Matcher matcher = languagePattern.matcher(fileName); if (matcher.matches()) { @@ -118,6 +126,9 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> { String colfPrefix = language + NULL_BYTE; String indexPrefix = "fi" + NULL_BYTE; if (article != null) { + int groupId = WikipediaMapper.getPartitionId(article, numGroups); + if(groupId != myGroup) + return; Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); // Create the mutations for the document. http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java index 0ae0290..09755c0 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java @@ -20,6 +20,7 @@ package org.apache.accumulo.examples.wikisearch.reader; import java.io.IOException; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.util.TextUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -60,7 +61,7 @@ public class AggregatingRecordReader extends LongLineRecordReader { @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { - super.initialize(genericSplit, context); + super.initialize(((WikipediaInputSplit)genericSplit).getFileSplit(), context); this.startToken = WikipediaConfiguration.isNull(context.getConfiguration(), START_TOKEN, String.class); this.endToken = WikipediaConfiguration.isNull(context.getConfiguration(), END_TOKEN, String.class); this.returnPartialMatches = context.getConfiguration().getBoolean(RETURN_PARTIAL_MATCHES, false); http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java ---------------------------------------------------------------------- diff --git a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java index 1ed0fa5..a924aee 100644 --- a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java +++ b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java @@ -100,6 +100,7 @@ public class WikipediaMapperTest { conf.set(AggregatingRecordReader.END_TOKEN, "</page>"); conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + conf.set(WikipediaConfiguration.NUM_GROUPS, "1"); MockInstance i = new MockInstance(); c = i.getConnector("root", "pass"); http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java index 0022a1e..c1cb263 100644 --- a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java +++ b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java @@ -28,6 +28,7 @@ import javax.xml.xpath.XPath; import javax.xml.xpath.XPathExpression; import javax.xml.xpath.XPathFactory; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -140,7 +141,7 @@ public class AggregatingRecordReaderTest { // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); AggregatingRecordReader reader = new AggregatingRecordReader(); try { // Clear the values for BEGIN and STOP TOKEN @@ -162,7 +163,7 @@ public class AggregatingRecordReaderTest { // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -183,7 +184,7 @@ public class AggregatingRecordReaderTest { // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -201,7 +202,7 @@ public class AggregatingRecordReaderTest { // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -219,7 +220,7 @@ public class AggregatingRecordReaderTest { // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -244,7 +245,7 @@ public class AggregatingRecordReaderTest { // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -263,7 +264,7 @@ public class AggregatingRecordReaderTest { File f = createFile(xml5); // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java b/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java index 1fde533..7276360 100644 --- a/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java +++ b/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java @@ -37,6 +37,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper; import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; @@ -113,6 +115,7 @@ public class TestQueryLogic { conf.set(AggregatingRecordReader.END_TOKEN, "</page>"); conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + conf.set(WikipediaConfiguration.NUM_GROUPS, "1"); MockInstance i = new MockInstance(); c = i.getConnector("root", "pass"); @@ -136,7 +139,7 @@ public class TestQueryLogic { Path tmpFile = new Path(data.getAbsolutePath()); // Setup the Mapper - InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0); AggregatingRecordReader rr = new AggregatingRecordReader(); Path ocPath = new Path(tmpFile, "oc"); OutputCommitter oc = new FileOutputCommitter(ocPath, context);