Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-218 65132a6cb -> 27ad40871


[IGNITE-218]: intermediate commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7db58f9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7db58f9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7db58f9e

Branch: refs/heads/ignite-218
Commit: 7db58f9ec1c021ad38df7bdf879605f7e9babfec
Parents: 65132a6
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Thu Apr 16 22:01:11 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Thu Apr 16 22:01:11 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsIpcHandler.java         | 185 ++++++++---------
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  18 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  51 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   5 +-
 .../hadoop/igfs/HadoopIgfsInProc.java           | 196 +++++++++++++------
 .../hadoop/igfs/HadoopIgfsOutProc.java          |   2 +-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |   1 +
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  16 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  23 +--
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  33 ++--
 .../hadoop/v2/HadoopV2TaskContext.java          |   8 +-
 parent/pom.xml                                  |   2 +-
 12 files changed, 293 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index d8a8bdf..3ba99fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
@@ -241,153 +242,155 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Response message.
      * @throws IgniteCheckedException If failed.
      */
-    private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+    private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
         IgfsMessage msg) throws IgniteCheckedException {
-        IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+        final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
         if (log.isDebugEnabled())
             log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
 
-        IgfsControlResponse res = new IgfsControlResponse();
+        final IgfsControlResponse res = new IgfsControlResponse();
 
         final String userName = req.userName();
 
         assert userName != null;
 
-        final IgfsEx userIgfs = igfs; //.forUser(userName);
-
-        //IgfsUtils.setContextUser(userName);
-
         try {
-            switch (cmd) {
-                case EXISTS:
-                    res.response(userIgfs.exists(req.path()));
+            IgfsUserContext.doAs(userName, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    switch (cmd) {
+                        case EXISTS:
+                            res.response(igfs.exists(req.path()));
 
-                    break;
+                            break;
 
-                case INFO:
-                    res.response(userIgfs.info(req.path()));
+                        case INFO:
+                            res.response(igfs.info(req.path()));
 
-                    break;
+                            break;
 
-                case PATH_SUMMARY:
-                    res.response(userIgfs.summary(req.path()));
+                        case PATH_SUMMARY:
+                            res.response(igfs.summary(req.path()));
 
-                    break;
+                            break;
 
-                case UPDATE:
-                    res.response(userIgfs.update(req.path(), req.properties()));
+                        case UPDATE:
+                            res.response(igfs.update(req.path(), req.properties()));
 
-                    break;
+                            break;
 
-                case RENAME:
-                    userIgfs.rename(req.path(), req.destinationPath());
+                        case RENAME:
+                            igfs.rename(req.path(), req.destinationPath());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case DELETE:
-                    res.response(userIgfs.delete(req.path(), req.flag()));
+                        case DELETE:
+                            res.response(igfs.delete(req.path(), req.flag()));
 
-                    break;
+                            break;
 
-                case MAKE_DIRECTORIES:
-                    userIgfs.mkdirs(req.path(), req.properties());
+                        case MAKE_DIRECTORIES:
+                            igfs.mkdirs(req.path(), req.properties());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case LIST_PATHS:
-                    res.paths(userIgfs.listPaths(req.path()));
+                        case LIST_PATHS:
+                            res.paths(igfs.listPaths(req.path()));
 
-                    break;
+                            break;
 
-                case LIST_FILES:
-                    res.files(userIgfs.listFiles(req.path()));
+                        case LIST_FILES:
+                            res.files(igfs.listFiles(req.path()));
 
-                    break;
+                            break;
 
-                case SET_TIMES:
-                    userIgfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+                        case SET_TIMES:
+                            igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case AFFINITY:
-                    res.locations(userIgfs.affinity(req.path(), req.start(), req.length()));
+                        case AFFINITY:
+                            res.locations(igfs.affinity(req.path(), req.start(), req.length()));
 
-                    break;
+                            break;
 
-                case OPEN_READ: {
-                    IgfsInputStreamAdapter igfsIn = !req.flag() ? userIgfs.open(req.path(), bufSize) :
-                        userIgfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+                        case OPEN_READ: {
+                            IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+                                igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
 
-                    long streamId = registerResource(ses, igfsIn);
+                            long streamId = registerResource(ses, igfsIn);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS input stream for file read [igfsName=" + userIgfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
-                        igfsIn.fileInfo().modificationTime());
+                            IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+                                igfsIn.fileInfo().modificationTime());
 
-                    res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+                            res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_CREATE: {
-                    long streamId = registerResource(ses, userIgfs.create(
-                        req.path(),       // Path.
-                        bufSize,          // Buffer size.
-                        req.flag(),       // Overwrite if exists.
-                        affinityKey(req), // Affinity key based on replication factor.
-                        req.replication(),// Replication factor.
-                        req.blockSize(),  // Block size.
-                        req.properties()  // File properties.
-                    ));
+                        case OPEN_CREATE: {
+                            long streamId = registerResource(ses, igfs.create(
+                                req.path(),       // Path.
+                                bufSize,          // Buffer size.
+                                req.flag(),       // Overwrite if exists.
+                                affinityKey(req), // Affinity key based on replication factor.
+                                req.replication(),// Replication factor.
+                                req.blockSize(),  // Block size.
+                                req.properties()  // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file create [igfsName=" + userIgfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_APPEND: {
-                    long streamId = registerResource(ses, userIgfs.append(
-                        req.path(),        // Path.
-                        bufSize,           // Buffer size.
-                        req.flag(),        // Create if absent.
-                        req.properties()   // File properties.
-                    ));
+                        case OPEN_APPEND: {
+                            long streamId = registerResource(ses, igfs.append(
+                                req.path(),        // Path.
+                                bufSize,           // Buffer size.
+                                req.flag(),        // Create if absent.
+                                req.properties()   // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file append [igfsName=" + userIgfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                default:
-                    assert false : "Unhandled path control request command: " + cmd;
+                        default:
+                            assert false : "Unhandled path control request command: " + cmd;
 
-                    break;
-            }
+                            break;
+                    }
+
+                    return null;
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
 
         if (log.isDebugEnabled())
-            log.debug("Finished processing path control request [igfsName=" + userIgfs.name() + ", req=" + req +
+            log.debug("Finished processing path control request [igfsName=" + igfs.name() + ", req=" + req +
                 ", res=" + res + ']');
 
         return res;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 80f693e..a0927e2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -76,18 +76,18 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         try {
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
-            FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(jobStatPath.toUri(), hadoopCfg);
+            try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg)) {
+                fs.mkdirs(jobStatPath);
 
-            fs.mkdirs(jobStatPath);
+                try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
+                    for (T2<String, Long> evt : perfCntr.evts()) {
+                        out.print(evt.get1());
+                        out.print(':');
+                        out.println(evt.get2().toString());
+                    }
 
-            try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
-                for (T2<String, Long> evt : perfCntr.evts()) {
-                    out.print(evt.get1());
-                    out.print(':');
-                    out.println(evt.get2().toString());
+                    out.flush();
                 }
-
-                out.flush();
             }
         }
         catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 132a2ee..c53aabf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
@@ -173,21 +172,15 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
-     * Gets non-null and interned user name as per the Hadoop viewpoint.
-     * @param cfg the Hadoop job configuration, may be null.
+     * Gets non-null and interned user name as per the Hadoop file system viewpoint.
      * @return the user name, never null.
      */
-    public static String getHadoopUser(@Nullable Configuration cfg) throws IOException {
+    public static String getFsHadoopUser() throws IOException {
         String user = null;
 
-        // TODO: Create ticket to remove these lines.
-//        // First, try to get the user from MR Job configuration:
-//        if (cfg != null)
-//            user = cfg.get(MRJobConfig.USER_NAME);
-
-        UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
-        if (currentUgi != null)
-             user = currentUgi.getShortUserName();
+        UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+        if (currUgi != null)
+             user = currUgi.getShortUserName();
 
         user = IgfsUserContext.fixUserName(user);
 
@@ -236,7 +229,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            user = getHadoopUser(cfg);
+            user = getFsHadoopUser();
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -321,7 +314,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
                 }
             }
 
-            // set working directory to the hone directory of the current Fs user:
+            // set working directory to the home directory of the current Fs user:
             setWorkingDirectory(null);
         }
         finally {
@@ -866,25 +859,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + user/*userName.get()*/);
+        Path path = new Path("/user/" + user);
 
         return path.makeQualified(getUri(), null);
     }
 
-//    /**
-//     * Set user name and default working directory for current thread.
-//     *
-//     * @param userName User name.
-//     */
-//    @Deprecated // TODO: remove this method.
-//    public void setUser(String userName) {
-//        //System.out.println(this + ": ##### setting user = " + userName + ", thread = " + Thread.currentThread());
-//        assert F.eq(user, userName);
-//        //this.userName.set(userName);
-//
-//        //setWorkingDirectory(null);
-//    }
-
     /** {@inheritDoc} */
     @Override public void setWorkingDirectory(Path newPath) {
         if (newPath == null) {
@@ -893,7 +872,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(homeDir));
 
-            workingDir = homeDir; //.set(homeDir);
+            workingDir = homeDir;
         }
         else {
             Path fixedNewPath = fixRelativePart(newPath);
@@ -906,13 +885,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
 
-            workingDir = fixedNewPath; //.set(fixedNewPath);
+            workingDir = fixedNewPath;
         }
     }
 
     /** {@inheritDoc} */
     @Override public Path getWorkingDirectory() {
-        return workingDir; //.get();
+        return workingDir;
     }
 
     /** {@inheritDoc} */
@@ -1282,12 +1261,4 @@ public class IgniteHadoopFileSystem extends FileSystem {
     public String user() {
         return user;
     }
-
-    /**
-     * Getter for secondaryFs field.
-     * @return the secondary file system, if any.
-     */
-    public FileSystem secondaryFs() {
-        return secondaryFs;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index fd97ed6..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
-        user = getHadoopUser(cfg);
+        user = getFsHadoopUser();
 
         try {
             initialize(name, cfg);
@@ -294,6 +294,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                     SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
 
                     secondaryFs = secProvider.createAbstractFileSystem(user);
+
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -994,4 +995,4 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     public String user() {
         return user;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 2b1d836..771388a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -56,11 +56,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
      * @param log Log.
      */
     public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
-        this.user = userName;
+        this.user = IgfsUserContext.fixUserName(userName);
 
-        this.igfs = igfs; //.forUser(userName);
-
-        //assert this.user == this.igfs.user();
+        this.igfs = igfs;
 
         this.log = log;
 
@@ -68,11 +66,24 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) {
-        igfs.clientLogDirectory(logDir);
+    @Override public IgfsHandshakeResponse handshake(final String logDir) {
+        try {
+            return IgfsUserContext.doAs(user, new Callable<IgfsHandshakeResponse>() {
+                @Override public IgfsHandshakeResponse call() throws Exception {
+                    igfs.clientLogDirectory(logDir);
+
+                    return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+                        igfs.globalSampling());
+                }
+            });
+        } catch (IgniteException e) {
+            Throwable t = e.getCause();
 
-        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-            igfs.globalSampling());
+            if (t instanceof RuntimeException)
+                throw (RuntimeException)t;
+
+            throw e;
+        }
     }
 
     /** {@inheritDoc} */
@@ -90,12 +101,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.info(path);
+            return IgfsUserContext.doAs(user, new Callable<IgfsFile>() {
+                @Override public IgfsFile call() throws Exception {
+                    return igfs.info(path);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path);
@@ -103,12 +118,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            return igfs.update(path, props);
+            return IgfsUserContext.doAs(user, new Callable<IgfsFile>() {
+                @Override public IgfsFile call() throws Exception {
+                    return igfs.update(path, props);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path);
@@ -116,14 +135,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
         try {
-            igfs.setTimes(path, accessTime, modificationTime);
+            IgfsUserContext.doAs(user, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    igfs.setTimes(path, accessTime, modificationTime);
+
+                    return null;
+                }
+            });
 
             return true;
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " +
@@ -132,14 +157,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
         try {
-            igfs.rename(src, dest);
+            IgfsUserContext.doAs(user, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    igfs.rename(src, dest);
+
+                    return null;
+                }
+            });
 
             return true;
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src);
@@ -147,12 +178,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
         try {
-            return igfs.delete(path, recursive);
+            return IgfsUserContext.doAs(user, new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    return igfs.delete(path, recursive);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path);
@@ -162,7 +197,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
         try {
-            return igfs.globalSpace();
+            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+                @Override public IgfsStatus call() throws Exception {
+                    return igfs.globalSpace();
+                }
+            });
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
@@ -171,12 +213,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listPaths(path);
+            return IgfsUserContext.doAs(user, new Callable<Collection<IgfsPath>>() {
+                @Override public Collection<IgfsPath> call() throws Exception {
+                    return igfs.listPaths(path);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path);
@@ -184,12 +230,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listFiles(path);
+            return IgfsUserContext.doAs(user, new Callable<Collection<IgfsFile>>() {
+                @Override public Collection<IgfsFile> call() throws Exception {
+                    return igfs.listFiles(path);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path);
@@ -197,14 +247,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            igfs.mkdirs(path, props);
+            IgfsUserContext.doAs(user, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    igfs.mkdirs(path, props);
+
+                    return null;
+                }
+            });
 
             return true;
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " +
@@ -213,12 +269,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.summary(path);
+            return IgfsUserContext.doAs(user, new Callable<IgfsPathSummary>() {
+                @Override public IgfsPathSummary call() throws Exception {
+                    return igfs.summary(path);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " +
@@ -227,13 +287,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
         throws IgniteCheckedException {
         try {
-            return igfs.affinity(path, start, len);
+            return IgfsUserContext.doAs(user, new Callable<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> call() throws Exception {
+                    return igfs.affinity(path, start, len);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path);
@@ -241,14 +305,18 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+            return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate call() throws Exception {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
@@ -256,15 +324,19 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
         throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+            return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate call() throws Exception {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
@@ -272,16 +344,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+        final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+            return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate call() throws Exception {
+                    IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+                        colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path);
@@ -289,15 +365,19 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+            return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate call() throws Exception {
+                    IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
+            throw new IgniteCheckedException(e.getCause());
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 061eed7..639f2eb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -141,7 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         this.grid = grid;
         this.igfs = igfs;
         this.log = log;
-        this.userName = user;
+        this.userName = IgfsUserContext.fixUserName(user);
 
         io = HadoopIgfsIpcIo.get(log, endpoint);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1574152..eaf7392 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
     /** Logger. */
     private final Log log;
 
+    /** The user name this wrapper works on behalf of. */
     private final String userName;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 9d7125a..fe350b2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
@@ -111,8 +110,17 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
         assert !F.isEmpty(user);
 
-        if (F.eq(user, FileSystemConfiguration.DFLT_USER_NAME))
-            // do direct call:
+        String ugiUser;
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            ugiUser = currUser.getShortUserName();
+        } catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        if (F.eq(user, ugiUser))
+            // if current UGI context user is the same, do direct call:
             return callImpl();
         else {
             // do the call in the context of 'user':
@@ -141,7 +149,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
     /**
      * Runnable task call implementation
-     * @return
+     * @return null.
      * @throws IgniteCheckedException
      */
     Void callImpl() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index ac10687..0d40f34 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -140,7 +140,7 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = fileSystemForUser(jobDir.toUri(), jobConf)) {
+            try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf)) {
                 JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 
@@ -264,8 +264,6 @@ public class HadoopV2Job implements HadoopJob {
                 if (jobLocDir.exists())
                     U.delete(jobLocDir);
             }
-//
-//            disposeFileSystem();
         }
         finally {
             taskCtxClsPool.clear();
@@ -300,25 +298,6 @@ public class HadoopV2Job implements HadoopJob {
         }
     }
 
-//    /**
-//     * Closes the underlying file system.
-//     * @throws IgniteCheckedException on error.
-//     */
-//    private void disposeFileSystem() throws IgniteCheckedException {
-//        FileSystem fs0 = fs;
-//
-//        try {
-//            if (fs0 != null)
-//                fs0.close();
-//        }
-//        catch (IOException ioe) {
-//            throw new IgniteCheckedException(ioe);
-//        }
-//        finally {
-//            fs = null;
-//        }
-//    }
-
     /** {@inheritDoc} */
     @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
         rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 01d4719..340891a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -59,8 +59,6 @@ public class HadoopV2JobResourceManager {
 
     /** Staging directory to delivery job jar and config to the work nodes. */
     private Path stagingDir;
-//
-//    private FileSystem fs;
 
     /**
      * Creates new instance.
@@ -72,10 +70,6 @@ public class HadoopV2JobResourceManager {
         this.jobId = jobId;
         this.ctx = ctx;
         this.log = log.getLogger(HadoopV2JobResourceManager.class);
-//
-//        assert fs != null;
-//
-//        this.fs = fs;
     }
 
     /**
@@ -101,6 +95,22 @@ public class HadoopV2JobResourceManager {
     }
 
     /**
+     * Gets non-null and interned user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, may be null.
+     * @return the user name, never null.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        user = user.intern();
+
+        return user;
+    }
+
+    /**
      * Common method to get the V1 file system in MapRed engine.
      * It creates the filesystem for the user specified in the
      * configuration with {@link MRJobConfig#USER_NAME} property.
@@ -108,10 +118,9 @@ public class HadoopV2JobResourceManager {
      * @param cfg the configuration.
      * @return the file system
      * @throws IOException
-     * @throws InterruptedException
      */
-    public static FileSystem fileSystemForUser(@Nullable URI uri, @Nullable Configuration cfg) throws IOException {
-        final String user = IgniteHadoopFileSystem.getHadoopUser(cfg);
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
+        final String user = getMrHadoopUser(cfg);
 
         assert user != null;
 
@@ -154,7 +163,7 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), cfg)) {
+                    try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) {
                         if (!fs.exists(stagingDir))
                             throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
                                 stagingDir);
@@ -246,7 +255,7 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            try (FileSystem srcFs = fileSystemForUser(srcPath.toUri(), cfg)) {
+            try (FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg)) {
                 if (extract) {
                     File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
 
@@ -329,7 +338,7 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null) {
-                try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), ctx.getJobConf())) {
+                try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf())) {
                     fs.delete(stagingDir, true);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index de7ff7f..d19f4f4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -32,7 +32,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
@@ -233,11 +232,6 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            //FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(null, jobConf());
-            //String user = jobConf().getUser();
-            //System.out.println("Setting user ["+user+"] to fs=" + fs + ", thread = " + Thread.currentThread());
-            //HadoopFileSystemsUtils.setUser(fs, user); //
-
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 
             locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
@@ -413,7 +407,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(jobDir.toUri(), jobConf());
+        try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobDir.toUri(), jobConf());
             FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 63fbf48..cb84f7f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
         <ignite.edition>fabric</ignite.edition>
-        <hadoop.version>2.6.0</hadoop.version> // TODO: Revert.
+        <hadoop.version>2.6.0</hadoop.version>
         <spring.version>4.1.0.RELEASE</spring.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>
