ACCUMULO-375 added an LRUOutputCombiner to combine mutations in memory and reduce the number of mutations sent to Accumulo.

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1242117 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/842696ee
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/842696ee
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/842696ee

Branch: refs/heads/master
Commit: 842696eebe9a10dc63731c99160d1e1ef23e7787
Parents: fa35931
Author: Adam Fuchs <afu...@apache.org>
Authored: Wed Feb 8 21:43:35 2012 +0000
Committer: Adam Fuchs <afu...@apache.org>
Committed: Wed Feb 8 21:43:35 2012 +0000

----------------------------------------------------------------------
 .../wikisearch/ingest/LRUOutputCombiner.java    |  61 +++++
 .../wikisearch/ingest/WikipediaInputFormat.java |   2 +
 .../wikisearch/ingest/WikipediaMapper.java      |   2 +-
 .../ingest/WikipediaPartitionedMapper.java      | 229 +++++++++++++------
 4 files changed, 229 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
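
The new class behaves like a small write-behind cache: repeated keys are folded together in memory and an entry is only emitted when it is evicted or when the cache is flushed, which is what cuts down the number of mutations sent to Accumulo. A minimal usage sketch (the word-count key/value types and the printing Output are illustrative only, not part of this commit):

    LRUOutputCombiner<String,Integer> wordCounts = new LRUOutputCombiner<String,Integer>(
        1000,                                         // keep up to 1000 hot keys in memory
        new LRUOutputCombiner.Fold<Integer>() {
          @Override
          public Integer fold(Integer oldValue, Integer newValue) {
            return oldValue + newValue;               // merge values for duplicate keys
          }
        },
        new LRUOutputCombiner.Output<String,Integer>() {
          @Override
          public void output(String key, Integer value) {
            System.out.println(key + " -> " + value); // the mapper below writes a Mutation here instead
          }
        });

    wordCounts.put("accumulo", 1);
    wordCounts.put("accumulo", 1); // folded with the previous entry, nothing emitted yet
    wordCounts.flush();            // emits "accumulo -> 2"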


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
new file mode 100644
index 0000000..e641f36
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
@@ -0,0 +1,61 @@
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRUOutputCombiner<Key,Value> extends LinkedHashMap<Key,Value> {
+  
+  private static final long serialVersionUID = 1L;
+
+  public static abstract class Fold <Value>
+  {
+    public abstract Value fold(Value oldValue, Value newValue);
+  }
+  
+  public static abstract class Output<Key,Value>
+  {
+    public abstract void output(Key key, Value value);
+  }
+  
+  private final int capacity;
+  private final Fold<Value> fold;
+  private final Output<Key,Value> output;
+  
+  private long cacheHits = 0;
+  private long cacheMisses = 0;
+  
+  public LRUOutputCombiner(int capacity, Fold<Value> fold, Output<Key,Value> output) {
+    super(capacity + 1, 1.1f, true);
+    this.capacity = capacity;
+    this.fold = fold;
+    this.output = output;
+  }
+  
+  protected boolean removeEldestEntry(Map.Entry<Key,Value> eldest) {
+    if (size() > capacity) {
+      output.output(eldest.getKey(), eldest.getValue());
+      return true;
+    }
+    return false;
+  }
+  
+  @Override
+  public Value put(Key key, Value value) {
+    Value val = get(key);
+    if (val != null) {
+      value = fold.fold(val, value);
+      cacheHits++;
+    } else {
+      cacheMisses++;
+    }
+    super.put(key, value);
+    return null;
+  }
+  
+  public void flush() {
+    for (Map.Entry<Key,Value> e : entrySet()) {
+      output.output(e.getKey(), e.getValue());
+    }
+    clear();
+  }
+}
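
A note on the constructor above: super(capacity + 1, 1.1f, true) builds the backing LinkedHashMap in access order (the true flag) and sizes it so it will not rehash when it momentarily holds capacity + 1 entries before removeEldestEntry evicts one, which is what gives least-recently-used semantics. A small hypothetical demonstration with a capacity of 2 (the summing Fold and printing Output are assumptions for illustration):

    LRUOutputCombiner<String,Integer> cache = new LRUOutputCombiner<String,Integer>(
        2,
        new LRUOutputCombiner.Fold<Integer>() {
          @Override
          public Integer fold(Integer oldValue, Integer newValue) { return oldValue + newValue; }
        },
        new LRUOutputCombiner.Output<String,Integer>() {
          @Override
          public void output(String key, Integer value) { System.out.println("evicted " + key + "=" + value); }
        });

    cache.put("a", 1);
    cache.put("b", 1);
    cache.put("a", 1); // "a" folds to 2 and becomes the most recently used entry
    cache.put("c", 1); // over capacity: the least recently used entry ("b"=1) is emitted and dropped
    cache.flush();     // emits the remaining entries, "a"=2 and "c"=1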

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
index 731d02c..e682f2f 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
@@ -67,6 +67,8 @@ public class WikipediaInputFormat extends TextInputFormat {
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
+      // for highly replicated files, returning all of the locations can lead to bunching
+      // TODO replace this with a subset of the locations
       return fileSplit.getLocations();
     }
 

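The comment above only records the concern; the actual change is left as a TODO. One possible shape of that follow-up, shown purely as a hedged sketch and not part of this commit, would be to hand the scheduler a small random subset of the replica locations so that splits of a highly replicated file do not all gravitate to the same nodes:

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      String[] locations = fileSplit.getLocations();
      if (locations.length <= 3)                    // 3 is an arbitrary illustrative cap
        return locations;
      java.util.List<String> shuffled = new java.util.ArrayList<String>(java.util.Arrays.asList(locations));
      java.util.Collections.shuffle(shuffled);
      return shuffled.subList(0, 3).toArray(new String[3]);
    }
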
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
index 1ec531b..a06c57f 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
@@ -205,7 +205,7 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
    * @return
    * @throws IOException
    */
-  private Set<String> getTokens(Article article) throws IOException {
+  static Set<String> getTokens(Article article) throws IOException {
     Set<String> tokenList = new HashSet<String>();
     WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
     TermAttribute term = tok.addAttribute(TermAttribute.class);

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
index 4d94c24..25bf572 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.examples.wikisearch.ingest;
 
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -31,17 +30,15 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -66,17 +63,171 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
   private Text reverseIndexTableName = null;
   private Text metadataTableName = null;
   
+  private static class MutationInfo {
+    final String row;
+    final String colfam;
+    final String colqual;
+    final ColumnVisibility cv;
+    final long timestamp;
+    
+    public MutationInfo(String row, String colfam, String colqual, ColumnVisibility cv, long timestamp) {
+      super();
+      this.row = row;
+      this.colfam = colfam;
+      this.colqual = colqual;
+      this.cv = cv;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      MutationInfo other = (MutationInfo)obj;
+      return (row == other.row || row.equals(other.row)) &&
+          (colfam == other.colfam || colfam.equals(other.colfam)) &&
+          colqual.equals(other.colqual) &&
+          (cv == other.cv || cv.equals(other.cv)) &&
+          timestamp == other.timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+      return row.hashCode() ^ colfam.hashCode() ^ colqual.hashCode() ^ cv.hashCode() ^ (int)timestamp;
+    }
+  }
+  
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiIndexOutput;
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiReverseIndexOutput;
+  private LRUOutputCombiner<MutationInfo,Value> wikiMetadataOutput;
+  
+  private static class CountAndSet
+  {
+    public int count;
+    public HashSet<String> set;
+    
+    public CountAndSet(String entry)
+    {
+      set = new HashSet<String>();
+      set.add(entry);
+      count = 1;
+    }
+  }
+  
+
   @Override
-  public void setup(Context context) {
+  public void setup(final Context context) {
     Configuration conf = context.getConfiguration();
     tablename = new Text(WikipediaConfiguration.getTableName(conf));
     indexTableName = new Text(tablename + "Index");
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
     
+    final Text metadataTableNameFinal = metadataTableName;
+    final Text indexTableNameFinal = indexTableName;
+    final Text reverseIndexTableNameFinal = reverseIndexTableName;
+    
     numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+
+    LRUOutputCombiner.Fold<CountAndSet> indexFold = 
+        new LRUOutputCombiner.Fold<CountAndSet>() {
+      @Override
+      public CountAndSet fold(CountAndSet oldValue, CountAndSet newValue) {
+        oldValue.count += newValue.count;
+        if(oldValue.set == null || newValue.set == null)
+        {
+          oldValue.set = null;
+          return oldValue;
+        }
+        oldValue.set.addAll(newValue.set);
+        if(oldValue.set.size() > GlobalIndexUidCombiner.MAX)
+          oldValue.set = null;
+        return oldValue;
+      }
+    };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> indexOutput =
+        new LRUOutputCombiner.Output<WikipediaPartitionedMapper.MutationInfo,CountAndSet>() {
+      
+      @Override
+      public void output(MutationInfo key, CountAndSet value)
+      {
+          Uid.List.Builder builder = Uid.List.newBuilder();
+          builder.setCOUNT(value.count);
+          if (value.set == null) {
+            builder.setIGNORE(true);
+            builder.clearUID();
+          } else {
+            builder.setIGNORE(false);
+            builder.addAllUID(value.set);
+          }
+          Uid.List list = builder.build();
+          Value val = new Value(list.toByteArray());
+          Mutation m = new Mutation(key.row);
+          m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+          try {
+            context.write(indexTableNameFinal, m);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+      }
+    };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> reverseIndexOutput =
+        new LRUOutputCombiner.Output<WikipediaPartitionedMapper.MutationInfo,CountAndSet>() {
+      
+      @Override
+      public void output(MutationInfo key, CountAndSet value)
+      {
+          Uid.List.Builder builder = Uid.List.newBuilder();
+          builder.setCOUNT(value.count);
+          if (value.set == null) {
+            builder.setIGNORE(true);
+            builder.clearUID();
+          } else {
+            builder.setIGNORE(false);
+            builder.addAllUID(value.set);
+          }
+          Uid.List list = builder.build();
+          Value val = new Value(list.toByteArray());
+          Mutation m = new Mutation(key.row);
+          m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+          try {
+            context.write(reverseIndexTableNameFinal, m);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+      }
+    };
+      
+    wikiIndexOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,CountAndSet>(10000,indexFold,indexOutput);
+    wikiReverseIndexOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,CountAndSet>(10000, indexFold,reverseIndexOutput);
+    wikiMetadataOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,Value>(10000,
+        new LRUOutputCombiner.Fold<Value>() {
+          @Override
+          public Value fold(Value oldValue, Value newValue) {
+            return oldValue;
+          }},
+        new LRUOutputCombiner.Output<MutationInfo,Value>() {
+          @Override
+          public void output(MutationInfo key, Value value) {
+            Mutation m = new Mutation(key.row);
+            m.put(key.colfam, key.colqual, key.cv, key.timestamp, value);
+            try {
+              context.write(metadataTableNameFinal, m);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }});
   }
   
+  
+  
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    wikiIndexOutput.flush();
+    wikiMetadataOutput.flush();
+    wikiReverseIndexOutput.flush();
+  }
+
+
+
   @Override
   protected void map(Text language, Article article, Context context) throws IOException, InterruptedException {
     String NULL_BYTE = "\u0000";
@@ -93,13 +244,12 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
       for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
         m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(entry.getKey());
-        mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
+        MutationInfo mm = new MutationInfo(entry.getKey(), METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp());
+        wikiMetadataOutput.put(mm, NULL_VALUE);
       }
       
       // Tokenize the content
-      Set<String> tokens = getTokens(article);
+      Set<String> tokens = WikipediaMapper.getTokens(article);
       
       // We are going to put the fields to be indexed into a multimap. This allows us to iterate
       // over the entire set once.
@@ -118,30 +268,17 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
         m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE);
         
         // Create mutations for the global index
-        // Create a UID object for the Value
-        Builder uidBuilder = Uid.List.newBuilder();
-        uidBuilder.setIGNORE(false);
-        uidBuilder.setCOUNT(1);
-        uidBuilder.addUID(Integer.toString(article.getId()));
-        Uid.List uidList = uidBuilder.build();
-        Value val = new Value(uidList.toByteArray());
-        
-        // Create mutations for the global index
         // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
-        Mutation gm = new Mutation(index.getValue());
-        gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
-        context.write(indexTableName, gm);
+        MutationInfo gm = new MutationInfo(index.getValue(),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp());
+        wikiIndexOutput.put(gm, new CountAndSet(Integer.toString(article.getId())));
         
         // Create mutations for the global reverse index
-        Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
-        grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
-        context.write(reverseIndexTableName, grm);
+        MutationInfo grm = new MutationInfo(StringUtils.reverse(index.getValue()),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp());
+        wikiReverseIndexOutput.put(grm, new CountAndSet(Integer.toString(article.getId())));
         
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(index.getKey());
-        mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
-        
+        MutationInfo mm = new MutationInfo(index.getKey(),METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp());
+        wikiMetadataOutput.put(mm, NULL_VALUE);
       }
       // Add the entire text to the document section of the table.
       // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
@@ -153,40 +290,4 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
     }
     context.progress();
   }
-  
-  /**
-   * Tokenize the wikipedia content
-   * 
-   * @param article
-   * @return
-   * @throws IOException
-   */
-  private Set<String> getTokens(Article article) throws IOException {
-    Set<String> tokenList = new HashSet<String>();
-    WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
-    TermAttribute term = tok.addAttribute(TermAttribute.class);
-    try {
-      while (tok.incrementToken()) {
-        String token = term.term();
-        if (!StringUtils.isEmpty(token))
-          tokenList.add(token);
-      }
-    } catch (IOException e) {
-      log.error("Error tokenizing text", e);
-    } finally {
-      try {
-        tok.end();
-      } catch (IOException e) {
-        log.error("Error calling end()", e);
-      } finally {
-        try {
-          tok.close();
-        } catch (IOException e) {
-          log.error("Error closing tokenizer", e);
-        }
-      }
-    }
-    return tokenList;
-  }
-  
 }
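
Taken together, the changes in this file replace the old write-per-occurrence pattern with buffered MutationInfo keys: map() now puts CountAndSet (or Value) entries into the three combiners, duplicates are folded in memory, and Mutations are only built and written when entries are evicted or flushed in cleanup(). A hedged sketch of what the fold buys for two hits on the same index term (calling indexFold directly like this is illustrative only; inside the mapper the combiner invokes it):

    // Two articles in the same partition contain the same indexed term.
    CountAndSet first  = new CountAndSet("12345"); // article id 12345
    CountAndSet second = new CountAndSet("67890"); // article id 67890

    // Previously this meant two global-index Mutations; now the combiner folds them.
    CountAndSet merged = indexFold.fold(first, second);
    // merged.count == 2 and merged.set contains both ids. If the set ever grows past
    // GlobalIndexUidCombiner.MAX, the fold nulls it out and the Output sets IGNORE on
    // the Uid.List, mirroring what the server-side combiner does.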
