ACCUMULO-375: hybridize ingest to use a mix of bulk loading (RFiles) and streaming ingest (a MultiTableBatchWriter for the index, reverse-index, and metadata tables) git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1245142 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/ec56d2d4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/ec56d2d4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/ec56d2d4 Branch: refs/heads/master Commit: ec56d2d429ebf4bd849daf71804021536f4d21ee Parents: 0e1e67d Author: Adam Fuchs <afu...@apache.org> Authored: Thu Feb 16 19:54:31 2012 +0000 Committer: Adam Fuchs <afu...@apache.org> Committed: Thu Feb 16 19:54:31 2012 +0000 ---------------------------------------------------------------------- .../ingest/WikipediaPartitionedMapper.java | 18 +++++++++++++++--- .../output/BufferingRFileRecordWriter.java | 9 +++++---- 2 files changed, 20 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ec56d2d4/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java index 25bf572..7816b03 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java @@ -26,6 +26,9 @@ import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import 
org.apache.accumulo.core.security.ColumnVisibility; @@ -112,6 +115,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio } } + MultiTableBatchWriter mtbw; @Override public void setup(final Context context) { @@ -121,6 +125,14 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio reverseIndexTableName = new Text(tablename + "ReverseIndex"); metadataTableName = new Text(tablename + "Metadata"); + try { + mtbw = WikipediaConfiguration.getConnector(conf).createMultiTableBatchWriter(10000000, 1000, 10); + } catch (AccumuloException e) { + throw new RuntimeException(e); + } catch (AccumuloSecurityException e) { + throw new RuntimeException(e); + } + final Text metadataTableNameFinal = metadataTableName; final Text indexTableNameFinal = indexTableName; final Text reverseIndexTableNameFinal = reverseIndexTableName; @@ -163,7 +175,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio Mutation m = new Mutation(key.row); m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); try { - context.write(indexTableNameFinal, m); + mtbw.getBatchWriter(indexTableNameFinal.toString()).addMutation(m); } catch (Exception e) { throw new RuntimeException(e); } @@ -189,7 +201,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio Mutation m = new Mutation(key.row); m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); try { - context.write(reverseIndexTableNameFinal, m); + mtbw.getBatchWriter(reverseIndexTableNameFinal.toString()).addMutation(m); } catch (Exception e) { throw new RuntimeException(e); } @@ -210,7 +222,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio Mutation m = new Mutation(key.row); m.put(key.colfam, key.colqual, key.cv, key.timestamp, value); try { - context.write(metadataTableNameFinal, m); + mtbw.getBatchWriter(metadataTableNameFinal.toString()).addMutation(m); } catch (Exception e) { throw new 
RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ec56d2d4/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java index a7e7dcf..579bbe1 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java @@ -69,8 +69,8 @@ final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> { if (buffer.size() == 0) return; - // TODO fix the filename String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf"; + // TODO get the table configuration for the given table? FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf); // forget locality groups for now, just write everything to the default @@ -110,17 +110,18 @@ final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> { { Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted()); Value v = new Value(update.getValue()); + // TODO account for object overhead mutationSize += k.getSize(); mutationSize += v.getSize(); buffer.put(k, v); } size += mutationSize; long bufferSize = bufferSizes.get(table); + + // TODO use a MutableLong instead bufferSize += mutationSize; bufferSizes.put(table, bufferSize); - - // TODO add object overhead size - + while (size >= maxSize) { flushLargestTable(); }