Hi Subroto,
It's very kind of you to help me.
I really appreciate it.
I've attached the source code of my program.
It's about finding neighbors in a graph.
For example, in the graph "A---B-----C", we define node B as the first neighbor of node A,
while C is the second.
The program finds the first and second neighbors of A.
That's the scenario.
Node A is in the key file and the graph(s) are in the database file.
The key file is given by users. Each line of the key file is a node name, like
A
B
C
The database file is on HDFS servers (on hard disk). Each line is an edge with two
nodes, like
A B 0.5
B C 0.7
0.5 and 0.7 are the weights of the edges.
With MapReduce I can get B from the key A, and then get C from B,
but I want to get both B and C directly from the key A.
The attached program does the job, but I don't think it's good enough:
it costs too much time. It's not the program I wanted.
Maybe you can help me.
Thanks very much!
--
Regards!
Jun Tan
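For reference, the neighbor relation described above can be sketched in plain Java, outside Hadoop (all class and method names here are illustrative, not from the attached program), using the example graph A---B---C:

```java
import java.util.*;

public class NeighborSketch {
    // Build an undirected adjacency map from edge lines like "A\tB\t0.5".
    static Map<String, Set<String>> buildGraph(List<String> edgeLines) {
        Map<String, Set<String>> adj = new HashMap<>();
        for (String line : edgeLines) {
            String[] parts = line.split("\t");
            adj.computeIfAbsent(parts[0], k -> new TreeSet<>()).add(parts[1]);
            adj.computeIfAbsent(parts[1], k -> new TreeSet<>()).add(parts[0]);
        }
        return adj;
    }

    // First neighbors: nodes directly connected to the key node.
    static Set<String> firstNeighbors(Map<String, Set<String>> adj, String key) {
        return adj.getOrDefault(key, Collections.emptySet());
    }

    // Second neighbors: neighbors of the first neighbors,
    // excluding the key node itself and its first neighbors.
    static Set<String> secondNeighbors(Map<String, Set<String>> adj, String key) {
        Set<String> first = firstNeighbors(adj, key);
        Set<String> second = new TreeSet<>();
        for (String n : first) {
            second.addAll(adj.getOrDefault(n, Collections.emptySet()));
        }
        second.remove(key);
        second.removeAll(first);
        return second;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> adj =
                buildGraph(Arrays.asList("A\tB\t0.5", "B\tC\t0.7"));
        System.out.println(firstNeighbors(adj, "A"));  // [B]
        System.out.println(secondNeighbors(adj, "A")); // [C]
    }
}
```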
At 2011-08-09 19:16:17,"Subroto Sanyal" <[email protected]> wrote:
Hi Jun
I mean that I get some strings in the mapper and I want to use them in the reducer,
but they are neither keys nor values.
As per my understanding, there is no way to pass an arbitrary reference
from Mapper to Reducer.
The information written to the Mapper's output is available to the Reducer.
Furthermore, I don't feel it would be a good idea to keep such a dependency.
Please let me know more about your scenario… maybe we/the community can suggest
a solution…
By the way, can the reducer get side files from the cache?
Please let me know more about "Side Files"…
Regards,
Subroto Sanyal
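The point above — that anything the reducer needs must travel through the mapper's output — is often handled by packing the extra strings into the value itself with a separator, and splitting them back out on the reduce side. A minimal plain-Java sketch of that encode/decode step (the separator character and all names are illustrative):

```java
public class TaggedValue {
    // Separator unlikely to occur in the data itself.
    private static final char SEP = '\u0001';

    // Mapper side: prepend the extra string to the real value.
    static String encode(String extra, String value) {
        return extra + SEP + value;
    }

    // Reducer side: split the extra string back off the value.
    static String[] decode(String tagged) {
        int i = tagged.indexOf(SEP);
        return new String[] { tagged.substring(0, i), tagged.substring(i + 1) };
    }

    public static void main(String[] args) {
        String wire = encode("firstNeighbor=B", "0.5");
        String[] parts = decode(wire);
        System.out.println(parts[0] + " / " + parts[1]); // firstNeighbor=B / 0.5
    }
}
```

In a Hadoop job the encoded string would simply be the Text value the mapper emits; the reducer decodes each value before using it.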
From: 谭军 (Jun Tan) [mailto:[email protected]]
Sent: Tuesday, August 09, 2011 12:25 PM
To: [email protected]; [email protected]
Subject: Re: RE: Can reducer get parameters from mapper besides key and value?
Hi Subroto,
I mean that I get some strings in the mapper and I want to use them in the reducer,
but they are neither keys nor values.
By the way, can the reducer get side files from the cache?
--
Regards!
Jun Tan
At 2011-08-09 14:42:10,"Subroto Sanyal" <[email protected]> wrote:
Hi Jun,
What are the file, list, and String[] in this context?
I mean to say: which file, list, or String[]?
The new MR (org.apache.hadoop.mapreduce.*) APIs have a parameter "context".
I request you to browse through the APIs of Context (inherited from
JobContext -> TaskAttemptContext).
The context parameter may provide you the reference you need.
Regards,
Subroto Sanyal
From: 谭军 (Jun Tan) [mailto:[email protected]]
Sent: Tuesday, August 09, 2011 11:58 AM
To: mapreduce
Subject: Can reducer get parameters from mapper besides key and value?
Hi,
Can the reducer get parameters from the mapper besides key and value?
Such as files, lists, String[], etc.
Thanks
--
Regards!
Jun Tan
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

@SuppressWarnings("deprecation")
public class Retrieval {
    public static void main(String[] args) throws IOException, URISyntaxException {
        if (args.length != 3) {
            System.err.println("Usage: Retrieval <protein set path> <database path> <output path>");
            System.exit(-1);
        }
        JobConf conf = new JobConf(new Configuration(), Retrieval.class);
        conf.setJobName("Retrieval");
        // Ship the key file and the database file to every task via the DistributedCache.
        DistributedCache.addCacheFile(new URI(args[0]), conf);
        DistributedCache.addCacheFile(new URI(args[1]), conf);
        FileInputFormat.addInputPath(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        conf.setMapperClass(RetrievalMapper.class);
        conf.setReducerClass(RetrievalReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        JobClient.runJob(conf);
    }
}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class RetrievalMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    private Path[] localFiles;

    public void configure(JobConf conf) {
        try {
            // localFiles[0] is the key file, localFiles[1] is the database file.
            this.localFiles = DistributedCache.getLocalCacheFiles(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        // Stores the first neighbors found for this edge.
        LinkedList<String> list = new LinkedList<String>();
        BufferedReader proReader = new BufferedReader(
                new FileReader(this.localFiles[0].toString()));
        String proID;
        String[] proteinIDs = line.split("\t");
        String tmpString = proteinIDs[0] + "\t" + proteinIDs[1];
        // Find destination proteins and get their first neighbors.
        // The edge (first neighbor included) is the key; the weight is the value
        // sent to the reducer.
        // Note: the key file is re-read from local disk once per input record.
        while ((proID = proReader.readLine()) != null) { // for each protein ID in the key file
            if (proID.equalsIgnoreCase(proteinIDs[0])) {
                // Hit: proteinIDs[1] is its first neighbor.
                output.collect(new Text(tmpString), new Text(proteinIDs[2]));
                list.add(proteinIDs[1]);
            }
            if (proID.equalsIgnoreCase(proteinIDs[1])) {
                // Hit: proteinIDs[0] is its first neighbor.
                output.collect(new Text(tmpString), new Text(proteinIDs[2]));
                list.add(proteinIDs[0]);
            }
        }
        proReader.close();
        // Find second neighbors by scanning the database file once per first neighbor.
        Iterator<String> iter = list.iterator();
        String ids;
        while (iter.hasNext()) {
            proID = iter.next();
            String tmp;
            BufferedReader dbReader = new BufferedReader(
                    new FileReader(this.localFiles[1].toString()));
            while ((tmp = dbReader.readLine()) != null) {
                proteinIDs = tmp.split("\t");
                ids = proteinIDs[0] + "\t" + proteinIDs[1];
                if (proID.equalsIgnoreCase(proteinIDs[0])) {
                    output.collect(new Text(ids), new Text(proteinIDs[2]));
                }
                if (proID.equalsIgnoreCase(proteinIDs[1])) {
                    output.collect(new Text(ids), new Text(proteinIDs[2]));
                }
            }
            dbReader.close();
        }
    }
}
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class RetrievalReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter report)
            throws IOException {
        // Keep only the last value seen for each key, collapsing duplicate emits.
        Text last = null;
        while (values.hasNext()) {
            last = values.next();
        }
        output.collect(key, last);
    }
}
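One likely source of the running time complained about above: the mapper re-opens and scans the key file once per input record, and the database file once per first neighbor. A common fix is to read the key file a single time into a HashSet and test membership in O(1). A plain-Java sketch of that idea (shown outside Hadoop; in the mapper this load would go into configure() — class and method names here are illustrative):

```java
import java.util.*;

public class KeyLookup {
    // Load the key file's node names once into a set, lower-cased to match
    // the equalsIgnoreCase comparisons used in the mapper.
    static Set<String> loadKeys(List<String> keyLines) {
        Set<String> keys = new HashSet<>();
        for (String line : keyLines) {
            keys.add(line.trim().toLowerCase(Locale.ROOT));
        }
        return keys;
    }

    // O(1) membership test replacing a full scan of the key file.
    static boolean isKey(Set<String> keys, String node) {
        return keys.contains(node.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Set<String> keys = loadKeys(Arrays.asList("A", "B", "C"));
        System.out.println(isKey(keys, "a")); // true
        System.out.println(isKey(keys, "D")); // false
    }
}
```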