Hi,
We read something similar but donot use FileSystem api.
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(jobConf);
if (cacheFiles != null)
{
for (Path cacheFile : cacheFiles)
{
FileInputStream fis = new FileInputStream(cacheFile.toString());
//Logic to process on top of the input stream
}
}
Thanks
Sudhan S
On Tue, Nov 8, 2011 at 2:30 PM, Uma Maheswara Rao G 72686 <
[email protected]> wrote:
> ----- Original Message -----
> From: Arko Provo Mukherjee <[email protected]>
> Date: Tuesday, November 8, 2011 1:26 pm
> Subject: Issues with Distributed Caching
> To: [email protected]
>
> > Hello,
> >
> > I am having the following problem with Distributed Caching.
> >
> > *In the driver class, I am doing the following:
> > (/home/arko/MyProgram/datais a directory created as an output of
> > another map-reduce)*
> >
> > *FileSystem fs = FileSystem.get(jobconf_seed);
> >
> > String init_path = "/home/arko/MyProgram/data";
> >
> > System.out.println("Caching files in " + init_path);
> >
> > FileStatus[] init_files = fs.listStatus(new Path(init_path));
> >
> > for ( int i = 0; i < init_files.length; i++ ) {
> >
> > Path p = init_files[i].getPath();
> > DistributedCache.addCacheFile ( p.toUri(), jobconf );
> > }*
> >
> I am not clearly sure about this. But looking at this, if you do
> addCacheFile, it will set the files to mapred.cache.files.
> I think you are getting localCacheFiles ( it will try to get the value
> with ,apred.cache.localFiles) . Looks that value is coming as null. Please
> check whether you are setting that values correctly or not.
> > This is executing fine.
> >
> > *I have the following code in the configure method of the Map class:*
> >
> > *public void configure(JobConf job) {
> >
> > try {
> >
> > fs = FileSystem.getLocal(new Configuration());
> > Path [] localFiles = DistributedCache.getLocalCacheFiles(job);
> >
> > for ( Path p:localFiles ) {
> >
> > BufferedReader file_reader = new BufferedReader(new
> > InputStreamReader(fs.open(p)));
> >
> > String line = file_reader.readLine();
> >
> > while ( line != null ) {
> >
> > // Do something with the data
> >
> > line = C0_file.readLine();
> >
> > }
> >
> > }
> >
> > } catch (java.io.IOException e) {
> >
> > System.err.println("ERROR!! Cannot open filesystem from Map for
> > reading!!");e.printStackTrace();
> > }
> > }*
> >
> > This is giving me a java.lang.NullPointerException:
> > 11/11/08 01:36:17 INFO mapred.JobClient: Task Id :
> > attempt_201106271322_12775_m_000003_1, Status : FAILED
> > java.lang.NullPointerException
> > at Map.configure(Map.java:57)
> > at
> > org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:58)at
> >
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:83)
> > at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:34)
> > at
> > org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:58)at
> >
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:83)
> > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:328)
> > at org.apache.hadoop.mapred.Child.main(Child.java:155)
> >
> >
> > I am doing it in a wrong way? I followed a lot of links and this
> > seems to
> > be the way to go about it. Please help!
> >
> > Thanks a lot in advance!
> >
> > Warm regards
> > Arko
> >
>