ACCUMULO-375: split each Wikipedia input file across a configurable number of mapper groups

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1241206 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/ddab37e8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/ddab37e8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/ddab37e8

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: ddab37e843d68c41a80a39457758cbb009498ca3
Parents: 6a3b419
Author: Adam Fuchs <afu...@apache.org>
Authored: Mon Feb 6 22:01:48 2012 +0000
Committer: Adam Fuchs <afu...@apache.org>
Committed: Mon Feb 6 22:01:48 2012 +0000

----------------------------------------------------------------------
 .../ingest/WikipediaConfiguration.java          |  7 ++
 .../wikisearch/ingest/WikipediaInputFormat.java | 87 ++++++++++++++++++++
 .../wikisearch/ingest/WikipediaMapper.java      | 13 ++-
 .../reader/AggregatingRecordReader.java         |  3 +-
 .../wikisearch/ingest/WikipediaMapperTest.java  |  1 +
 .../reader/AggregatingRecordReaderTest.java     | 15 ++--
 .../wikisearch/logic/TestQueryLogic.java        |  5 +-
 7 files changed, 121 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
index 2f0ea61..d76d713 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
@@ -45,6 +45,9 @@ public class WikipediaConfiguration {
   public final static String ANALYZER = "wikipedia.index.analyzer";
   
   public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+
+  public final static String NUM_GROUPS = "wikipedia.ingest.groups";
+
   
   public static String getUser(Configuration conf) {
     return conf.get(USER);
@@ -110,6 +113,10 @@ public class WikipediaConfiguration {
     return conf.getInt(NUM_PARTITIONS, 25);
   }
   
+  public static int getNumGroups(Configuration conf) {
+    return conf.getInt(NUM_GROUPS, 1);
+  }
+  
   /**
    * Helper method to get properties from Hadoop configuration
    * 

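The new NUM_GROUPS property mirrors the existing NUM_PARTITIONS pattern: a string key plus a typed getter with a default of 1, so jobs that never set it keep the old one-mapper-per-split behavior. A minimal usage sketch (the driver class below is hypothetical, not part of this patch):

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
    import org.apache.hadoop.conf.Configuration;

    public class GroupsConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fan each input file out to 4 mapper groups.
        conf.setInt(WikipediaConfiguration.NUM_GROUPS, 4);
        System.out.println(WikipediaConfiguration.getNumGroups(conf)); // prints 4
        // If the property were never set, getNumGroups(conf) would return 1.
      }
    }
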
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
index 8a5ed66..4c6a3b8 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
@@ -16,20 +16,107 @@
  */
 package org.apache.accumulo.examples.wikisearch.ingest;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 
 public class WikipediaInputFormat extends TextInputFormat {
+
+  public static class WikipediaInputSplit extends InputSplit implements Writable {
+
+    public WikipediaInputSplit(){}
+    
+    public WikipediaInputSplit(FileSplit fileSplit, int partition)
+    {
+      this.fileSplit = fileSplit;
+      this.partition = partition;
+    }
+    
+    private FileSplit fileSplit = null;
+    private int partition = -1;
+
+    public int getPartition()
+    {
+      return partition;
+    }
+    
+    public FileSplit getFileSplit()
+    {
+      return fileSplit;
+    }
+    
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSplit.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return fileSplit.getLocations();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      Path file = new Path(in.readUTF());
+      long start = in.readLong();
+      long length = in.readLong();
+      int numHosts = in.readInt();
+      String[] hosts = new String[numHosts];
+      for(int i = 0; i < numHosts; i++)
+        hosts[i] = in.readUTF();
+      fileSplit = new FileSplit(file, start, length, hosts);
+      partition = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(fileSplit.getPath().toString());
+      out.writeLong(fileSplit.getStart());
+      out.writeLong(fileSplit.getLength());
+      String [] hosts = fileSplit.getLocations();
+      out.writeInt(hosts.length);
+      for(String host:hosts)
+        out.writeUTF(host);
+      out.writeInt(partition);
+    }
+    
+  }
   
   @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<InputSplit> superSplits = super.getSplits(job);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    
+    int numGroups = WikipediaConfiguration.getNumGroups(job.getConfiguration());
+
+    for(InputSplit split:superSplits)
+    {
+      FileSplit fileSplit = (FileSplit)split;
+      for(int group = 0; group < numGroups; group++)
+      {
+        splits.add(new WikipediaInputSplit(fileSplit,group));
+      }
+    }
+    return splits;
+  }
+
+  @Override
  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
     return new AggregatingRecordReader();
   }

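Because WikipediaInputSplit is shipped from the client to the task JVMs through the Writable interface, write() and readFields() must emit and consume exactly the same field sequence: path, start, length, host count, hosts, then the group id. A minimal round-trip check of that contract (a sketch, assuming Hadoop's DataOutputBuffer and DataInputBuffer helpers are on the classpath):

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class SplitRoundTripCheck {
      public static void main(String[] args) throws Exception {
        FileSplit fileSplit = new FileSplit(new Path("/data/enwiki.xml"), 0, 1024, new String[] {"host1"});
        WikipediaInputSplit split = new WikipediaInputSplit(fileSplit, 3);

        // Serialize, then deserialize into a fresh instance.
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        WikipediaInputSplit copy = new WikipediaInputSplit();
        copy.readFields(in);

        // The group id and the wrapped FileSplit fields must survive the trip.
        System.out.println(copy.getPartition());             // 3
        System.out.println(copy.getFileSplit().getLength()); // 1024
      }
    }
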
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
index c343f52..1ec531b 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
@@ -71,6 +72,9 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
   private String language;
   private int numPartitions = 0;
   private ColumnVisibility cv = null;
+
+  private int myGroup = -1;
+  private int numGroups = -1;
   
   private Text tablename = null;
   private Text indexTableName = null;
@@ -85,7 +89,11 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
     
-    FileSplit split = (FileSplit) context.getInputSplit();
+    WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit();
+    myGroup = wiSplit.getPartition();
+    numGroups = WikipediaConfiguration.getNumGroups(conf);
+    
+    FileSplit split = wiSplit.getFileSplit();
     String fileName = split.getPath().getName();
     Matcher matcher = languagePattern.matcher(fileName);
     if (matcher.matches()) {
@@ -118,6 +126,9 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
     String colfPrefix = language + NULL_BYTE;
     String indexPrefix = "fi" + NULL_BYTE;
     if (article != null) {
+      int groupId = WikipediaMapper.getPartitionId(article, numGroups);
+      if(groupId != myGroup)
+        return;
      Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
       
       // Create the mutations for the document.

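The mapper-side filter is the other half of the grouping scheme: getSplits() hands every file to numGroups mappers, and each mapper keeps only the articles whose group id matches its own, so every article is still ingested exactly once. A sketch of that predicate; the modular hash below is an assumption standing in for the patch's actual WikipediaMapper.getPartitionId():

    // Each of numGroups mappers scans the whole file; this decides which
    // articles belong to this mapper's group. The hash is illustrative only.
    static boolean inMyGroup(int articleId, int myGroup, int numGroups) {
      int groupId = (articleId & Integer.MAX_VALUE) % numGroups; // non-negative mod
      return groupId == myGroup;
    }

With the default of one group, every article passes the check and the job behaves exactly as it did before the patch.
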
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
index 0ae0290..09755c0 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.examples.wikisearch.reader;
 import java.io.IOException;
 
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.util.TextUtil;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -60,7 +61,7 @@ public class AggregatingRecordReader extends LongLineRecordReader {
   
   @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-    super.initialize(genericSplit, context);
+    super.initialize(((WikipediaInputSplit)genericSplit).getFileSplit(), context);
     this.startToken = WikipediaConfiguration.isNull(context.getConfiguration(), START_TOKEN, String.class);
     this.endToken = WikipediaConfiguration.isNull(context.getConfiguration(), END_TOKEN, String.class);
     this.returnPartialMatches = context.getConfiguration().getBoolean(RETURN_PARTIAL_MATCHES, false);

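The one-line change above works because getSplits() now only ever emits WikipediaInputSplit, so the cast is safe within this job. If the reader were ever reused with a stock FileSplit-producing input format, a defensive variant (a sketch, not part of this patch) could unwrap conditionally:

    // Sketch: a defensive initialize() that accepts either split type, in case
    // the reader is reused with a plain FileSplit-producing input format.
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit fileSplit = (genericSplit instanceof WikipediaInputSplit)
          ? ((WikipediaInputSplit) genericSplit).getFileSplit()
          : (FileSplit) genericSplit;
      super.initialize(fileSplit, context);
      // ... token setup as in the patched method above ...
    }
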
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
----------------------------------------------------------------------
diff --git a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
index 1ed0fa5..a924aee 100644
--- a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
+++ b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
@@ -100,6 +100,7 @@ public class WikipediaMapperTest {
     conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
     conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
     conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
     
     MockInstance i = new MockInstance();
     c = i.getConnector("root", "pass");

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
index 0022a1e..c1cb263 100644
--- a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
+++ b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
@@ -28,6 +28,7 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathFactory;
 
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -140,7 +141,7 @@ public class AggregatingRecordReaderTest {
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     AggregatingRecordReader reader = new AggregatingRecordReader();
     try {
       // Clear the values for BEGIN and STOP TOKEN
@@ -162,7 +163,7 @@ public class AggregatingRecordReaderTest {
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -183,7 +184,7 @@ public class AggregatingRecordReaderTest {
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -201,7 +202,7 @@ public class AggregatingRecordReaderTest {
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -219,7 +220,7 @@ public class AggregatingRecordReaderTest {
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -244,7 +245,7 @@ public class AggregatingRecordReaderTest {
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -263,7 +264,7 @@ public class AggregatingRecordReaderTest {
     File f = createFile(xml5);
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();

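All seven reader tests apply the same mechanical change: wrap the old FileSplit in a WikipediaInputSplit with group 0. A small helper (hypothetical, not in this patch) placed inside AggregatingRecordReaderTest, which already imports Path, FileSplit, and WikipediaInputSplit, would keep that in one place:

    // Hypothetical test helper: wrap a file as a single split in group 0.
    private static WikipediaInputSplit makeSplit(File f) {
      Path p = new Path(f.toURI().toString());
      return new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
    }
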
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ddab37e8/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java b/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
index 1fde533..7276360 100644
--- a/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
+++ b/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
@@ -37,6 +37,8 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
@@ -113,6 +115,7 @@ public class TestQueryLogic {
     conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
     conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
     conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
     
     MockInstance i = new MockInstance();
     c = i.getConnector("root", "pass");
@@ -136,7 +139,7 @@ public class TestQueryLogic {
     Path tmpFile = new Path(data.getAbsolutePath());
     
     // Setup the Mapper
-    InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0);
     AggregatingRecordReader rr = new AggregatingRecordReader();
     Path ocPath = new Path(tmpFile, "oc");
     OutputCommitter oc = new FileOutputCommitter(ocPath, context);
