Hi,
The following code creates a cross product between two files. To cross a
file with itself, pass the same file for both input arguments.
package com.example.hadoopexamples.joinnew;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Words seen by this mapper; the whole input split is held in memory.
    private List<String> inputWords;
    private String secondFilePath;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        secondFilePath = context.getConfiguration().get("secondFilePath");
        inputWords = new ArrayList<String>();
    }

    // Only collects words; the pairs are emitted in cleanup().
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        inputWords.addAll(getWords(value.toString()));
    }

    // Streams the second file from HDFS and pairs each of its words with
    // every collected word, skipping pairs of identical words.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // Text is UTF-8, so the second file is decoded the same way.
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(secondFilePath)), "UTF-8"));
        try {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                for (String word : getWords(line)) {
                    for (String inputWord : inputWords) {
                        if (!inputWord.equals(word)) {
                            Text pair = new Text(word + "," + inputWord);
                            context.write(pair, NullWritable.get());
                        }
                    }
                }
            }
        } finally {
            // Closing the reader also closes the underlying HDFS stream.
            bufferedReader.close();
        }
    }

    // Splits a line into whitespace-separated tokens.
    private List<String> getWords(String inputLine) {
        List<String> words = new ArrayList<String>();
        StringTokenizer stringTokenizer = new StringTokenizer(inputLine);
        while (stringTokenizer.hasMoreTokens()) {
            words.add(stringTokenizer.nextToken());
        }
        return words;
    }
}
Driver class:

package com.example.hadoopexamples.joinnew;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinTester {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length != 3) {
            System.err.println("Usage: JoinTester <firstFile> <secondFile> <outputDir>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        // The mapper reads this path in setup() and streams the file in cleanup().
        configuration.set("secondFilePath", args[1]);

        // On Hadoop 2+, Job.getInstance(configuration) replaces this
        // deprecated constructor.
        Job job = new Job(configuration);
        job.setJarByClass(JoinTester.class);
        job.setMapperClass(JoinMapper.class);
        // The base Reducer class passes every key/value pair through unchanged.
        job.setReducerClass(Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // The identity reducer emits the mapper's types, so the value is NullWritable.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
This code streams the second file through the HDFS API. Each mapper holds
the words of its own input split in memory and reads the whole second file
once, in cleanup(). I am not sure this is the best way to compute a cross
join; I just followed the method described here:
http://search-hadoop.com/m/FNqzV1DrOEp/cross+product&subj=Re+Cross+Join
Can anyone point to a better cross join implementation?
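
To check the expected output without a cluster, here is a minimal plain-Java
sketch of the same pairing logic. It is only an illustration under my own
assumptions: the class name CrossProductSketch and the method crossProduct
are hypothetical and not part of the job above, and both files are passed in
as in-memory lists of lines instead of being read from HDFS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical local sketch; not part of the Hadoop job above.
final class CrossProductSketch {

    // Splits a line on whitespace, as getWords() does in the mapper.
    static List<String> getWords(String line) {
        List<String> words = new ArrayList<String>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            words.add(tokenizer.nextToken());
        }
        return words;
    }

    // Pairs every word of the second file with every word of the first,
    // skipping pairs of identical words, in the order cleanup() emits them.
    static List<String> crossProduct(List<String> firstFileLines,
                                     List<String> secondFileLines) {
        // Corresponds to map(): collect all words of the first input.
        List<String> inputWords = new ArrayList<String>();
        for (String line : firstFileLines) {
            inputWords.addAll(getWords(line));
        }
        // Corresponds to cleanup(): stream the second input and emit pairs.
        List<String> pairs = new ArrayList<String>();
        for (String line : secondFileLines) {
            for (String word : getWords(line)) {
                for (String inputWord : inputWords) {
                    if (!inputWord.equals(word)) {
                        pairs.add(word + "," + inputWord);
                    }
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("W1 W2");
        List<String> second = Arrays.asList("W1 W5");
        // prints [W1,W2, W5,W1, W5,W2]
        System.out.println(crossProduct(first, second));
    }
}
```

Note that when a file is crossed with itself, every pair shows up in both
orders (W1,W2 and W2,W1), so a symmetric distance would be computed twice
unless one of the two orders is filtered out.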
On Tue, Apr 3, 2012 at 5:47 AM, Praveen Kumar K J V S <
[email protected]> wrote:
> Hey Phatak,
>
> With your approach the mapper output will be something like
>
> <W1_W2 0>
> <W1_W3 1>
> <W1_W4 0>
> ......
>
> But then this way I will miss the pair <W3_W5 0>
>
> Correct me if I am wrong.
>
> Thanks,
> Praveen
>
>
> On Mon, Apr 2, 2012 at 12:52 PM, madhu phatak <[email protected]> wrote:
>
>> Hi,
>> Yes, it's possible using Map/Reduce.
>>
>> 1. In the Mapper,
>> read the words in each line and pair them up, calculating the distance
>> for each pair. The mapper's output key is the word pair and the value
>> is the distance.
>>
>> 2. In the Reducer,
>> just sum the distances for each pair.
>>
>> On Wed, Mar 28, 2012 at 7:42 AM, Praveen Kumar K J V S <
>> [email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I have a file in HDFS spanning many blocks. Say the file contains many
>>> words W1, W2, W3, ... Wn.
>>>
>>> I want to find the edit distance between all pairs of words. Is this
>>> possible in MapReduce?
>>>
>>> For example, say the file has 2 blocks, and I have an edit distance
>>> function that returns a value between 0 and 1 (0 means two words are very
>>> close to each other). Say the edit distance between even and odd words is 1.
>>>
>>> Block 1 has the words W1, W2, W3, W4
>>> Block 2 has the words W1, W2, W5, W6
>>>
>>> Now, is there a way to find the edit distance for all pairs? If so, how?
>>>
>>> Thanks,
>>> Praveen
>>>
>>
>>
>>
>> --
>> https://github.com/zinnia-phatak-dev/Nectar
>>
>>
>
--
https://github.com/zinnia-phatak-dev/Nectar