First-time poster here; I hope I'm not breaking any rules, and if I am, please
tell me :)
I am trying to perform "total order" sorting on data previously stored on a
pseudo-distributed HDFS (running with YARN) into a SequenceFile (keys:
IntWritable, vals: Text). The SequenceFile, stored at /cnts/part-r-00000 to
part-r-00006, contains 44 records altogether.
bin/hdfs dfs -text /cnts* prints 44 lines like:
2 314 | 12 | 21
2 700 | 12 | 21
1 700 | 12 | 28
1 2 | 420 | 11120
2 2 | 11 | 3
2 700 | 11 | 3
1 700 | 12 | 19
(...)
1 314 | 12 | 30
3 314 | 420 | 6
3 700 | 420 | 6
3 2 | 420 | 6
1 2 | 421 | 36
I run the job as follows, within main:
    int exitCode = -1;
    if ("sort".equals(args[0])) {
        // Drop the "sort" sub-command and pass the remaining args to the tool
        exitCode = ToolRunner.run(new Configuration(),
                                  new Sorter(),
                                  Arrays.copyOfRange(args, 1, args.length));
    }
Sorter extends Configured, and implements Tool as follows:
    public int run(String[] args) throws Exception {
        (...)
        // Set job-specific params
        job.setJobName("sort");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);

        // Set up and run subset sampling
        double p = 0.1;            // probability of keeping each record
        int maxNbSamples = 10;     // upper bound on the number of samples
        int maxNbSplitsRead = 3;   // upper bound on the number of input splits read
        InputSampler.Sampler<IntWritable, Text> sampler =
            new InputSampler.RandomSampler<IntWritable, Text>(p,
                                                              maxNbSamples,
                                                              maxNbSplitsRead);
        InputSampler.writePartitionFile(job, sampler);

        // Submit the job & poll for progress until it completes
        return job.waitForCompletion(true) ? 0 : -1;
    }
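To give a feel for how few keys those sampling parameters can yield on 44 records, here is a minimal plain-Java sketch (no Hadoop dependencies). It is a simplified stand-in, not Hadoop's RandomSampler code: it assumes each record is kept independently with probability p until maxSamples keys are held, and it ignores the limit on the number of splits read. The class name `SamplingSketch`, the method `sample`, and the placeholder keys are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class SamplingSketch {

    // Keep each key with probability p, stopping once maxSamples keys are held.
    // Simplified assumption, not Hadoop's actual sampling algorithm.
    static List<Integer> sample(int[] keys, double p, int maxSamples, long seed) {
        Random rng = new Random(seed);
        List<Integer> kept = new ArrayList<>();
        for (int key : keys) {
            if (kept.size() >= maxSamples) {
                break;
            }
            if (rng.nextDouble() < p) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // 44 placeholder keys cycling through 1, 2, 3 (hypothetical data)
        int[] keys = new int[44];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = 1 + i % 3;
        }
        // Different seeds give different sample sizes, sometimes very small ones
        for (long seed = 0; seed < 5; seed++) {
            List<Integer> kept = sample(keys, 0.1, 10, seed);
            System.out.println("seed " + seed + ": " + kept.size() + " samples " + kept);
        }
    }
}
```

Under these assumptions the sample averages only about four or five keys, so its size and contents change noticeably from run to run.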
I use the default "identity" Mapper and Reducer classes together with
TotalOrderPartitioner and an InputSampler.RandomSampler to configure the
partitioning. It works fine so long as I only use a single reduce task but
becomes unstable when -Dmapreduce.job.reduces=N with N > 1.
By unstable, I mean that, seemingly at random, it either
* terminates successfully;
* fails with ArrayIndexOutOfBoundsException in InputSampler.writePartitionFile()
  and exits;
* or fails with IllegalArgumentException: Can't read partitions file in
  TotalOrderPartitioner.setConf() and crashes my machine.
The relative frequency of these three outcomes seems to vary slightly if I
change the values passed to the RandomSampler constructor and the number of
reduce tasks, but the tool is never stable.
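For reference, this is how I understand total-order partitioning to work, written as a minimal plain-Java sketch: sort the sampled keys, take N-1 split points at evenly spaced positions, then send each key to the partition given by the number of split points it is greater than or equal to. This is my own simplified illustration, not Hadoop's implementation; the names `SplitPointSketch`, `pickSplitPoints` and `partitionFor` are hypothetical, and the sample values are made up to resemble my keys (only 1, 2 and 3 occur).

```java
import java.util.Arrays;

class SplitPointSketch {

    // Pick (numPartitions - 1) split points at evenly spaced positions in the
    // sorted samples. In this sketch an empty sample makes the index invalid.
    static int[] pickSplitPoints(int[] samples, int numPartitions) {
        int[] sorted = samples.clone();
        Arrays.sort(sorted);
        int[] splits = new int[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            int idx = (int) ((long) i * sorted.length / numPartitions);
            splits[i - 1] = sorted[idx];
        }
        return splits;
    }

    // A key goes to the partition numbered by how many split points are <= key.
    static int partitionFor(int key, int[] splits) {
        int p = 0;
        while (p < splits.length && key >= splits[p]) {
            p++;
        }
        return p;
    }

    public static void main(String[] args) {
        int[] samples = {1, 2, 2, 3, 1, 2}; // made-up sample of keys
        int[] splits = pickSplitPoints(samples, 3);
        System.out.println(Arrays.toString(splits)); // prints [2, 2]
        for (int key = 1; key <= 3; key++) {
            // key 1 -> partition 0, keys 2 and 3 -> partition 2
            System.out.println(key + " -> partition " + partitionFor(key, splits));
        }
    }
}
```

In this sketch, three reducers with only three distinct keys already give two identical split points, which leaves partition 1 empty. I don't know whether duplicate split points are related to the failures I see.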
Can anyone shed light on this at all?