I am working on a large application where I need to pass a big array of
IntWritables from the mapper to the reducer. Naturally I want to use a
combiner, and I am extending ArrayWritable. Here are the signatures of my
mapper, reducer, and IntArrayWritable:
*Mapper:*
public class AppMapper extends Mapper<LongWritable, Text, Text,
IntArrayWritable> {
@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
context.write(outkey, new IntArrayWritable(array_of_intwritables));
}
}
*Reducer:*
public class AppReducer extends Reducer<Text, IntArrayWritable, Text, Text>
{
@Override
public void reduce(Text key, Iterable<IntArrayWritable> values, Context
context) throws IOException, InterruptedException {
context.write(key, new Text(some_stuff));
}
}
*IntArrayWritable:*
public class IntArrayWritable extends ArrayWritable
{
public IntArrayWritable() {
super(IntWritable.class);
}
public IntArrayWritable(IntWritable[] values) {
super(IntWritable.class, values);
}
}
*Here is what my Job looks like:*
private Job AppRunner(Path inputPath, Path outputPath, Configuration
conf) throws IOException {
Job job = new Job(conf);
job.setJobName(String.format("App Job: %s => %s", inputPath,
outputPath));
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntArrayWritable.class);
job.setMapperClass(AppMapper.class);
job.setCombinerClass(AppReducer.class);
job.setReducerClass(AppReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(28);
return job;
}
*Error Trace:*
java.io.IOException: Spill failed
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1070)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1051)
    at java.io.DataOutputStream.writeInt(DataOutputStream.java:183)
    at org.apache.hadoop.io.IntWritable.write(IntWritable.java:42)
    at org.apache.hadoop.io.ArrayWritable.write(ArrayWritable.java:98)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:926)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:574)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.test.app.AppMapper.map(AppMapper.java:67)
    at com.test.app.neighbors.AppMapper.map(AppMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class com.test.app.mr.writables.IntArrayWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:175)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1042)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1363)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.test.app.neighbors.AppReducer.reduce(AppReducer.java:54)
    at com.test.app.neighbors.AppReducer.reduce(AppReducer.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1384)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1291)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:712)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1199)
I think the error is caused by the combiner: it emits Text values, while the
reducer expects IntArrayWritable. If I remove the combiner, everything works.
What am I doing wrong, and how can I get the combiner to work? Any help is
greatly appreciated.
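To make the suspected type mismatch concrete, here is a minimal sketch in plain Java, with no Hadoop dependency. A combiner's output is fed back into the map-output stream, so its output value type has to equal the map output value type; only the final reducer may change it. Everything in the sketch is a hypothetical stand-in: ReduceFn models Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, int[] models IntArrayWritable, String models Text, and the element-wise sum is only an assumed example of combine logic, not what the real job computes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerTypeSketch {

    /** Hypothetical stand-in for a reduce-style function; not a Hadoop type. */
    interface ReduceFn<K, VIN, VOUT> {
        VOUT reduce(K key, List<VIN> values);
    }

    /**
     * Models the combine step on a map-side spill. The signature forces the
     * combiner's output value type to equal its input value type (V), because
     * combined records go back into the same map-output stream.
     */
    static <K, V> Map<K, List<V>> combine(Map<K, List<V>> spill,
                                          ReduceFn<K, V, V> combiner) {
        Map<K, List<V>> out = new LinkedHashMap<>();
        for (Map.Entry<K, List<V>> e : spill.entrySet()) {
            List<V> combined = new ArrayList<>();
            combined.add(combiner.reduce(e.getKey(), e.getValue()));
            out.put(e.getKey(), combined);
        }
        return out;
    }

    /** Assumed combine logic: element-wise sum of equal-length int arrays. */
    static final ReduceFn<String, int[], int[]> SUM_ARRAYS = (key, values) -> {
        int[] acc = new int[values.get(0).length];
        for (int[] v : values) {
            for (int i = 0; i < acc.length; i++) {
                acc[i] += v[i];
            }
        }
        return acc;
    };

    /** Final reduce step: free to change the value type (int[] to String). */
    static final ReduceFn<String, int[], String> TO_TEXT =
            (key, values) -> Arrays.toString(SUM_ARRAYS.reduce(key, values));

    public static void main(String[] args) {
        Map<String, List<int[]>> spill = new LinkedHashMap<>();
        spill.put("a", Arrays.asList(new int[] {1, 2}, new int[] {3, 4}));

        Map<String, List<int[]>> combined = combine(spill, SUM_ARRAYS);
        System.out.println(TO_TEXT.reduce("a", combined.get("a"))); // prints [4, 6]

        // combine(spill, TO_TEXT) would not compile: String is not int[].
        // Hadoop only detects the equivalent mismatch at run time.
    }
}
```

If this reading is right, the Hadoop counterpart would be a separate combiner class (say, a hypothetical AppCombiner extending Reducer<Text, IntArrayWritable, Text, IntArrayWritable>) passed to job.setCombinerClass, with AppReducer kept only as the reducer.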
--
Vipul Sharma
sharmavipul AT gmail DOT com