ACCUMULO-375: fixed bugs in job setup. git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1241624 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/410c1d5b Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/410c1d5b Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/410c1d5b Branch: refs/heads/1.4.5-SNAPSHOT Commit: 410c1d5bfc15d63547d6394e569762e3310f2a36 Parents: a673727 Author: Adam Fuchs <afu...@apache.org> Authored: Tue Feb 7 21:02:36 2012 +0000 Committer: Adam Fuchs <afu...@apache.org> Committed: Tue Feb 7 21:02:36 2012 +0000 ---------------------------------------------------------------------- .../wikisearch/ingest/WikipediaPartitionedIngester.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/410c1d5b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java index e7493dc..43f5e29 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java @@ -173,6 +173,8 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { // setup output format partitionerJob.setMapOutputKeyClass(Text.class); partitionerJob.setMapOutputValueClass(Article.class); + partitionerJob.setOutputKeyClass(Text.class); + partitionerJob.setOutputValueClass(Article.class); partitionerJob.setOutputFormatClass(SequenceFileOutputFormat.class); Path outputDir = 
WikipediaConfiguration.getPartitionedArticlesPath(partitionerConf); SequenceFileOutputFormat.setOutputPath(partitionerJob, outputDir); @@ -186,6 +188,8 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { Configuration ingestConf = ingestJob.getConfiguration(); ingestConf.set("mapred.map.tasks.speculative.execution", "false"); + configureIngestJob(ingestJob); + String tablename = WikipediaConfiguration.getTableName(ingestConf); String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf); @@ -199,6 +203,9 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { createTables(tops, tablename); + ingestJob.setMapperClass(WikipediaPartitionedMapper.class); + ingestJob.setNumReduceTasks(0); + // setup input format ingestJob.setInputFormatClass(SequenceFileInputFormat.class); SequenceFileInputFormat.setInputPaths(ingestJob, WikipediaConfiguration.getPartitionedArticlesPath(ingestConf)); @@ -227,6 +234,11 @@ public class WikipediaPartitionedIngester extends Configured implements Tool { conf.set(AggregatingRecordReader.START_TOKEN, "<page>"); conf.set(AggregatingRecordReader.END_TOKEN, "</page>"); } + + protected void configureIngestJob(Job job) { + job.setJarByClass(WikipediaPartitionedIngester.class); + job.setInputFormatClass(WikipediaInputFormat.class); + } protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");