Henry,
Something like:
@Override
public void run(Context context) throws IOException, InterruptedException {
  // Do stuff here you'd otherwise do in setup(...).
  // Now begin iterating.
  while (context.nextKey()) {
    // Run your reducing function here, like the following maybe:
    reduce(context.getCurrentKey(), context.getValues(), context);
    // Since you are now in a regular loop, break whenever
    // your own logic says you are done.
  }
  // Do stuff here you'd otherwise do in cleanup(context).
}
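For illustration only, here is a self-contained plain-Java sketch of the same early-exit idea, with no Hadoop classes involved (the TopNSketch class, TOP_N cutoff, and sample data are made up; the loop stands in for the while (context.nextKey()) loop above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TopNSketch {
    // Hypothetical cutoff, standing in for your "first N records" limit.
    static final int TOP_N = 3;

    // Stand-in for the reducer loop: entries arrive sorted descending by sum
    // (as your DecreasingComparator would deliver them); we emit the first
    // TOP_N and then break, skipping the remaining records entirely.
    public static List<String> takeTopN(List<Map.Entry<Long, String>> sortedBySum) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<Long, String> e : sortedBySum) {
            if (emitted.size() >= TOP_N) {
                break; // the early exit that overriding run() makes possible
            }
            emitted.add(e.getValue() + "\t" + e.getKey());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = List.of(
            Map.entry(500L, "Main_Page"),
            Map.entry(120L, "Hadoop"),
            Map.entry(80L, "MapReduce"),
            Map.entry(5L, "Obscure_Page"),
            Map.entry(1L, "Rarest_Page"));
        System.out.println(takeTopN(input));
    }
}
```

In the real run() override the break simply ends the reduce loop early; the framework still calls cleanup-equivalent code after the loop, so the task finishes normally.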
On Fri, Mar 9, 2012 at 11:32 PM, Henry Helgen <[email protected]> wrote:
> Thanks, the presentation is helpful. I am using the new API's context
> objects, but am not familiar with how to use the run method to accomplish
> this. Do you have some code ideas or examples?
>
> Job code follows. Is this right?
> -------------------------------------------------------------------------------------------
> public static void main(String[] args) throws Exception {
> /**
>  * The main accepts the input and output directories as command-line
>  * parameters, then defines and submits two jobs in sequence. The first
>  * job filters out languages and sums the pagecounts; the second job
>  * sorts by pagecount descending and selects the top 50.
>  */
>
> //configure the first job
> Configuration conf1 = new Configuration();
> String[] otherArgs1 = new GenericOptionsParser(conf1, args).getRemainingArgs();
> if (otherArgs1.length != 2) {
> System.err.println("Usage: programname <in> <out>");
> System.exit(2);
> }//end if args
> Job job1 = new Job(conf1, "Wiki Counts part 1");
> job1.setJarByClass(WikiCounts.class);
> job1.setMapperClass(LineTokenMapper.class); //custom mapper job 1
> job1.setCombinerClass(LongSumReducer.class);
> job1.setReducerClass(LongSumReducer.class);
>
> //Set reducers White p193
> job1.setNumReduceTasks(32);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(LongWritable.class);
>
> //Sequence File and Compression White p233.
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> SequenceFileOutputFormat.setCompressOutput(job1, true);
> SequenceFileOutputFormat.setOutputCompressorClass(job1, GzipCodec.class);
> SequenceFileOutputFormat.setOutputCompressionType(job1, CompressionType.BLOCK);
>
> FileInputFormat.addInputPath(job1, new Path(otherArgs1[0]));
> FileOutputFormat.setOutputPath(job1, new Path("out1in2batch"));
>
>
> //configure the second job
> Configuration conf2 = new Configuration();
> String[] otherArgs2 = new GenericOptionsParser(conf2, args).getRemainingArgs();
> if (otherArgs2.length != 2) {
> System.err.println("Usage: programname <in> <out>");
> System.exit(2);
> }//end if args
> Job job2 = new Job(conf2, "Wiki Counts part 2");
> job2.setJarByClass(WikiCounts.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> job2.setMapperClass(InverseMapper.class);
> job2.setReducerClass(InverseTopReducer.class); //custom reducer job 2
> job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
> FileInputFormat.addInputPath(job2, new Path("out1in2batch"));
> FileOutputFormat.setOutputPath(job2, new Path(otherArgs2[1]));
>
> //run the jobs in order; note job2 should only be submitted once
> if (!job1.waitForCompletion(true)) {
> System.exit(1);
> }
> System.exit(job2.waitForCompletion(true) ? 0 : 1);
> }//end main
>
> }//end class
> ********************************************************************
>
> ----------------------------------------------------------------------
>
> Hello Henry,
>
> Per the older conversation, what Owen was pointing to were the new API
> Mapper/Reducer classes, and their run(...) method override specifically:
> http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/Reducer.html#run(org.apache.hadoop.mapreduce.Reducer.Context)
>
> You'll need to port your job to the new (still a bit unstable) API to
> leverage this. Here are some slides to aid you in that task:
> http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api (The
> first part, from Owen).
>
>
>
> On Thu, Mar 8, 2012 at 5:02 PM, Henry Helgen <[email protected]> wrote:
>>
>> I am using hadoop 0.20.2 mapreduce API. The program is running fine, just
>> slower than it could.
>>
>> I sum values and then use
>> job.setSortComparatorClass(LongWritable.DecreasingComparator.class) to sort
>> descending by sum. I need to stop the reducer after outputting the first N
>> records. This would save the reducer from running over thousands of records
>> when it only needs the first few records. Is there a solution with the new
>> mapreduce 0.20.2 API?
>>
>> -------------------------------------------------------------------
>> I notice messages from 2008 about this topic:
>>
>> http://grokbase.com/t/hadoop/common-user/089420wvkx/stop-mr-jobs-after-n-records-have-been-produced
>>
>> https://issues.apache.org/jira/browse/HADOOP-3973
>>
>> The last statement follows, but the link is broken.
>> "You could do this pretty easily by implementing a custom MapRunnable.
>> There is no equivalent for reduces. The interface proposed in
>> HADOOP-1230 would support that kind of application. See:
>>
>> http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
>> Look at the new Mapper and Reducer interfaces."
>>
>
--
Harsh J