Hi Jun Tan,
1. DistributedCache usage (the example below uses the old mapred API; a sketch with the new API follows it):
// Setting up the cache for the application
1. Copy the requisite files to the FileSystem:
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. Set up the application's JobConf:
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
3. Use the cached files in the Mapper
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
or Reducer
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:
public static class MapClass<K, V> extends MapReduceBase
    implements Mapper<K, V, K, V> {

  private Path[] localArchives;
  private Path[] localFiles;

  public void configure(JobConf job) {
    // Get the cached archives/files
    try {
      localArchives = DistributedCache.getLocalCacheArchives(job);
      localFiles = DistributedCache.getLocalCacheFiles(job);
    } catch (IOException e) {
      throw new RuntimeException("could not read the distributed cache", e);
    }
  }

  public void map(K key, V value,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Use data from the cached archives/files here
    // ...
    output.collect(key, value);
  }
}
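
Since you asked about the new API: as far as I know, in 0.20.x you still go
through the DistributedCache class even with org.apache.hadoop.mapreduce; you
just pass it the Job's own Configuration. A rough, untested sketch (it reuses
the Retrieval/RetrievalMapper names from your snippet below, and the key/value
types are only placeholders):

// driver side
Configuration conf = new Configuration();
Job job = new Job(conf, "Retrieval1");
job.setJarByClass(Retrieval.class);
// add the file to the configuration the job will actually use,
// i.e. job.getConfiguration(), not the conf you created it from
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
                              job.getConfiguration());

// mapper side
public static class RetrievalMapper
    extends Mapper<LongWritable, Text, Text, Text> {

  private Path[] localFiles;

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // local paths of the cached files on this node
    localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
  }
}

I believe the usual cause of the NullPointerException is that the file was
added (or the property set) on a Configuration object after the Job had
already copied it, so the task-side Configuration never sees it.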
2. Without the distributed cache: the mapper can read the file straight from
HDFS; a rough sketch follows below. If you are interested, I can help you with
the full code.
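
For option 2, the mapper opens the file from HDFS itself, using the path you
stored in the configuration. An untested sketch (it assumes the "keyNodeFile"
property from your snippet below; imports from org.apache.hadoop.fs, java.io
and java.util are omitted like in the other snippets, and the line format is
only a guess):

public static class RetrievalMapper
    extends Mapper<LongWritable, Text, Text, Text> {

  private final Set<String> keyNodes = new HashSet<String>();

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    // path the driver stored with conf.set("keyNodeFile", args[0])
    Path keyNodeFile = new Path(conf.get("keyNodeFile"));
    FileSystem fs = keyNodeFile.getFileSystem(conf);
    BufferedReader reader =
        new BufferedReader(new InputStreamReader(fs.open(keyNodeFile)));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        keyNodes.add(line.trim());
      }
    } finally {
      reader.close();
    }
  }
}

This skips the cache machinery entirely, at the cost of every task opening the
file from HDFS directly.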
2011/9/23 谭军 <[email protected]>
> Hi Swathi.V.,
> I think my code below would work:
>
> Configuration conf1 = new Configuration();
> Job job1 = new Job(conf1, "Retrieval1");
> job1.setJarByClass(Retrieval.class);
> job1.addCacheFile(new URI(args[0])); // problem here
> conf1.set("keyNodeFile", args[0]); //try to set key node
> file path and get file path in mapper1
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(Text.class);
> job1.setMapperClass(RetrievalMapper.class);
> job1.setReducerClass(RetrievalReducer.class);
> FileInputFormat.addInputPath(job1, new Path(args[1]));
> String out = args[2] + System.nanoTime();
>
> FileOutputFormat.setOutputPath(job1, new Path(out));
> job1.waitForCompletion(true);
>
> Configuration conf2 = new Configuration();
> Job job2 = new Job(conf2, "Retrieval2");
> job2.setJarByClass(Retrieval.class);
> conf2.set("newKeyNodeFile", out); // try to set new key node file
> path and get it in mapper2
> DistributedCache.addCacheFile(new URI(out)); // problem here
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(Text.class);
> job2.setMapperClass(RetrievalMapper2.class);
> job2.setReducerClass(RetrievalReducer2.class);
> FileInputFormat.addInputPath(job2, new Path(args[1]));
> FileOutputFormat.setOutputPath(job2, new Path(args[2]));
> System.exit(job2.waitForCompletion(true) ? 0 : 1);
>
> But a NullPointerException was reported when I tried to get the file through
> the distributed cache.
> How do I use a distributed cache file with the new API?
> I also tried to pass the file path by setting a configuration parameter, but
> that failed as well.
> How can I read the "args[0]" file in mapper1 and the intermediate file in
> mapper2 using the new API?
> Thanks!
>
>
> --
>
> Regards!
>
> Jun Tan
>
> At 2011-09-23 19:06:50,"Swathi V" <[email protected]> wrote:
>
> Hi Jun Tan,
>
> Yes, I use version 0.21.0, so I have used those classes. The Hadoop
> Definitive Guide has job dependency examples for 0.20.x.
>
> Thank You,
>
> 2011/9/23 谭军 <[email protected]>
>
>> Swathi.V.,
>> ControlledJob cannot be resolved in my Eclipse.
>> My Hadoop version is 0.20.2.
>> Can ControlledJob only be resolved in Hadoop 0.21.0 (+)?
>> Or do I need certain plugins?
>> Thanks
>>
>> --
>>
>> Regards!
>>
>> Jun Tan
>>
>> At 2011-09-22 00:56:54,"Swathi V" <[email protected]> wrote:
>>
>>
>> Hi,
>>
>> This code might help you
>> //JobDependancies.java snippet
>>
>> Configuration conf = new Configuration();
>> Job job1 = new Job(conf, "job1");
>> job1.setJarByClass(JobDependancies.class);
>> job1.setMapperClass(WordMapper.class);
>> job1.setReducerClass(WordReducer.class);
>> job1.setOutputKeyClass(Text.class);
>> job1.setOutputValueClass(IntWritable.class);
>> FileInputFormat.addInputPath(job1, new Path(args[0]));
>> String out=args[1]+System.nanoTime();
>> FileOutputFormat.setOutputPath(job1, new Path(out));
>>
>>
>>
>> Configuration conf2 = new Configuration();
>> Job job2 = new Job(conf2, "job2");
>> job2.setJarByClass(JobDependancies.class);
>> job2.setOutputKeyClass(IntWritable.class);
>> job2.setOutputValueClass(Text.class);
>> job2.setMapperClass(SortWordMapper.class);
>> job2.setReducerClass(Reducer.class);
>> FileInputFormat.addInputPath(job2, new Path(out+"/part-r-00000"));
>> FileOutputFormat.setOutputPath(job2, new Path(args[1]));
>>
>> ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
>> ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
>> controlledJob2.addDependingJob(controlledJob1);
>> JobControl jobControl= new JobControl("control");
>>
>> jobControl.addJob(controlledJob1);
>> jobControl.addJob(controlledJob2);
>>
>> Thread thread = new Thread(jobControl);
>> thread.start();
>> while(!jobControl.allFinished())
>> {
>> try {
>> Thread.sleep(10000);
>> } catch (InterruptedException e) {
>> // TODO Auto-generated catch block
>> e.printStackTrace();
>> }
>> }
>> jobControl.stop();
>> }
>> }
>>
>>
>> The wordcount output of job1 is given as input to the sort job, job2.
>> Irrespective of the mappers and reducers, the above is the way to chain
>> many jobs.
>>
>> 2011/9/21 谭军 <[email protected]>
>>
>>> Hi,
>>> I want to run 2 MR jobs sequentially.
>>> The first job writes an intermediate result to a temp file.
>>> The second job reads the result from the temp file, not from the
>>> FileInputPath.
>>> I tried, but a FileNotFoundException was reported.
>>> Then I checked the datanodes, and the temp file had been created.
>>> The first job executed correctly.
>>> Why can't the second job find the file? The file was created before the
>>> second job was executed.
>>> Thanks!
>>>
>>> --
>>>
>>> Regards!
>>>
>>> Jun Tan
>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Swathi.V.
>>
>>
>>
>>
>
>
> --
> Regards,
> Swathi.V.
>
>
>
>
--
Regards,
Swathi.V.