ACCUMULO-375

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243961 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/0e1e67db Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/0e1e67db Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/0e1e67db Branch: refs/heads/master Commit: 0e1e67dba93a200297da959823526d2264a85eab Parents: b4f3087 Author: Adam Fuchs <afu...@apache.org> Authored: Tue Feb 14 14:46:37 2012 +0000 Committer: Adam Fuchs <afu...@apache.org> Committed: Tue Feb 14 14:46:37 2012 +0000 ---------------------------------------------------------------------- .../ingest/WikipediaPartitionedIngester.java | 30 ++++++++++++++------ .../output/SortingRFileOutputFormat.java | 6 +++- 2 files changed, 26 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/0e1e67db/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java index ca9af6a..bcdee43 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args; import 
org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner; import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner; @@ -58,9 +59,12 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; public class WikipediaPartitionedIngester extends Configured implements Tool { - + + private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class); + public final static String INGEST_LANGUAGE = "wikipedia.ingest_language"; public final static String SPLIT_FILE = "wikipedia.split_file"; public final static String TABLE_NAME = "wikipedia.table"; @@ -150,7 +154,7 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { return 0; } - public int runPartitionerJob() throws Exception + private int runPartitionerJob() throws Exception { Job partitionerJob = new Job(getConf(), "Partition Wikipedia"); Configuration partitionerConf = partitionerJob.getConfiguration(); @@ -191,7 +195,7 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { return partitionerJob.waitForCompletion(true) ? 
0 : 1; } - public int runIngestJob() throws Exception + private int runIngestJob() throws Exception { Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia"); Configuration ingestConf = ingestJob.getConfiguration(); @@ -221,6 +225,16 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { if(WikipediaConfiguration.bulkIngest(ingestConf)) { + ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class); + SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf)); + String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf); + if(bulkIngestDir == null) + { + log.error("Bulk ingest dir not set"); + return 1; + } + SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf)); + } else { ingestJob.setOutputFormatClass(AccumuloOutputFormat.class); String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf); String instanceName = WikipediaConfiguration.getInstanceName(ingestConf); @@ -228,16 +242,12 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { byte[] password = WikipediaConfiguration.getPassword(ingestConf); AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename); AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers); - } else { - ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class); - SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf)); - SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf)); } return ingestJob.waitForCompletion(true) ? 
0 : 1; } - public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException + private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { Configuration conf = getConf(); @@ -253,7 +263,9 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { if(status.isDir() == false) continue; Path dir = status.getPath(); - connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true); + Path failPath = new Path(failureDirectory+"/"+dir.getName()); + fs.mkdirs(failPath); + connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true); } return 0; http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/0e1e67db/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java index f556287..d8c57c2 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java @@ -4,6 +4,7 @@ import java.io.IOException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -12,9 +13,12 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> { - + + private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class); + public static final String PATH_NAME = "sortingrfileoutputformat.path"; public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";