[IGNITE-218] workable version

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1634a685
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1634a685
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1634a685

Branch: refs/heads/ignite-218
Commit: 1634a685ad0817659fe80e0d5809f75faa6616ad
Parents: f11e357
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Tue Apr 14 16:57:19 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Tue Apr 14 16:57:19 2015 +0300

----------------------------------------------------------------------
 .../igfs/secondary/IgfsSecondaryFileSystem.java |  14 ++
 .../igfs/common/IgfsHandshakeRequest.java       |  13 ++
 .../internal/igfs/common/IgfsMarshaller.java    |   5 +
 .../igfs/common/IgfsPathControlRequest.java     |  12 ++
 .../internal/processors/hadoop/HadoopJob.java   |   2 +-
 .../internal/processors/igfs/IgfsAsyncImpl.java |  10 +
 .../ignite/internal/processors/igfs/IgfsEx.java |  14 ++
 .../internal/processors/igfs/IgfsImpl.java      | 158 ++++++++++++---
 .../processors/igfs/IgfsIpcHandler.java         |  48 +++--
 .../igfs/IgfsSecondaryFileSystemImpl.java       |  16 +-
 .../internal/processors/igfs/IgfsServer.java    |   4 +-
 .../ignite/internal/util/IgniteUtils.java       |  12 ++
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   6 +-
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |  42 +++-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 124 +++++++++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  26 ++-
 .../internal/processors/hadoop/HadoopUtils.java |   8 +-
 .../hadoop/SecondaryFileSystemProvider.java     |  24 ++-
 .../hadoop/fs/HadoopDistributedFileSystem.java  | 199 ++++++++++---------
 .../hadoop/fs/HadoopFileSystemsUtils.java       |  28 +--
 .../processors/hadoop/igfs/HadoopIgfsEx.java    |   6 +
 .../hadoop/igfs/HadoopIgfsInProc.java           |  17 +-
 .../processors/hadoop/igfs/HadoopIgfsIpcIo.java |   2 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |  34 +++-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |  14 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  46 +++++
 .../processors/hadoop/v2/HadoopV2Job.java       |  38 +++-
 .../hadoop/v2/HadoopV2JobResourceManager.java   | 109 +++++++---
 .../hadoop/v2/HadoopV2TaskContext.java          |   9 +-
 .../IgniteHadoopFileSystemClientSelfTest.java   |   2 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |  12 ++
 .../hadoop/HadoopFileSystemsTest.java           |   2 +-
 .../processors/hadoop/HadoopStartup.java        |   2 +-
 .../hadoop/examples/HadoopWordCount2.java       |  24 ++-
 parent/pom.xml                                  |   2 +-
 35 files changed, 819 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
 
b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..c2c610c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,18 @@ public interface IgfsSecondaryFileSystem {
      * @return Map of properties.
      */
     public Map<String,String> properties();
+
+    /**
+     * Gets an instance of the secondary file system for the specified user (may be 'this').
+     * @param userName the user name
+     * @return the instance
+     * @throws IgniteCheckedException
+     */
+    public IgfsSecondaryFileSystem forUser(String userName) throws 
IgniteCheckedException;
+
+    /**
+     * Gets the user name this secondary file system works on behalf of.
+     * @return the user name.
+     */
+    public String getUser();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
index ec8ef6e..4d2091b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
@@ -31,6 +31,9 @@ public class IgfsHandshakeRequest extends IgfsMessage {
     /** Expected IGFS name. */
     private String igfsName;
 
+    /** User name the request is done on behalf of. */
+    private String userName;
+
     /** Logger directory. */
     private String logDir;
 
@@ -90,4 +93,14 @@ public class IgfsHandshakeRequest extends IgfsMessage {
     @Override public String toString() {
         return S.toString(IgfsHandshakeRequest.class, this);
     }
+
+    public final String userName() {
+        assert userName != null;
+
+        return userName;
+    }
+
+    public final void userName(String userName) {
+        this.userName = U.fixUserName(userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..e1a5d00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
     }
 
     /**
+     * Serializes the message and sends it into the given output stream.
      * @param msg Message.
      * @param hdr Message header.
      * @param out Output.
@@ -89,6 +90,7 @@ public class IgfsMarshaller {
 
                     IgfsHandshakeRequest req = (IgfsHandshakeRequest)msg;
 
+                    U.writeString(out, req.userName());
                     U.writeString(out, req.gridName());
                     U.writeString(out, req.igfsName());
                     U.writeString(out, req.logDirectory());
@@ -119,6 +121,7 @@ public class IgfsMarshaller {
 
                     IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
+                    U.writeString(out, req.userName());
                     writePath(out, req.path());
                     writePath(out, req.destinationPath());
                     out.writeBoolean(req.flag());
@@ -205,6 +208,7 @@ public class IgfsMarshaller {
                 case HANDSHAKE: {
                     IgfsHandshakeRequest req = new IgfsHandshakeRequest();
 
+                    req.userName(U.readString(in));
                     req.gridName(U.readString(in));
                     req.igfsName(U.readString(in));
                     req.logDirectory(U.readString(in));
@@ -236,6 +240,7 @@ public class IgfsMarshaller {
                 case OPEN_CREATE: {
                     IgfsPathControlRequest req = new IgfsPathControlRequest();
 
+                    req.userName(U.readString(in));
                     req.path(readPath(in));
                     req.destinationPath(readPath(in));
                     req.flag(in.readBoolean());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..99a946a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -63,6 +63,8 @@ public class IgfsPathControlRequest extends IgfsMessage {
     /** Last modification time. */
     private long modificationTime;
 
+    private String userName;
+
     /**
      * @param path Path.
      */
@@ -235,4 +237,14 @@ public class IgfsPathControlRequest extends IgfsMessage {
     @Override public String toString() {
         return S.toString(IgfsPathControlRequest.class, this, "cmd", 
command());
     }
+
+    public final String userName() {
+        assert userName != null;
+
+        return userName;
+    }
+
+    public final void userName(String userName) {
+        this.userName = U.fixUserName(userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
     /**
      * Cleans up the job staging directory.
      */
-    void cleanupStagingDirectory();
+    public void cleanupStagingDirectory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 48a32f4..1ae6446 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -314,4 +314,14 @@ public class IgfsAsyncImpl extends 
AsyncSupportAdapter<IgniteFileSystem> impleme
     @Override public IgfsSecondaryFileSystem asSecondary() {
         return igfs.asSecondary();
     }
+
+    /** {@inheritDoc} */
+    @Override public IgfsEx forUser(String userName) throws 
IgniteCheckedException {
+        return igfs.forUser(userName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return igfs.user();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 99f647e..5f73407 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -169,4 +169,18 @@ public interface IgfsEx extends IgniteFileSystem {
      * @return Secondary file system wrapper.
      */
     public IgfsSecondaryFileSystem asSecondary();
+
+    /**
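+     * Gets an IGFS instance that works on behalf of the specified user.
+     * @param userName User name, may be {@code null}.
+     * @return IGFS instance for the given user (may be this instance).
+     * @throws IgniteCheckedException If the instance could not be obtained.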
+     */
+    public IgfsEx forUser(@Nullable String userName) throws 
IgniteCheckedException;
+
+    /**
+     * Getter for user name.
+     * @return user name.
+     */
+    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 824f178..82ef628 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -42,6 +42,7 @@ import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.io.*;
+import java.lang.ref.*;
 import java.net.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -56,7 +57,7 @@ import static 
org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*;
 /**
  * Cache-based IGFS implementation.
  */
-public final class IgfsImpl implements IgfsEx {
+public class IgfsImpl implements IgfsEx {
     /** Default permissions for file system entry. */
     private static final String PERMISSION_DFLT_VAL = "0777";
 
@@ -64,7 +65,7 @@ public final class IgfsImpl implements IgfsEx {
     private static final Map<String, String> DFLT_DIR_META = 
F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL);
 
     /** Handshake message. */
-    private final IgfsPaths secondaryPaths;
+    private IgfsPaths secondaryPaths;
 
     /** Cache based structure (meta data) manager. */
     private IgfsMetaManager meta;
@@ -73,7 +74,10 @@ public final class IgfsImpl implements IgfsEx {
     private IgfsDataManager data;
 
     /** FS configuration. */
-    private FileSystemConfiguration cfg;
+    private final FileSystemConfiguration cfg;
+
+    /** The user name this Fs works on behalf of. */
+    private final String user;
 
     /** IGFS context. */
     private IgfsContext igfsCtx;
@@ -88,7 +92,7 @@ public final class IgfsImpl implements IgfsEx {
     private IgniteLogger log;
 
     /** Mode resolver. */
-    private final IgfsModeResolver modeRslvr;
+    private IgfsModeResolver modeRslvr;
 
     /** Connection to the secondary file system. */
     private IgfsSecondaryFileSystem secondaryFs;
@@ -96,7 +100,7 @@ public final class IgfsImpl implements IgfsEx {
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
-    /** Writers map. */
+    /** Writers map: file workers keyed by path. */
     private final ConcurrentHashMap8<IgfsPath, IgfsFileWorker> workerMap = new 
ConcurrentHashMap8<>();
 
     /** Delete futures. */
@@ -120,6 +124,8 @@ public final class IgfsImpl implements IgfsEx {
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
+    private final ConcurrentMap<String, Reference<IgfsImpl>> userToIgfsExMap = 
new ConcurrentHashMap<>();
+
     /**
      * Creates IGFS instance with given context.
      *
@@ -137,10 +143,67 @@ public final class IgfsImpl implements IgfsEx {
         evts = igfsCtx.kernalContext().event();
         meta = igfsCtx.meta();
         data = igfsCtx.data();
-        secondaryFs = cfg.getSecondaryFileSystem();
+
+        user = U.fixUserName(getSecondaryFsUser());
+
+        assert user != null;
+
+        initIgfsSecondaryFileSystem();
+
+        // Check whether IGFS LRU eviction policy is set on data cache.
+        String dataCacheName = igfsCtx.configuration().getDataCacheName();
+
+        for (CacheConfiguration cacheCfg : 
igfsCtx.kernalContext().config().getCacheConfiguration()) {
+            if (F.eq(dataCacheName, cacheCfg.getName())) {
+                EvictionPolicy evictPlc = cacheCfg.getEvictionPolicy();
+
+                if (evictPlc != null & evictPlc instanceof 
IgfsPerBlockLruEvictionPolicy)
+                    this.evictPlc = (IgfsPerBlockLruEvictionPolicy)evictPlc;
+
+                break;
+            }
+        }
+
+        topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
+
+        igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
+        igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
+
+    /**
+     * Gets secondary file system user, or null, if no secondary file system 
is present.
+     * @return the secondary file system user.
+     */
+    @Nullable protected String getSecondaryFsUser() {
+        FileSystemConfiguration fsCfg = igfsCtx.configuration();
+
+        if (fsCfg == null)
+            return null;
+
+        IgfsSecondaryFileSystem sec = fsCfg.getSecondaryFileSystem();
+
+        if (sec == null)
+            return null;
+
+        return sec.getUser();
+    }
+
+    /**
+     * Initializes the secondary file system for this instance's user.
+     * @throws IgniteCheckedException on error
+     */
+    private void initIgfsSecondaryFileSystem() throws IgniteCheckedException {
+        // This is the "prototype" (in terms of the GoF pattern) of the secondary file system
+        // to be cloned for the specified user:
+        final IgfsSecondaryFileSystem secondaryFs0 = 
cfg.getSecondaryFileSystem();
+
+        if (secondaryFs0 == null)
+            secondaryFs = null;
+        else
+            secondaryFs = secondaryFs0.forUser(user);
 
         /* Default IGFS mode. */
-        IgfsMode dfltMode;
+        final IgfsMode dfltMode;
 
         if (secondaryFs == null) {
             if (cfg.getDefaultMode() == PROXY)
@@ -195,25 +258,6 @@ public final class IgfsImpl implements IgfsEx {
 
         secondaryPaths = new IgfsPaths(secondaryFs == null ? null : 
secondaryFs.properties(), dfltMode,
             modeRslvr.modesOrdered());
-
-        // Check whether IGFS LRU eviction policy is set on data cache.
-        String dataCacheName = igfsCtx.configuration().getDataCacheName();
-
-        for (CacheConfiguration cacheCfg : 
igfsCtx.kernalContext().config().getCacheConfiguration()) {
-            if (F.eq(dataCacheName, cacheCfg.getName())) {
-                EvictionPolicy evictPlc = cacheCfg.getEvictionPolicy();
-
-                if (evictPlc != null & evictPlc instanceof 
IgfsPerBlockLruEvictionPolicy)
-                    this.evictPlc = (IgfsPerBlockLruEvictionPolicy)evictPlc;
-
-                break;
-            }
-        }
-
-        topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
-        igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /**
@@ -2096,4 +2140,66 @@ public final class IgfsImpl implements IgfsEx {
         else
             throw new IllegalStateException("Failed to perform IGFS action 
because grid is stopping.");
     }
+
+    /** {@inheritDoc} */
+    @Override public IgfsEx forUser(String userName) throws 
IgniteCheckedException {
+        final String userFixed = U.fixUserName(userName);
+
+        if (this.user == userFixed)
+            return this; // same user
+
+        Reference<IgfsImpl> ref = userToIgfsExMap.get(userFixed);
+
+        IgfsImpl val = (ref == null) ? null : ref.get();
+
+        if (val == null) {
+            // Create impl from the same context but with different user:
+            IgfsImpl newVal = new IgfsImpl(igfsCtx) {
+                @Override protected String getSecondaryFsUser() {
+                    return userFixed;
+                }
+            };
+            final Reference<IgfsImpl> newRef = new WeakReference<>(newVal);
+
+            while (true) {
+                ref = userToIgfsExMap.get(userFixed);
+
+                if (ref == null) {
+                    ref = userToIgfsExMap.putIfAbsent(userFixed, newRef);
+
+                    if (ref == null) {
+                        ref = newRef;
+                        val = newVal;
+
+                        break; // ref replaced with newRef
+                    }
+                }
+
+                val = ref.get();
+
+                if (val != null)
+                    break; // there is an existing value
+
+                boolean replaced = userToIgfsExMap.replace(userFixed, ref, 
newRef);
+
+                if (replaced) {
+                    ref = newRef;
+                    val = newVal;
+
+                    break;
+                }
+            }
+        }
+
+        assert val == ref.get();
+        assert val != null;
+        assert F.eq(val.user(), userFixed);
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..4812c95 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
     private final int bufSize; // Buffer size. Must not be less then file 
block size.
 
     /** IGFS instance for this handler. */
-    private IgfsEx igfs;
+    private final IgfsEx igfs;
 
     /** Resource ID generator. */
-    private AtomicLong rsrcIdGen = new AtomicLong();
+    private final AtomicLong rsrcIdGen = new AtomicLong();
 
     /** Stopping flag. */
     private volatile boolean stopping;
@@ -250,77 +250,83 @@ class IgfsIpcHandler implements IgfsServerHandler {
 
         IgfsControlResponse res = new IgfsControlResponse();
 
+        final String userName = req.userName();
+
+        assert userName != null;
+
+        final IgfsEx userIgfs = igfs.forUser(userName);
+
         try {
             switch (cmd) {
                 case EXISTS:
-                    res.response(igfs.exists(req.path()));
+                    res.response(userIgfs.exists(req.path()));
 
                     break;
 
                 case INFO:
-                    res.response(igfs.info(req.path()));
+                    res.response(userIgfs.info(req.path()));
 
                     break;
 
                 case PATH_SUMMARY:
-                    res.response(igfs.summary(req.path()));
+                    res.response(userIgfs.summary(req.path()));
 
                     break;
 
                 case UPDATE:
-                    res.response(igfs.update(req.path(), req.properties()));
+                    res.response(userIgfs.update(req.path(), 
req.properties()));
 
                     break;
 
                 case RENAME:
-                    igfs.rename(req.path(), req.destinationPath());
+                    userIgfs.rename(req.path(), req.destinationPath());
 
                     res.response(true);
 
                     break;
 
                 case DELETE:
-                    res.response(igfs.delete(req.path(), req.flag()));
+                    res.response(userIgfs.delete(req.path(), req.flag()));
 
                     break;
 
                 case MAKE_DIRECTORIES:
-                    igfs.mkdirs(req.path(), req.properties());
+                    userIgfs.mkdirs(req.path(), req.properties());
 
                     res.response(true);
 
                     break;
 
                 case LIST_PATHS:
-                    res.paths(igfs.listPaths(req.path()));
+                    res.paths(userIgfs.listPaths(req.path()));
 
                     break;
 
                 case LIST_FILES:
-                    res.files(igfs.listFiles(req.path()));
+                    res.files(userIgfs.listFiles(req.path()));
 
                     break;
 
                 case SET_TIMES:
-                    igfs.setTimes(req.path(), req.accessTime(), 
req.modificationTime());
+                    userIgfs.setTimes(req.path(), req.accessTime(), 
req.modificationTime());
 
                     res.response(true);
 
                     break;
 
                 case AFFINITY:
-                    res.locations(igfs.affinity(req.path(), req.start(), 
req.length()));
+                    res.locations(userIgfs.affinity(req.path(), req.start(), 
req.length()));
 
                     break;
 
                 case OPEN_READ: {
-                    IgfsInputStreamAdapter igfsIn = !req.flag() ? 
igfs.open(req.path(), bufSize) :
-                        igfs.open(req.path(), bufSize, 
req.sequentialReadsBeforePrefetch());
+                    IgfsInputStreamAdapter igfsIn = !req.flag() ? 
userIgfs.open(req.path(), bufSize) :
+                        userIgfs.open(req.path(), bufSize, 
req.sequentialReadsBeforePrefetch());
 
                     long streamId = registerResource(ses, igfsIn);
 
                     if (log.isDebugEnabled())
-                        log.debug("Opened IGFS input stream for file read 
[igfsName=" + igfs.name() + ", path=" +
+                        log.debug("Opened IGFS input stream for file read 
[igfsName=" + userIgfs.name() + ", path=" +
                             req.path() + ", streamId=" + streamId + ", ses=" + 
ses + ']');
 
                     IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), 
null,
@@ -332,7 +338,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
                 }
 
                 case OPEN_CREATE: {
-                    long streamId = registerResource(ses, igfs.create(
+                    long streamId = registerResource(ses, userIgfs.create(
                         req.path(),       // Path.
                         bufSize,          // Buffer size.
                         req.flag(),       // Overwrite if exists.
@@ -343,7 +349,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
                     ));
 
                     if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file create 
[igfsName=" + igfs.name() + ", path=" +
+                        log.debug("Opened IGFS output stream for file create 
[igfsName=" + userIgfs.name() + ", path=" +
                             req.path() + ", streamId=" + streamId + ", ses=" + 
ses + ']');
 
                     res.response(streamId);
@@ -352,7 +358,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
                 }
 
                 case OPEN_APPEND: {
-                    long streamId = registerResource(ses, igfs.append(
+                    long streamId = registerResource(ses, userIgfs.append(
                         req.path(),        // Path.
                         bufSize,           // Buffer size.
                         req.flag(),        // Create if absent.
@@ -360,7 +366,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
                     ));
 
                     if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file append 
[igfsName=" + igfs.name() + ", path=" +
+                        log.debug("Opened IGFS output stream for file append 
[igfsName=" + userIgfs.name() + ", path=" +
                             req.path() + ", streamId=" + streamId + ", ses=" + 
ses + ']');
 
                     res.response(streamId);
@@ -379,7 +385,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
         }
 
         if (log.isDebugEnabled())
-            log.debug("Finished processing path control request [igfsName=" + 
igfs.name() + ", req=" + req +
+            log.debug("Finished processing path control request [igfsName=" + 
userIgfs.name() + ", req=" + req +
                 ", res=" + res + ']');
 
         return res;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..23431a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
  */
 class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     /** Delegate. */
-    private final IgfsImpl igfs;
+    private final IgfsEx igfs;
 
     /**
      * Constructor.
      *
      * @param igfs Delegate.
      */
-    IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+    IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
         this.igfs = igfs;
     }
 
@@ -118,4 +118,16 @@ class IgfsSecondaryFileSystemImpl implements 
IgfsSecondaryFileSystem {
     @Override public Map<String, String> properties() {
         return Collections.emptyMap();
     }
+
+    /** {@inheritDoc} */
+    @Override public IgfsSecondaryFileSystem forUser(String userName) throws 
IgniteCheckedException {
+        IgfsEx forUser = igfs.forUser(userName);
+
+        return new IgfsSecondaryFileSystemImpl(forUser);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getUser() {
+        return igfs.user();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
      */
     private class ClientWorker extends GridWorker {
         /** Connected client endpoint. */
-        private IpcEndpoint endpoint;
+        private final IpcEndpoint endpoint;
 
         /** Data output stream. */
         private final IgfsDataOutputStream out;
 
         /** Client session object. */
-        private IgfsClientSession ses;
+        private final IgfsClientSession ses;
 
         /** Queue node for fast unlink. */
         private ConcurrentLinkedDeque8.Node<ClientWorker> node;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1aac985..55785e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9005,4 +9005,16 @@ public abstract class IgniteUtils {
 
         return hasShmem;
     }
+
+    /**
+     * Returns a non-null, interned user name; {@code null} is mapped to the empty string.
+     * @param user a user name.
+     * @return non-null interned user name
+     */
+    public static String fixUserName(@Nullable String user) {
+        if (user == null)
+           return "";
+
+        return user.intern();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 66e9761..80f693e 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.typedef.*;
 
 import java.io.*;
@@ -72,7 +74,9 @@ public class IgniteHadoopFileSystemCounterWriter implements 
HadoopCounterWriter
         HadoopPerformanceCounter perfCntr = 
HadoopPerformanceCounter.getCounter(cntrs, null);
 
         try {
-            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = 
HadoopV2JobResourceManager.fileSystemForUser(jobStatPath.toUri(), hadoopCfg);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index ba891f8..3c9cc8a 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,7 +20,6 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.secondary.*;
@@ -43,7 +42,11 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** Hadoop file system. */
     private final FileSystem fileSys;
 
-    /** Properties of file system */
+    /** Properties of file system, see {@link #properties()}.
+     * See also {@link IgfsEx#SECONDARY_FS_USER_NAME},
+     * {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} and
+     * {@link IgfsEx#SECONDARY_FS_URI}.
+     */
     private final Map<String, String> props = new HashMap<>();
 
     /**
@@ -131,14 +134,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem 
implements IgfsSecondaryFileSys
      * @param detailMsg Detailed error message.
      * @return Appropriate exception.
      */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
     private IgfsException handleSecondaryFsError(IOException e, String 
detailMsg) {
-        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
-            (e.getMessage() != null && e.getMessage().contains("Failed on 
local"));
-
-        return !wrongVer ? cast(detailMsg, e) :
-            new IgfsInvalidHdfsVersionException("HDFS version you are 
connecting to differs from local " +
-                "version.", e);    }
+        return cast(detailMsg, e);
+    }
 
     /**
      * Cast IO exception to IGFS exception.
@@ -450,4 +448,30 @@ public class IgniteHadoopIgfsSecondaryFileSystem 
implements IgfsSecondaryFileSys
     public FileSystem fileSystem() {
         return fileSys;
     }
+
+    /**
+     * Returns a secondary file system for the given user, reusing this instance when the user name matches.
+     * @param user2 name of the user the returned file system is requested for.
+     * @return this instance if {@code user2} is equal (per {@code F.eq}) to the stored user name property, otherwise a new instance created with the same URI and config path.
+     * @throws IgniteCheckedException presumably if the new instance cannot be constructed (TODO confirm against the constructor).
+     */
+    @Override public IgfsSecondaryFileSystem forUser(String user2) throws 
IgniteCheckedException {
+        String user = props.get(SECONDARY_FS_USER_NAME);
+        if (F.eq(user, user2))
+            return this;
+        else {
+            String uri = props.get(SECONDARY_FS_URI);
+            String cfgPath = props.get(SECONDARY_FS_CONFIG_PATH);
+
+            return new IgniteHadoopIgfsSecondaryFileSystem(uri, cfgPath, 
user2);
+        }
+    }
+
+    /**
+     * Gets the user name stored in this file system's properties.
+     * @return value of the {@code SECONDARY_FS_USER_NAME} property, or {@code null} if the property map has no such entry.
+     */
+    @Override public String getUser() {
+        return props.get(SECONDARY_FS_USER_NAME);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1f53a06..2dd7ddb 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -97,21 +98,22 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>(){
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
+//    /** User name for each thread. */
+//    private final ThreadLocal<String> userName = new ThreadLocal<String>(){
+//        /** {@inheritDoc} */
+//        @Override protected String initialValue() {
+//            return DFLT_USER_NAME;
+//        }
+//    };
+
+//    /** Working directory for each thread. */
+//    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
+//        /** {@inheritDoc} */
+//        @Override protected Path initialValue() {
+//            return getHomeDirectory();
+//        }
+//    };
+    private Path workingDir;
 
     /** Default replication factor. */
     private short dfltReplication;
@@ -129,6 +131,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Secondary URI string. */
     private URI secondaryUri;
 
+    /** The user name this file system was created on behalf of. */
+    private String user;
+
     /** IGFS mode resolver. */
     private IgfsModeResolver modeRslvr;
 
@@ -182,6 +187,37 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
+     * Gets non-null and interned user name: job configuration first, then current UGI, then {@code DFLT_USER_NAME}.
+     * @param cfg the Hadoop job configuration, may be null; when null the job-configuration step is skipped.
+     * @return the user name, never null; the declared {@link IOException} presumably comes from the UGI lookup.
+     */
+    public static String getHadoopUser(@Nullable Configuration cfg) throws 
IOException {
+        String user = null;
+
+        // First, try to get the user from MR Job configuration:
+        if (cfg != null)
+            user = cfg.get(MRJobConfig.USER_NAME);
+
+        // 2nd, try to get it from UserGroupInformation (may return any result 
if we're
+        // inside UserGroupInformation.doAs(...) closure):
+        if (user == null) {
+            UserGroupInformation currentUgi = 
UserGroupInformation.getCurrentUser();
+            if (currentUgi != null)
+                user = currentUgi.getShortUserName();
+        }
+
+        // 3rd, get the default system (process owner) user name (defaults to 
"anonymous" in case of null):
+        if (user == null)
+            user = DFLT_USER_NAME;
+
+        assert user != null;
+
+        user = U.fixUserName(user);
+
+        return user;
+    }
+
+    /**
      * Public setter that can be used by direct users of FS or Visor.
      *
      * @param colocateFileWrites Whether all ongoing file writes should be 
colocated.
@@ -221,7 +257,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+            user = getHadoopUser(cfg);
+
+            //setUser(user);
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, 
PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -244,7 +282,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() 
: null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, 
user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -289,11 +327,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
                     SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                        user);
 
                     secondaryFs = secProvider.createFileSystem();
                     secondaryUri = secProvider.uri();
@@ -306,6 +343,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
                             "will have no effect): " + e.getMessage());
                 }
             }
+
+            // set working directory to the home directory of the current Fs 
user:
+            setWorkingDirectory(null);
         }
         finally {
             leaveBusy();
@@ -849,7 +889,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
+        Path path = new Path("/user/" + user/*userName.get()*/);
 
         return path.makeQualified(getUri(), null);
     }
@@ -859,10 +899,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
      *
      * @param userName User name.
      */
+    @Deprecated // TODO: remove this method.
     public void setUser(String userName) {
-        this.userName.set(userName);
+        //System.out.println(this + ": ##### setting user = " + userName + ", 
thread = " + Thread.currentThread());
+        assert F.eq(user, userName);
+        //this.userName.set(userName);
 
-        setWorkingDirectory(null);
+        //setWorkingDirectory(null);
     }
 
     /** {@inheritDoc} */
@@ -873,7 +916,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(homeDir));
 
-            workingDir.set(homeDir);
+            workingDir = homeDir; //.set(homeDir);
         }
         else {
             Path fixedNewPath = fixRelativePart(newPath);
@@ -886,13 +929,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
 
-            workingDir.set(fixedNewPath);
+            workingDir = fixedNewPath; //.set(fixedNewPath);
         }
     }
 
     /** {@inheritDoc} */
     @Override public Path getWorkingDirectory() {
-        return workingDir.get();
+        return workingDir; //.get();
     }
 
     /** {@inheritDoc} */
@@ -1153,7 +1196,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             return null;
 
         return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+            new IgfsPath(convert(workingDir), path.toUri().getPath());
     }
 
     /**
@@ -1191,9 +1234,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
      */
     @SuppressWarnings("deprecation")
     private FileStatus convert(IgfsFile file) {
-        return new FileStatus(file.length(), file.isDirectory(), 
getDefaultReplication(),
-            file.groupBlockSize(), file.modificationTime(), file.accessTime(), 
permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME), 
file.property(PROP_GROUP_NAME, "users"),
+        return new FileStatus(
+            file.length(),
+            file.isDirectory(),
+            getDefaultReplication(),
+            file.groupBlockSize(),
+            file.modificationTime(),
+            file.accessTime(),
+            permission(file),
+            file.property(PROP_USER_NAME, user),
+            file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
                 return "FileStatus [path=" + getPath() + ", isDir=" + isDir() 
+ ", len=" + getLen() +
@@ -1247,4 +1297,20 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
+
+    /**
+     * Returns the user name this File System is created on behalf of.
+     * @return the user name
+     */
+    public String user() {
+        return user;
+    }
+
+    /**
+     * Getter for secondaryFs field.
+     * @return the secondary file system, if any.
+     */
+    public FileSystem secondaryFs() {
+        return secondaryFs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 9cfb79b..71413f5 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -40,6 +39,7 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
@@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
+    /** The name of the user this File System was created on behalf of. */
+    private final String user;
+
     /** Working directory. */
     private IgfsPath workingDir;
 
     /** URI. */
-    private URI uri;
+    private final URI uri;
 
     /** Authority. */
     private String uriAuthority;
@@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
         uri = name;
 
+        user = getHadoopUser(cfg);
+
         try {
             initialize(name, cfg);
         }
@@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
             throw e;
         }
 
-        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, 
DFLT_USER_NAME));
+        workingDir = new IgfsPath("/user/" + user);
     }
 
     /** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() 
: null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, 
user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -284,11 +289,10 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
                     SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                        user);
 
                     secondaryFs = secProvider.createAbstractFileSystem();
                     secondaryUri = secProvider.uri();
@@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
             file.modificationTime(),
             file.accessTime(),
             permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME),
+            file.property(PROP_USER_NAME, user),
             file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
@@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
+
+    /**
+     * Returns the user name this File System is created on behalf of.
+     * @return the user name
+     */
+    public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..75c8a49 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -126,11 +126,13 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                assert status.totalReducerCnt() > 0;
-
                 setupProgress = 1;
                 mapProgress = 1;
-                reduceProgress = 1f - status.pendingReducerCnt() / 
(float)status.totalReducerCnt();
+
+                if (status.totalReducerCnt() > 0)
+                    reduceProgress = 1f - status.pendingReducerCnt() / 
(float)status.totalReducerCnt();
+                else
+                    reduceProgress = 1f;
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..ca4da3c 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 
 /**
  * Encapsulates logic of secondary filesystem creation.
@@ -109,10 +111,28 @@ public class SecondaryFileSystemProvider {
 
     /**
      * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for 
this secondary Fs.
-     * @throws IOException
+     * @throws IOException if creation fails, or if the calling thread is interrupted while the file system is being created.
      */
     public AbstractFileSystem createAbstractFileSystem() throws IOException {
-        return AbstractFileSystem.get(uri, cfg);
+        if (userName == null)
+            return AbstractFileSystem.get(uri, cfg);
+        else {
+            String ticketCachePath = 
cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+            UserGroupInformation ugi = 
UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+            try {
+                return ugi.doAs(new 
PrivilegedExceptionAction<AbstractFileSystem>() {
+                    @Override public AbstractFileSystem run() throws 
IOException {
+                        return AbstractFileSystem.get(uri, cfg);
+                    }
+                });
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+
+                throw new IOException("Failed to create file system due to 
interrupt.", ie);
+            }
+        }
+    }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
index 509f443..6204a2a 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
@@ -1,91 +1,108 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws 
IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + 
res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}
+///*
+//* Licensed to the Apache Software Foundation (ASF) under one or more
+//* contributor license agreements.  See the NOTICE file distributed with
+//* this work for additional information regarding copyright ownership.
+//* The ASF licenses this file to You under the Apache License, Version 2.0
+//* (the "License"); you may not use this file except in compliance with
+//* the License.  You may obtain a copy of the License at
+//*
+//*      http://www.apache.org/licenses/LICENSE-2.0
+//*
+//* Unless required by applicable law or agreed to in writing, software
+//* distributed under the License is distributed on an "AS IS" BASIS,
+//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//* See the License for the specific language governing permissions and
+//* limitations under the License.
+//*/
+//
+//package org.apache.ignite.internal.processors.hadoop.fs;
+//
+//import org.apache.hadoop.conf.*;
+//import org.apache.hadoop.fs.*;
+//import org.apache.hadoop.hdfs.*;
+//import org.apache.hadoop.security.*;
+//import org.apache.ignite.internal.util.typedef.*;
+//
+//import java.io.*;
+//import java.lang.reflect.*;
+//import java.net.*;
+//
+//import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+//import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
+//
+///**
+//* Wrapper of HDFS for support of separated working directory.
+//*/
+//public class HadoopDistributedFileSystem extends DistributedFileSystem {
+////    /** User name for each thread. */
+////    private final ThreadLocal<String> userName = new ThreadLocal<String>() 
{
+////        /** {@inheritDoc} */
+////        @Override protected String initialValue() {
+////            return DFLT_USER_NAME;
+////        }
+////    };
+////
+////    /** Working directory for each thread. */
+////    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
+////        /** {@inheritDoc} */
+////        @Override protected Path initialValue() {
+////            return getHomeDirectory();
+////        }
+////    };
+//
+//    /** {@inheritDoc} */
+//    @Override public void initialize(URI uri, Configuration conf) throws 
IOException {
+//        super.initialize(uri, conf);
+//
+//        //setUser(getHadoopUser(conf));
+//    }
+//
+//    /**
+//     * Set user name and default working directory for current thread.
+//     *
+//     * @param userName User name.
+//     */
+//    @Deprecated
+//    public void setUser(String userName) {
+//        //System.out.println(this + ": ##### setting user = " + userName + 
", thread = " + Thread.currentThread());
+//
+//        DFSClient dfsClient = getClient();
+//
+//        try {
+//            Field f = DFSClient.class.getDeclaredField("ugi");
+//            f.setAccessible(true);
+//            UserGroupInformation ugi = 
(UserGroupInformation)f.get(dfsClient);
+//            assert F.eq(ugi.getShortUserName(), userName);
+//        } catch (Throwable t) {
+//            t.printStackTrace();
+//        }
+//
+//        //this.userName.set(userName);
+//
+//        //setWorkingDirectory(getHomeDirectory());
+//    }
+//
+////    /** {@inheritDoc} */
+////    @Override public Path getHomeDirectory() {
+////        Path path = new Path("/user/" + userName.get());
+////
+////        return path.makeQualified(getUri(), null);
+////    }
+////
+////    /** {@inheritDoc} */
+////    @Override public void setWorkingDirectory(Path dir) {
+////        Path fixedDir = fixRelativePart(dir);
+////
+////        String res = fixedDir.toUri().getPath();
+////
+////        if (!DFSUtil.isValidName(res))
+////            throw new IllegalArgumentException("Invalid DFS directory name 
" + res);
+////
+////        workingDir.set(fixedDir);
+////    }
+////
+////    /** {@inheritDoc} */
+////    @Override public Path getWorkingDirectory() {
+////        return workingDir.get();
+////    }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..7631ae9 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -29,18 +29,19 @@ public class HadoopFileSystemsUtils {
     /** Name of the property for setting working directory on create new local 
FS instance. */
     public static final String LOC_FS_WORK_DIR_PROP = "fs." + 
FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
 
-    /**
-     * Set user name and default working directory for current thread if it's 
supported by file system.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgniteHadoopFileSystem)
-            ((IgniteHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof HadoopDistributedFileSystem)
-            ((HadoopDistributedFileSystem)fs).setUser(userName);
-    }
+//    /**
+//     * Set user name and default working directory for current thread if 
it's supported by file system.
+//     *
+//     * @param fs File system.
+//     * @param userName User name.
+//     */
+//    @Deprecated // TODO: remove this method.
+//    public static void setUser(FileSystem fs, String userName) {
+//        if (fs instanceof IgniteHadoopFileSystem)
+//            ((IgniteHadoopFileSystem)fs).setUser(userName);
+////        else if (fs instanceof HadoopDistributedFileSystem)
+////            ((HadoopDistributedFileSystem)fs).setUser(userName);
+//    }
 
     /**
      * Setup wrappers of filesystems to support the separate working directory.
@@ -52,6 +53,7 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs.AbstractFileSystem." + 
FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
 
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", 
HadoopDistributedFileSystem.class.getName());
+//        // TODO: this should be removed:
+//        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", 
HadoopDistributedFileSystem.class.getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
      * @throws IOException If failed.
      */
     public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * Gets the name of the user this IGFS instance works on behalf of.
+     * @return User name.
+     */
+    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..df17198 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -46,14 +46,22 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** Logger. */
     private final Log log;
 
+    /** The user this Igfs works on behalf of. */
+    private final String user;
+
     /**
      * Constructor.
      *
      * @param igfs Target IGFS.
      * @param log Log.
      */
-    public HadoopIgfsInProc(IgfsEx igfs, Log log) {
-        this.igfs = igfs;
+    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws 
IgniteCheckedException {
+        this.user = userName;
+
+        this.igfs = igfs.forUser(userName);
+
+        assert this.user == this.igfs.user();
+
         this.log = log;
 
         bufSize = igfs.configuration().getBlockSize() * 2;
@@ -407,4 +415,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
         if (lsnr0 != null && log.isDebugEnabled())
             log.debug("Removed stream event listener [delegate=" + delegate + 
']');
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 7d590c7..856a4fb 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public class HadoopIgfsIpcIo implements HadoopIgfsIo {
     /** Logger. */
-    private Log log;
+    private final Log log;
 
     /** Request futures map. */
     private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 7dca049..9cee867 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
     /** IGFS name. */
     private final String igfs;
 
+    /** Name of the user this out-proc instance works on behalf of. */
+    private final String userName;
+
     /** Client log. */
     private final Log log;
 
@@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, 
Log log) throws IOException {
-        this(host, port, grid, igfs, false, log);
+    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, 
Log log, String user) throws IOException {
+        this(host, port, grid, igfs, false, log, user);
     }
 
     /**
@@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) 
throws IOException {
-        this(null, port, grid, igfs, true, log);
+    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, 
String user) throws IOException {
+        this(null, port, grid, igfs, true, log, user);
     }
 
     /**
@@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, 
boolean shmem, Log log)
+    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, 
boolean shmem, Log log, String user)
         throws IOException {
         assert host != null && !shmem || host == null && shmem :
             "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" 
+ shmem + ']';
@@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         this.grid = grid;
         this.igfs = igfs;
         this.log = log;
+        this.userName = user;
 
         io = HadoopIgfsIpcIo.get(log, endpoint);
 
@@ -148,6 +152,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
     @Override public IgfsHandshakeResponse handshake(String logDir) throws 
IgniteCheckedException {
         final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
 
+        req.userName(userName);
         req.gridName(grid);
         req.igfsName(igfs);
         req.logDirectory(logDir);
@@ -173,6 +178,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
 
         msg.command(INFO);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -184,6 +190,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.command(UPDATE);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -196,6 +203,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.path(path);
         msg.accessTime(accessTime);
         msg.modificationTime(modificationTime);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -207,6 +215,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.command(RENAME);
         msg.path(src);
         msg.destinationPath(dest);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -218,6 +227,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.command(DELETE);
         msg.path(path);
         msg.flag(recursive);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -231,6 +241,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.path(path);
         msg.start(start);
         msg.length(len);
+        msg.userName(userName);
 
         return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
     }
@@ -241,6 +252,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
 
         msg.command(PATH_SUMMARY);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(SUMMARY_RES).get();
     }
@@ -252,6 +264,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.command(MAKE_DIRECTORIES);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -262,6 +275,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
 
         msg.command(LIST_FILES);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_COL_RES).get();
     }
@@ -272,6 +286,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
 
         msg.command(LIST_PATHS);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(PATH_COL_RES).get();
     }
@@ -288,6 +303,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.command(OPEN_READ);
         msg.path(path);
         msg.flag(false);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = 
io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -303,6 +319,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(true);
         msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = 
io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -321,6 +338,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.properties(props);
         msg.replication(replication);
         msg.blockSize(blockSize);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -336,6 +354,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(create);
         msg.properties(props);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -471,4 +490,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, 
HadoopIgfsIpcIoListener
             }
         };
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return userName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1dada21..1574152 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
     /** Logger. */
     private final Log log;
 
+    private final String userName;
+
     /**
      * Constructor.
      *
@@ -63,13 +65,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
      * @param conf Configuration.
      * @param log Current logger.
      */
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration 
conf, Log log) throws IOException {
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration 
conf, Log log, String user) throws IOException {
         try {
             this.authority = authority;
             this.endpoint = new HadoopIgfsEndpoint(authority);
             this.logDir = logDir;
             this.conf = conf;
             this.log = log;
+            this.userName = user;
         }
         catch (IgniteCheckedException e) {
             throw new IOException("Failed to parse endpoint: " + authority, e);
@@ -362,7 +365,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsInProc(igfs, log);
+                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
 
                     curDelegate = new Delegate(hadoop, 
hadoop.handshake(logDir));
                 }
@@ -384,7 +387,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsOutProc(endpoint.port(), 
endpoint.grid(), endpoint.igfs(), log);
+                    hadoop = new HadoopIgfsOutProc(endpoint.port(), 
endpoint.grid(), endpoint.igfs(), log, userName);
 
                     curDelegate = new Delegate(hadoop, 
hadoop.handshake(logDir));
                 }
@@ -409,7 +412,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
                 try {
                     hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), 
endpoint.grid(), endpoint.igfs(),
-                        log);
+                        log, userName);
 
                     curDelegate = new Delegate(hadoop, 
hadoop.handshake(logDir));
                 }
@@ -430,7 +433,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
             HadoopIgfsEx hadoop = null;
 
             try {
-                hadoop = new HadoopIgfsOutProc(endpoint.host(), 
endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), 
endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
 
                 curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2b36267..9d7125a 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -17,13 +17,21 @@
 
 package org.apache.ignite.internal.processors.hadoop.taskexecutor;
 
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
+import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -99,6 +107,44 @@ public abstract class HadoopRunnableTask implements 
Callable<Void> {
 
     /** {@inheritDoc} */
     @Override public Void call() throws IgniteCheckedException {
+        final String user = job.info().user();
+
+        assert !F.isEmpty(user);
+
+        if (F.eq(user, FileSystemConfiguration.DFLT_USER_NAME))
+            // do direct call:
+            return callImpl();
+        else {
+            // do the call in the context of 'user':
+            try {
+                final String ticketCachePath;
+
+                if (job instanceof HadoopV2Job) {
+                    Configuration conf = ((HadoopV2Job)job).jobConf();
+
+                    ticketCachePath = 
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+                } else
+                    ticketCachePath = 
job.info().property(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+                UserGroupInformation ugi = 
UserGroupInformation.getBestUGI(ticketCachePath, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                    @Override public Void run() throws IgniteCheckedException {
+                        return callImpl();
+                    }
+                });
+            } catch (IOException | InterruptedException e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+    }
+
+    /**
+     * Runnable task call implementation, invoked by {@link #call()}.
+     * @return {@code null}, as the return type is {@code Void}.
+     * @throws IgniteCheckedException If failed.
+     */
+    Void callImpl() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
 
         Throwable err = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index e3c2bfa..ac10687 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -40,6 +40,7 @@ import java.util.Queue;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+import static 
org.apache.ignite.internal.processors.hadoop.v2.HadoopV2JobResourceManager.*;
 
 /**
  * Hadoop job implementation for v2 API.
@@ -68,7 +69,7 @@ public class HadoopV2Job implements HadoopJob {
         new ConcurrentHashMap8<>();
 
     /** Pooling task context class and thus class loading environment. */
-    private final Queue<Class<?>> taskCtxClsPool = new 
ConcurrentLinkedQueue<>();
+    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = 
new ConcurrentLinkedQueue<>();
 
     /** All created contexts. */
     private final Queue<Class<?>> fullCtxClsQueue = new 
ConcurrentLinkedDeque<>();
@@ -139,7 +140,7 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+            try (FileSystem fs = fileSystemForUser(jobDir.toUri(), jobConf)) {
                 JobSplit.TaskSplitMetaInfo[] metaInfos = 
SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 
@@ -194,7 +195,7 @@ public class HadoopV2Job implements HadoopJob {
         if (old != null)
             return old.get();
 
-        Class<?> cls = taskCtxClsPool.poll();
+        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
 
         try {
             if (cls == null) {
@@ -204,7 +205,7 @@ public class HadoopV2Job implements HadoopJob {
                 HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
                     "hadoop-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber());
 
-                cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+                cls = (Class<? extends 
HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
                 fullCtxClsQueue.add(cls);
             }
@@ -263,6 +264,8 @@ public class HadoopV2Job implements HadoopJob {
                 if (jobLocDir.exists())
                     U.delete(jobLocDir);
             }
+//
+//            disposeFileSystem();
         }
         finally {
             taskCtxClsPool.clear();
@@ -297,6 +300,25 @@ public class HadoopV2Job implements HadoopJob {
         }
     }
 
+//    /**
+//     * Closes the underlying file system.
+//     * @throws IgniteCheckedException on error.
+//     */
+//    private void disposeFileSystem() throws IgniteCheckedException {
+//        FileSystem fs0 = fs;
+//
+//        try {
+//            if (fs0 != null)
+//                fs0.close();
+//        }
+//        catch (IOException ioe) {
+//            throw new IgniteCheckedException(ioe);
+//        }
+//        finally {
+//            fs = null;
+//        }
+//    }
+
     /** {@inheritDoc} */
     @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
         rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
@@ -319,4 +341,12 @@ public class HadoopV2Job implements HadoopJob {
         if (rsrcMgr != null)
             rsrcMgr.cleanupStagingDirectory();
     }
+
+    /**
+     * Gets the job configuration.
+     * @return Job configuration.
+     */
+    public JobConf jobConf() {
+        return jobConf;
+    }
 }

Reply via email to