ACCUMULO-375 added an LRUOutputCombiner to combine and reduce the number of mutations sent to Accumulo
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1242117 13f79535-47bb-0310-9956-ffa450edef68

Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/842696ee
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/842696ee
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/842696ee

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: 842696eebe9a10dc63731c99160d1e1ef23e7787
Parents: fa35931
Author: Adam Fuchs <afu...@apache.org>
Authored: Wed Feb 8 21:43:35 2012 +0000
Committer: Adam Fuchs <afu...@apache.org>
Committed: Wed Feb 8 21:43:35 2012 +0000

----------------------------------------------------------------------
 .../wikisearch/ingest/LRUOutputCombiner.java    |   61 +++++
 .../wikisearch/ingest/WikipediaInputFormat.java |    2 +
 .../wikisearch/ingest/WikipediaMapper.java      |    2 +-
 .../ingest/WikipediaPartitionedMapper.java      |  229 +++++++++++++------
 4 files changed, 229 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
new file mode 100644
index 0000000..e641f36
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
@@ -0,0 +1,61 @@
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRUOutputCombiner<Key,Value> extends LinkedHashMap<Key,Value> {
+  
+  private static final long serialVersionUID = 1L;
+  
+  public static abstract class Fold <Value>
+  {
+    public abstract Value fold(Value oldValue, Value newValue);
+  }
+  
+  public static abstract class Output<Key,Value>
+  {
+    public abstract void output(Key key, Value value);
+  }
+  
+  private final int capacity;
+  private final Fold<Value> fold;
+  private final Output<Key,Value> output;
+  
+  private long cacheHits = 0;
+  private long cacheMisses = 0;
+  
+  public LRUOutputCombiner(int capacity, Fold<Value> fold, Output<Key,Value> output) {
+    super(capacity + 1, 1.1f, true);
+    this.capacity = capacity;
+    this.fold = fold;
+    this.output = output;
+  }
+  
+  protected boolean removeEldestEntry(Map.Entry<Key,Value> eldest) {
+    if (size() > capacity) {
+      output.output(eldest.getKey(), eldest.getValue());
+      return true;
+    }
+    return false;
+  }
+  
+  @Override
+  public Value put(Key key, Value value) {
+    Value val = get(key);
+    if (val != null) {
+      value = fold.fold(val, value);
+      cacheHits++;
+    } else {
+      cacheMisses++;
+    }
+    super.put(key, value);
+    return null;
+  }
+  
+  public void flush() {
+    for (Map.Entry<Key,Value> e : entrySet()) {
+      output.output(e.getKey(), e.getValue());
+    }
+    clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
index 731d02c..e682f2f 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
@@ -67,6 +67,8 @@ public class WikipediaInputFormat extends TextInputFormat {
     
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
+      // for highly replicated files, returning all of the locations can lead to bunching
+      // TODO replace this with a subset of the locations
       return fileSplit.getLocations();
     }
     

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
index 1ec531b..a06c57f 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
@@ -205,7 +205,7 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
    * @return
    * @throws IOException
    */
-  private Set<String> getTokens(Article article) throws IOException {
+  static Set<String> getTokens(Article article) throws IOException {
     Set<String> tokenList = new HashSet<String>();
     WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
     TermAttribute term = tok.addAttribute(TermAttribute.class);

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
index 4d94c24..25bf572 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.examples.wikisearch.ingest;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -31,17 +30,15 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -66,17 +63,171 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
   private Text reverseIndexTableName = null;
   private Text metadataTableName = null;
   
+  private static class MutationInfo {
+    final String row;
+    final String colfam;
+    final String colqual;
+    final ColumnVisibility cv;
+    final long timestamp;
+    
+    public MutationInfo(String row, String colfam, String colqual, ColumnVisibility cv, long timestamp) {
+      super();
+      this.row = row;
+      this.colfam = colfam;
+      this.colqual = colqual;
+      this.cv = cv;
+      this.timestamp = timestamp;
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+      MutationInfo other = (MutationInfo)obj;
+      return (row == other.row || row.equals(other.row)) &&
+          (colfam == other.colfam || colfam.equals(other.colfam)) &&
+          colqual.equals(other.colqual) &&
+          (cv == other.cv || cv.equals(other.cv)) &&
+          timestamp == other.timestamp;
+    }
+    
+    @Override
+    public int hashCode() {
+      return row.hashCode() ^ colfam.hashCode() ^ colqual.hashCode() ^ cv.hashCode() ^ (int)timestamp;
+    }
+  }
+  
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiIndexOutput;
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiReverseIndexOutput;
+  private LRUOutputCombiner<MutationInfo,Value> wikiMetadataOutput;
+  
+  private static class CountAndSet
+  {
+    public int count;
+    public HashSet<String> set;
+    
+    public CountAndSet(String entry)
+    {
+      set = new HashSet<String>();
+      set.add(entry);
+      count = 1;
+    }
+  }
+  
   @Override
-  public void setup(Context context) {
+  public void setup(final Context context) {
     Configuration conf = context.getConfiguration();
     tablename = new Text(WikipediaConfiguration.getTableName(conf));
     indexTableName = new Text(tablename + "Index");
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
+    final Text metadataTableNameFinal = metadataTableName;
+    final Text indexTableNameFinal = indexTableName;
+    final Text reverseIndexTableNameFinal = reverseIndexTableName;
+    
     numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+    
+    LRUOutputCombiner.Fold<CountAndSet> indexFold =
+        new LRUOutputCombiner.Fold<CountAndSet>() {
+          @Override
+          public CountAndSet fold(CountAndSet oldValue, CountAndSet newValue) {
+            oldValue.count += newValue.count;
+            if(oldValue.set == null || newValue.set == null)
+            {
+              oldValue.set = null;
+              return oldValue;
+            }
+            oldValue.set.addAll(newValue.set);
+            if(oldValue.set.size() > GlobalIndexUidCombiner.MAX)
+              oldValue.set = null;
+            return oldValue;
+          }
+        };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> indexOutput =
+        new LRUOutputCombiner.Output<WikipediaPartitionedMapper.MutationInfo,CountAndSet>() {
+          
+          @Override
+          public void output(MutationInfo key, CountAndSet value)
+          {
+            Uid.List.Builder builder = Uid.List.newBuilder();
+            builder.setCOUNT(value.count);
+            if (value.set == null) {
+              builder.setIGNORE(true);
+              builder.clearUID();
+            } else {
+              builder.setIGNORE(false);
+              builder.addAllUID(value.set);
+            }
+            Uid.List list = builder.build();
+            Value val = new Value(list.toByteArray());
+            Mutation m = new Mutation(key.row);
+            m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+            try {
+              context.write(indexTableNameFinal, m);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> reverseIndexOutput =
+        new LRUOutputCombiner.Output<WikipediaPartitionedMapper.MutationInfo,CountAndSet>() {
+          
+          @Override
+          public void output(MutationInfo key, CountAndSet value)
+          {
+            Uid.List.Builder builder = Uid.List.newBuilder();
+            builder.setCOUNT(value.count);
+            if (value.set == null) {
+              builder.setIGNORE(true);
+              builder.clearUID();
+            } else {
+              builder.setIGNORE(false);
+              builder.addAllUID(value.set);
+            }
+            Uid.List list = builder.build();
+            Value val = new Value(list.toByteArray());
+            Mutation m = new Mutation(key.row);
+            m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+            try {
+              context.write(reverseIndexTableNameFinal, m);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    
+    wikiIndexOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,CountAndSet>(10000,indexFold,indexOutput);
+    wikiReverseIndexOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,CountAndSet>(10000, indexFold,reverseIndexOutput);
+    wikiMetadataOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,Value>(10000,
+        new LRUOutputCombiner.Fold<Value>() {
+          @Override
+          public Value fold(Value oldValue, Value newValue) {
+            return oldValue;
+          }},
+        new LRUOutputCombiner.Output<MutationInfo,Value>() {
+          @Override
+          public void output(MutationInfo key, Value value) {
+            Mutation m = new Mutation(key.row);
+            m.put(key.colfam, key.colqual, key.cv, key.timestamp, value);
+            try {
+              context.write(metadataTableNameFinal, m);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }});
   }
+  
+  
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    wikiIndexOutput.flush();
+    wikiMetadataOutput.flush();
+    wikiReverseIndexOutput.flush();
+  }
+  
+  
   
   @Override
   protected void map(Text language, Article article, Context context) throws IOException, InterruptedException {
     String NULL_BYTE = "\u0000";
@@ -93,13 +244,12 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
     for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
       m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
       // Create mutations for the metadata table.
-      Mutation mm = new Mutation(entry.getKey());
-      mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE);
-      context.write(metadataTableName, mm);
+      MutationInfo mm = new MutationInfo(entry.getKey(), METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp());
+      wikiMetadataOutput.put(mm, NULL_VALUE);
     }
     
     // Tokenize the content
-    Set<String> tokens = getTokens(article);
+    Set<String> tokens = WikipediaMapper.getTokens(article);
     
     // We are going to put the fields to be indexed into a multimap. This allows us to iterate
     // over the entire set once.
@@ -118,30 +268,17 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
       m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE);
       
       // Create mutations for the global index
-      // Create a UID object for the Value
-      Builder uidBuilder = Uid.List.newBuilder();
-      uidBuilder.setIGNORE(false);
-      uidBuilder.setCOUNT(1);
-      uidBuilder.addUID(Integer.toString(article.getId()));
-      Uid.List uidList = uidBuilder.build();
-      Value val = new Value(uidList.toByteArray());
-      
-      // Create mutations for the global index
       // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
-      Mutation gm = new Mutation(index.getValue());
-      gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
-      context.write(indexTableName, gm);
+      MutationInfo gm = new MutationInfo(index.getValue(),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp());
+      wikiIndexOutput.put(gm, new CountAndSet(Integer.toString(article.getId())));
       
       // Create mutations for the global reverse index
-      Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
-      grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
-      context.write(reverseIndexTableName, grm);
+      MutationInfo grm = new MutationInfo(StringUtils.reverse(index.getValue()),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp());
+      wikiReverseIndexOutput.put(grm, new CountAndSet(Integer.toString(article.getId())));
      
       // Create mutations for the metadata table.
-      Mutation mm = new Mutation(index.getKey());
-      mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
-      context.write(metadataTableName, mm);
-      
+      MutationInfo mm = new MutationInfo(index.getKey(),METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp());
+      wikiMetadataOutput.put(mm, NULL_VALUE);
     }
     // Add the entire text to the document section of the table.
     // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
@@ -153,40 +290,4 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
     }
     context.progress();
   }
-  
-  /**
-   * Tokenize the wikipedia content
-   * 
-   * @param article
-   * @return
-   * @throws IOException
-   */
-  private Set<String> getTokens(Article article) throws IOException {
-    Set<String> tokenList = new HashSet<String>();
-    WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
-    TermAttribute term = tok.addAttribute(TermAttribute.class);
-    try {
-      while (tok.incrementToken()) {
-        String token = term.term();
-        if (!StringUtils.isEmpty(token))
-          tokenList.add(token);
-      }
-    } catch (IOException e) {
-      log.error("Error tokenizing text", e);
-    } finally {
-      try {
-        tok.end();
-      } catch (IOException e) {
-        log.error("Error calling end()", e);
-      } finally {
-        try {
-          tok.close();
-        } catch (IOException e) {
-          log.error("Error closing tokenizer", e);
-        }
-      }
-    }
-    return tokenList;
-  }
-  
 }
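----------------------------------------------------------------------

Usage note (illustrative, not part of the commit): the pattern the mapper now follows is to construct an LRUOutputCombiner with a Fold that merges values for a repeated key and an Output that writes an entry when it is evicted, call put() per record, and call flush() in cleanup(). The standalone sketch below shows that behavior with plain String/Integer types and a println sink in place of the Accumulo Mutation/Context wiring; the demo class name and the capacity of 2 are made up for the example, and it assumes the LRUOutputCombiner class added above is on the classpath.

import org.apache.accumulo.examples.wikisearch.ingest.LRUOutputCombiner;

public class LRUOutputCombinerDemo {
  public static void main(String[] args) {
    // Fold: merge a duplicate key by summing its counts (stands in for the CountAndSet folding).
    LRUOutputCombiner.Fold<Integer> fold = new LRUOutputCombiner.Fold<Integer>() {
      @Override
      public Integer fold(Integer oldValue, Integer newValue) {
        return oldValue + newValue;
      }
    };
    // Output: invoked when an entry is evicted (cache full) or on flush();
    // the real mapper builds a Mutation here and calls context.write().
    LRUOutputCombiner.Output<String,Integer> output = new LRUOutputCombiner.Output<String,Integer>() {
      @Override
      public void output(String key, Integer value) {
        System.out.println(key + " -> " + value);
      }
    };

    // Tiny capacity so evictions are easy to observe; the mapper uses 10000.
    LRUOutputCombiner<String,Integer> combiner = new LRUOutputCombiner<String,Integer>(2, fold, output);

    combiner.put("accumulo", 1);
    combiner.put("wiki", 1);
    combiner.put("accumulo", 1);  // folded in place: nothing written, cached count becomes 2
    combiner.put("ingest", 1);    // capacity exceeded: least-recently-used entry ("wiki") is emitted

    combiner.flush();             // emit whatever is still cached, as cleanup() does for each combiner
  }
}

The effect is that repeated index terms within a mapper are combined into a single Uid.List mutation instead of one mutation per occurrence, which is what reduces the volume sent to Accumulo.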