Hello Hadoop users,
I am trying to implement a MapReduce K-Means algorithm using Hadoop. The problem I have is that execution never enters the map and reduce classes. I'm running the application from IntelliJ IDEA, not via the hadoop binary.
The rest of the email is a sample of my code. If someone can spot something that could help, that would be greatly appreciated.
Thanks in advance.
Here is my driver code:
Job job = Job.getInstance(conf);
job.setJobName("kmeans");
job.setJarByClass(KMeans.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setMapperClass(KMeansMapper.class);
job.setReducerClass(KMeansReducer.class);
job.setMapOutputKeyClass(PointVector.class);
job.setMapOutputValueClass(PointVector.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
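In case it matters, here is only a sketch of how I could force the local job runner from the IDE, set the "clusters" property that the mapper reads in setup(), and check the job result instead of ignoring it (the "3" and the configuration values below are placeholders, not my real setup):

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local"); // use Hadoop's local job runner
conf.set("fs.defaultFS", "file:///");          // read/write the local filesystem
conf.set("clusters", "3");                     // consumed by KMeansMapper.setup(); placeholder value

Job job = Job.getInstance(conf, "kmeans");
job.setJarByClass(KMeans.class);
// ... same mapper/reducer/input/output setup as above ...

boolean ok = job.waitForCompletion(true); // verbose = true prints map/reduce progress
if (!ok) {
    System.err.println("Job failed; see the console output above");
}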
And below are my map and reduce classes:
public class KMeansMapper extends Mapper<LongWritable, Text, PointVector, PointVector> {

    private int clusters;
    private List<ImmutableTriple<Integer, String, PointVector>> centers;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("Inside setup");
        this.clusters = Integer.valueOf(context.getConfiguration().get("clusters"));
        this.centers = new ArrayList<>();
        BufferedReader br = new BufferedReader(new FileReader("/home/denis/centers"));
        for (int i = 0; i < clusters; i++) {
            centers.add(DocumentRecordParser.parse(br.readLine()));
        }
        br.close();
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        PointVector line = DocumentRecordParser.returnPointVector(value.toString());
        System.out.println("Inside map!");
        double minDist = Double.MAX_VALUE;
        double dist;
        PointVector index = null;
        EuclideanDistance ed = new EuclideanDistance();
        for (ImmutableTriple<Integer, String, PointVector> c : centers) {
            dist = ed.compute(line.points(), c.right.points());
            if (dist < minDist) {
                minDist = dist;
                index = c.right;
            }
        }
        context.write(index, line);
    }
}
public class KMeansReducer extends Reducer<PointVector, PointVector, Text, Text> {

    private double min_dist = Double.MAX_VALUE;

    @Override
    public void reduce(PointVector center, Iterable<PointVector> points, Context context)
            throws IOException, InterruptedException {
        EuclideanDistance measure = new EuclideanDistance();
        double distance = 0.0;
        int numOfPoints = 0;
        double diff = 0.0;
        PointVector newCenter = null;
        double[] sums = new double[center.size()];
        for (PointVector p : points) {
            distance += measure.compute(center.points(), p.points());
            if (distance < min_dist) {
                min_dist = distance;
                newCenter = p;
            }
            numOfPoints++;
            sums = MathArrays.ebeAdd(p.points(), sums);
        }
        for (int i = 0; i < sums.length; i++) {
            sums[i] = sums[i] / numOfPoints;
        }
        System.out.println("Old center " + center + " new center: " + newCenter);
        context.write(new Text(newCenter.toString()), new Text(new PointVector(sums).toString()));
    }
}
Last but not least, here is my custom data structure class, PointVector:
public class PointVector implements WritableComparable<PointVector> {

    /** Keep the tfIdf values of the terms of a document */
    private Vector<Double> data = new Vector<>();

    public PointVector(double[] values) {
        this.data = new Vector<>(values.length);
        this.data.addAll(Doubles.asList(values));
    }

    public PointVector(List<Double> values) {
        this.data = new Vector<>(values.size());
        this.data.addAll(values);
    }

    public PointVector(String[] values) {
        this.data = new Vector<>(values.length);
        for (String s : values) {
            this.data.add(Double.valueOf(s));
        }
    }

    public PointVector() {
        this.data = new Vector<>();
    }

    public double[] points() {
        return Doubles.toArray(data);
    }

    /**
     * Subtract the values of the PointVector passed as argument from this vector.
     * @param subtracted
     * @return
     */
    public PointVector sub(PointVector subtracted) {
        int N = this.data.size();
        double[] vals = new double[N];
        for (int i = 0; i < N; i++) {
            vals[i] = this.data.get(i) - subtracted.get(i);
        }
        return new PointVector(vals);
    }

    public PointVector add(PointVector vec) {
        int N = this.data.size();
        double[] vals = new double[N];
        for (int i = 0; i < N; i++) {
            vals[i] = this.data.get(i) + vec.get(i);
        }
        return new PointVector(vals);
    }

    /**
     * Compute the dot product of this vector with the one passed as argument.
     * @param vector
     * @return
     */
    public double dotProduct(PointVector vector) {
        int N = this.data.size();
        double sum = 0.0;
        for (int i = 0; i < N; i++) {
            sum += this.data.get(i) * vector.get(i);
        }
        return sum;
    }
    @Override
    public int compareTo(PointVector pointVector) {
        // Compare element-wise so Hadoop can sort and group PointVector map output keys;
        // returning 0 unconditionally would make every key compare as equal.
        int n = Math.min(this.size(), pointVector.size());
        for (int i = 0; i < n; i++) {
            int cmp = Double.compare(this.get(i), pointVector.get(i));
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(this.size(), pointVector.size());
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(data.size());
        for (double d : data) {
            dataOutput.writeDouble(d);
        }
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        int s = dataInput.readInt(); // read the size of the vector
        data.clear();
        for (int i = 0; i < s; i++) {
            data.add(dataInput.readDouble()); // then read each value back
        }
    }
    public Double get(int i) {
        return this.data.get(i);
    }

    public int size() {
        return this.data.size();
    }
    @Override
    public String toString() {
        if (data.isEmpty()) {
            return "[]";
        }
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        for (double d : data) {
            sb.append(d);
            sb.append(", ");
        }
        // drop the trailing ", " left by the loop before closing the bracket
        final int pos = sb.lastIndexOf(",");
        sb.delete(pos, pos + 2);
        sb.append("]");
        return sb.toString();
    }
}
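To sanity-check the serialization outside of Hadoop, a small round-trip test like the one below could be used (plain Java, not part of the job itself; it assumes write() and readFields() are meant to be symmetric):

// Sketch: serialize a PointVector and read it back, to confirm that
// write()/readFields() restore the same values before trusting it as a map output key.
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
PointVector original = new PointVector(new double[]{1.0, 2.0, 3.0});
original.write(new DataOutputStream(bytes));

PointVector copy = new PointVector();
copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

System.out.println("original: " + original); // [1.0, 2.0, 3.0]
System.out.println("copy:     " + copy);     // should print the same values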