[IGNITE-218]: intermediate commit.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3efeeaef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3efeeaef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3efeeaef

Branch: refs/heads/ignite-218
Commit: 3efeeaefb18c15e2b51c0099ee56dc4b157851b0
Parents: 399743c
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Thu Apr 16 17:13:14 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Thu Apr 16 17:13:14 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java |  40 +--
 .../igfs/secondary/IgfsSecondaryFileSystem.java |  14 --
 .../igfs/common/IgfsHandshakeRequest.java       |  14 --
 .../internal/igfs/common/IgfsMarshaller.java    |   4 -
 .../igfs/common/IgfsPathControlRequest.java     |   2 +-
 .../internal/processors/igfs/IgfsAsyncImpl.java |  11 -
 .../ignite/internal/processors/igfs/IgfsEx.java |  23 +-
 .../internal/processors/igfs/IgfsImpl.java      | 175 +------------
 .../processors/igfs/IgfsIpcHandler.java         |   4 +-
 .../processors/igfs/IgniteUserContext.java      | 168 -------------
 .../ignite/internal/util/IgniteUtils.java       |  12 -
 .../apache/ignite/hadoop/fs/ExpirableMap.java   | 247 -------------------
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 162 +++++++-----
 .../ignite/hadoop/fs/LazyConcurrentMap.java     | 156 ++++++++++++
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  54 ++--
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   5 +-
 .../hadoop/SecondaryFileSystemProvider.java     |  35 +--
 .../hadoop/igfs/HadoopIgfsInProc.java           |   4 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |   1 -
 ...oopSecondaryFileSystemConfigurationTest.java |   4 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |  10 -
 21 files changed, 345 insertions(+), 800 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java 
b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index 83b6691..ebbe1c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -2,22 +2,17 @@ package org.apache.ignite.igfs;
 
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
 import java.util.concurrent.*;
 
 /**
  * Provides ability to execute IGFS code in a context of a specific user.
  */
 public abstract class IgfsUserContext {
-    /**
-     * Thread local to hold the current user context.
-     */
-    private static final ThreadLocal<String> userStackThreadLocal = new 
ThreadLocal<String>() {
-        @Override protected String initialValue() {
-            String dfltUser = FileSystemConfiguration.DFLT_USER_NAME;
-
-            return dfltUser.intern();
-        }
-    };
+    /** Thread local to hold the current user context. */
+    private static final ThreadLocal<String> userStackThreadLocal = new 
ThreadLocal<>();
 
     /**
      * Executes given callable in the given user context.
@@ -27,12 +22,16 @@ public abstract class IgfsUserContext {
      * @param cllbl the callable to execute
      * @param <T> The type of callable result.
      * @return the result of the callable execution.
-     * @throws NullPointerException if 'user' or 'callble' is null
+     * @throws NullPointerException if callable is null
+     * @throws IllegalArgumentException if user name is null or empty String.
      * @throws IgniteException if any Exception thrown from callable.call().
      * The contract is that if this method throws IgniteException, this 
IgniteException getCause() method
      * must return exactly the Exception thrown from the callable.
      */
     public static <T> T doAs(String user, final Callable<T> cllbl) {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty 
user name.");
+
         user = user.intern();
 
         final String ctxUser = userStackThreadLocal.get();
@@ -57,13 +56,26 @@ public abstract class IgfsUserContext {
 
     /**
      * Gets the current context user.
-     * If this method is invoked outside of any doAs(), it will return the 
default user
-     * name as defined in {@link FileSystemConfiguration#DFLT_USER_NAME}.
+     * If this method is invoked outside of any doAs() on call stack, it will 
return null.
      * Note that the returned user name is always interned, so
      * you may compare the names using '==' reference equality.
      * @return the current user, or null if invoked outside of any doAs().
      */
-    public static String currentUser() {
+    @Nullable public static String currentUser() {
         return userStackThreadLocal.get();
     }
+
+    /**
+     * Provides non-null interned user name.
+     * If the user name is null or empty string, defaults to {@link 
FileSystemConfiguration#DFLT_USER_NAME},
+     * which is the current process owner user.
+     * @param user a user name to be fixed.
+     * @return non-null interned user name.
+     */
+    public static String fixUserName(@Nullable String user) {
+        if (F.isEmpty(user))
+           user = FileSystemConfiguration.DFLT_USER_NAME;
+
+        return user.intern();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
 
b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index c2c610c..9026eac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,18 +198,4 @@ public interface IgfsSecondaryFileSystem {
      * @return Map of properties.
      */
     public Map<String,String> properties();
-
-    /**
-     * Gets an instance of 2ndary Fs for the specified user (may be 'this').
-     * @param userName the user name
-     * @return the instance
-     * @throws IgniteCheckedException
-     */
-    public IgfsSecondaryFileSystem forUser(String userName) throws 
IgniteCheckedException;
-
-    /**
-     * The user name this 2ndary Fs works on behalf of.
-     * @return the user name.
-     */
-    public String getUser();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
index bf5892b..ec8ef6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java
@@ -31,10 +31,6 @@ public class IgfsHandshakeRequest extends IgfsMessage {
     /** Expected IGFS name. */
     private String igfsName;
 
-    // TODO: Remove.
-    /** User name the request is done on behalf of. */
-    private String userName;
-
     /** Logger directory. */
     private String logDir;
 
@@ -94,14 +90,4 @@ public class IgfsHandshakeRequest extends IgfsMessage {
     @Override public String toString() {
         return S.toString(IgfsHandshakeRequest.class, this);
     }
-
-    public final String userName() {
-        assert userName != null;
-
-        return userName;
-    }
-
-    public final void userName(String userName) {
-        this.userName = U.fixUserName(userName);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 748fd7e..a4c7830 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -90,8 +90,6 @@ public class IgfsMarshaller {
 
                     IgfsHandshakeRequest req = (IgfsHandshakeRequest)msg;
 
-                    // TODO: Remove.
-                    U.writeString(out, req.userName());
                     U.writeString(out, req.gridName());
                     U.writeString(out, req.igfsName());
                     U.writeString(out, req.logDirectory());
@@ -209,8 +207,6 @@ public class IgfsMarshaller {
                 case HANDSHAKE: {
                     IgfsHandshakeRequest req = new IgfsHandshakeRequest();
 
-                    // TODO: Remove.
-                    req.userName(U.readString(in));
                     req.gridName(U.readString(in));
                     req.igfsName(U.readString(in));
                     req.logDirectory(U.readString(in));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 99a946a..55495d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -245,6 +245,6 @@ public class IgfsPathControlRequest extends IgfsMessage {
     }
 
     public final void userName(String userName) {
-        this.userName = U.fixUserName(userName);
+        this.userName = IgfsUserContext.fixUserName(userName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 5fb9410..8099985 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -314,15 +314,4 @@ public class IgfsAsyncImpl extends 
AsyncSupportAdapter<IgniteFileSystem> impleme
     @Override public IgfsSecondaryFileSystem asSecondary() {
         return igfs.asSecondary();
     }
-
-    // TODO: Remove.
-    /** {@inheritDoc} */
-    @Override public IgfsEx forUser(String userName) throws 
IgniteCheckedException {
-        return igfs.forUser(userName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String user() {
-        return igfs.user();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index c69d144..361f75f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
     /** Property name for URI of file system. */
     public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
 
-    /** Property name for user name of file system. */
-    public static final String SECONDARY_FS_USER_NAME = 
"SECONDARY_FS_USER_NAME";
+    /** Property name for default user name of file system.
+     * NOTE: for secondary file system this is just a default user name, which 
is used
+     * when the 2ndary filesystem is used outside of any user context.
+     * If another user name is set in the context, 2ndary file system will 
work on behalf
+     * of that user, which is different from the default. */
+     public static final String SECONDARY_FS_USER_NAME = 
"SECONDARY_FS_USER_NAME";
 
     /**
      * Stops IGFS cleaning all used resources.
@@ -171,19 +175,4 @@ public interface IgfsEx extends IgniteFileSystem {
      * @return Secondary file system wrapper.
      */
     public IgfsSecondaryFileSystem asSecondary();
-
-    // TODO: REmove.
-    /**
-     * TODO
-     * @param userName
-     * @return
-     * @throws IgniteCheckedException
-     */
-    public IgfsEx forUser(@Nullable String userName) throws 
IgniteCheckedException;
-
-    /**
-     * Getter for user name.
-     * @return user name.
-     */
-    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 36030a4..a0f1c3f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -43,7 +43,6 @@ import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.io.*;
-import java.lang.ref.*;
 import java.net.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -57,6 +56,7 @@ import static 
org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*;
 
 /**
  * Cache-based IGFS implementation.
+ * This is a singleton: only 1 IgfsImpl exists in Ignite node.
  */
 public class IgfsImpl implements IgfsEx {
     /** Default permissions for file system entry. */
@@ -77,9 +77,6 @@ public class IgfsImpl implements IgfsEx {
     /** FS configuration. */
     private final FileSystemConfiguration cfg;
 
-    /** The user name this Fs works on behalf of. */
-    private final String user;
-
     /** IGFS context. */
     private IgfsContext igfsCtx;
 
@@ -125,8 +122,6 @@ public class IgfsImpl implements IgfsEx {
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
-    private final ConcurrentMap<String, Reference<IgfsImpl>> userToIgfsExMap = 
new ConcurrentHashMap<>();
-
     /** Pool for threads working in DUAL mode. */
     private IgniteThreadPoolExecutor dualPool;
 
@@ -148,63 +143,7 @@ public class IgfsImpl implements IgfsEx {
         meta = igfsCtx.meta();
         data = igfsCtx.data();
 
-        user = U.fixUserName(getSecondaryFsUser());
-
-        assert user != null;
-
-        initIgfsSecondaryFileSystem();
-
-        // Check whether IGFS LRU eviction policy is set on data cache.
-        String dataCacheName = igfsCtx.configuration().getDataCacheName();
-
-        for (CacheConfiguration cacheCfg : 
igfsCtx.kernalContext().config().getCacheConfiguration()) {
-            if (F.eq(dataCacheName, cacheCfg.getName())) {
-                EvictionPolicy evictPlc = cacheCfg.getEvictionPolicy();
-
-                if (evictPlc != null & evictPlc instanceof 
IgfsPerBlockLruEvictionPolicy)
-                    this.evictPlc = (IgfsPerBlockLruEvictionPolicy)evictPlc;
-
-                break;
-            }
-        }
-
-        topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
-        igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
-    }
-
-    /**
-     * Gets secondary file system user, or null, if no secondary file system 
is present.
-     * @return the secondary file system user.
-     */
-    @Nullable protected String getSecondaryFsUser() {
-        FileSystemConfiguration fsCfg = igfsCtx.configuration();
-
-        if (fsCfg == null)
-            return null;
-
-        IgfsSecondaryFileSystem sec = fsCfg.getSecondaryFileSystem();
-
-        if (sec == null)
-            return null;
-
-        return sec.getUser();
-    }
-
-    /**
-     * Initializes 2ndary file system.
-     * @throws IgniteCheckedException on error
-     */
-    private void initIgfsSecondaryFileSystem() throws IgniteCheckedException {
-        // This is the "prototype" (in terms of GOF pattern) of the 2ndary Fs 
to be cloned
-        // for the specified user:
-        final IgfsSecondaryFileSystem secondaryFs0 = 
cfg.getSecondaryFileSystem();
-
-        if (secondaryFs0 == null)
-            secondaryFs = null;
-        else
-            secondaryFs = secondaryFs0.forUser(user);
+        secondaryFs = cfg.getSecondaryFileSystem();
 
         /* Default IGFS mode. */
         final IgfsMode dfltMode;
@@ -263,6 +202,9 @@ public class IgfsImpl implements IgfsEx {
         secondaryPaths = new IgfsPaths(secondaryFs == null ? null : 
secondaryFs.properties(), dfltMode,
             modeRslvr.modesOrdered());
 
+        dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, 
Integer.MAX_VALUE, 5000L,
+            new LinkedBlockingQueue<Runnable>(), new 
IgfsThreadFactory(cfg.getName()), null) : null;
+
         // Check whether IGFS LRU eviction policy is set on data cache.
         String dataCacheName = igfsCtx.configuration().getDataCacheName();
 
@@ -281,9 +223,6 @@ public class IgfsImpl implements IgfsEx {
 
         igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
         igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
-
-        dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, 
Integer.MAX_VALUE, 5000L,
-            new LinkedBlockingQueue<Runnable>(), new 
IgfsThreadFactory(cfg.getName()), null) : null;
     }
 
     /**
@@ -2169,87 +2108,6 @@ public class IgfsImpl implements IgfsEx {
             throw new IllegalStateException("Failed to perform IGFS action 
because grid is stopping.");
     }
 
-    /** {@inheritDoc} */
-    @Override public IgfsEx forUser(String userName) throws 
IgniteCheckedException {
-
-//        Wrapper w = map.get(userName);
-//
-//        if (w != null)
-//            return w.get();
-//        else {
-//            Wrapper newWrapper = new Wrapper();
-//            Wrapper old = map.putIfAbsent(userName, newWrapper);
-//
-//            if (old != null)
-//                return old.get();
-//            else
-//                return newWrapper.init();
-//        }
-
-
-        final String userFixed = U.fixUserName(userName);
-
-        if (this.user == userFixed)
-            return this; // same user
-
-        Reference<IgfsImpl> ref = userToIgfsExMap.get(userFixed);
-
-        IgfsImpl val = (ref == null) ? null : ref.get();
-
-        if (val == null) {
-            // Create impl from the same context but with different user:
-            IgfsImpl newVal = new IgfsImpl(igfsCtx) {
-                @Override protected String getSecondaryFsUser() {
-                    return userFixed;
-                }
-            };
-
-
-
-            final Reference<IgfsImpl> newRef = new WeakReference<>(newVal);
-
-            while (true) {
-                ref = userToIgfsExMap.get(userFixed);
-
-                if (ref == null) {
-                    ref = userToIgfsExMap.putIfAbsent(userFixed, newRef);
-
-                    if (ref == null) {
-                        ref = newRef;
-                        val = newVal;
-
-                        break; // ref replaced with newRef
-                    }
-                }
-
-                val = ref.get();
-
-                if (val != null)
-                    break; // there is an existing value
-
-                boolean replaced = userToIgfsExMap.replace(userFixed, ref, 
newRef);
-
-                if (replaced) {
-                    ref = newRef;
-                    val = newVal;
-
-                    break;
-                }
-            }
-        }
-
-        assert val == ref.get();
-        assert val != null;
-        assert F.eq(val.user(), userFixed);
-
-        return val;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String user() {
-        return user;
-    }
-
     /**
      * IGFS thread factory.
      */
@@ -2280,27 +2138,4 @@ public class IgfsImpl implements IgfsEx {
             return t;
         }
     }
-
-//    private ConcurrentHashMap8<String, Wrapper> map;
-
-//    private static class Wrapper {
-//
-//        private IgfsSecondaryFileSystem fs;
-//
-//        private CountDownLatch latch;
-//
-//        public IgfsSecondaryFileSystem init() {
-//            // fs = ...
-//
-//            latch.countDown();
-//
-//            return fs;
-//        }
-//
-//        public IgfsSecondaryFileSystem get() {
-//            latch.await();
-//
-//            return fs;
-//        }
-//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 4812c95..d8a8bdf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -254,7 +254,9 @@ class IgfsIpcHandler implements IgfsServerHandler {
 
         assert userName != null;
 
-        final IgfsEx userIgfs = igfs.forUser(userName);
+        final IgfsEx userIgfs = igfs; //.forUser(userName);
+
+        //IgfsUtils.setContextUser(userName);
 
         try {
             switch (cmd) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
deleted file mode 100644
index 97b0631..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package org.apache.ignite.internal.processors.igfs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import javax.security.auth.*;
-import java.security.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- *
- */
-public abstract class IgniteUserContext {
-
-    private static final IgniteUserContext instance = new 
AccessControllerIgniteUserContext();
-
-    /**
-     *
-     * @param user
-     * @param callable
-     * @param <T>
-     * @return
-     * @throws IgniteCheckedException
-     */
-    public abstract <T> T doAs0 (String user, final Callable<T> callable) 
throws IgniteCheckedException;
-
-    /**
-     *
-     * @return
-     */
-    public abstract String getContextUser0();
-
-    /**
-     *
-     * @param user
-     * @param callable
-     * @param <T>
-     * @return
-     * @throws IgniteCheckedException
-     */
-    public static <T> T doAs(String user, final Callable<T> callable) throws 
IgniteCheckedException {
-        return instance.doAs0(user, callable);
-    }
-
-    /**
-     *
-     * @return
-     */
-    public static String getContextUser() {
-        return instance.getContextUser0();
-    }
-
-    private static final class IgnitePrincipal implements Principal {
-
-        private final String name;
-
-        public IgnitePrincipal(String name) {
-            this.name = name.intern();
-        }
-
-        @Override public String getName() {
-            return name;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            else if (o == null || getClass() != o.getClass())
-                return false;
-            else
-                //noinspection StringEquality
-                return name == ((IgnitePrincipal)o).name;
-        }
-
-        @Override public int hashCode() {
-            return name.hashCode();
-        }
-
-        @Override public String toString() {
-            return name;
-        }
-    }
-
-    static class AccessControllerIgniteUserContext extends IgniteUserContext {
-        /** {@inheritDoc} */
-        @Override public <T> T doAs0(String user, final Callable<T> callable) 
throws IgniteCheckedException {
-            user = user.intern();
-
-            try {
-                //noinspection StringEquality
-                if (getContextUser0() == user)
-                    return callable.call();
-
-                Subject subject = new Subject();
-
-                subject.getPrincipals().add(new IgnitePrincipal(user));
-
-                return Subject.doAs(subject, new 
PrivilegedExceptionAction<T>() {
-                    @Override public T run() throws Exception {
-                        return callable.call();
-                    }
-                });
-            } catch (Exception pae) {
-                throw U.cast(pae);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getContextUser0() {
-            AccessControlContext context = AccessController.getContext();
-
-            Subject subject = Subject.getSubject(context);
-
-            Set<IgnitePrincipal> set = 
subject.getPrincipals(IgnitePrincipal.class);
-
-            if (set.isEmpty())
-                return null;
-            else
-                return set.iterator().next().getName();
-        }
-    }
-
-    static class ThreadLocalIgniteUserContext extends IgniteUserContext {
-        private final ThreadLocal<Stack<String>> userStackThreadLocal = new 
ThreadLocal<Stack<String>>() {
-            @Override protected Stack<String> initialValue() {
-                return new Stack<>();
-            }
-        };
-
-        /** {@inheritDoc} */
-        @Override public <T> T doAs0(String user, Callable<T> callable) throws 
IgniteCheckedException {
-            user = user.intern();
-
-            final Stack<String> stack = userStackThreadLocal.get();
-
-            try {
-                //noinspection StringEquality
-                if (!stack.isEmpty() && stack.peek() == user)
-                    return callable.call(); // correct context is already there
-
-                stack.push(user);
-
-                try {
-                    return callable.call();
-                }
-                finally {
-                    String userPopped = stack.pop();
-                    //noinspection StringEquality
-                    assert user == userPopped;
-                }
-            } catch (Exception e) {
-                throw U.cast(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getContextUser0() {
-            Stack<String> stack = userStackThreadLocal.get();
-
-            if (stack.isEmpty())
-                return null;
-
-            return stack.peek();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 55785e0..1aac985 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9005,16 +9005,4 @@ public abstract class IgniteUtils {
 
         return hasShmem;
     }
-
-    /**
-     * Provides non-null interned user name
-     * @param user a user name.
-     * @return non-null interned user name
-     */
-    public static String fixUserName(@Nullable String user) {
-        if (user == null)
-           return "";
-
-        return user.intern();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
deleted file mode 100644
index ed0cb97..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
+++ /dev/null
@@ -1,247 +0,0 @@
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.ignite.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.lang.ref.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Maps values by keys.
- * Uses effective strategy of object caching:
- * Values are lazily created via special {@link ValueFactory};
- *
- * When a value is not used longer than 'expirationTime', the value gets held 
by {@link WeakReference}, and may
- * disappear from the map, if it is no longer strongly reachable from the 
client code.
- * The values must implement {@link AccessTimeAware} interface in order to 
give information about last access
- * time to a value object.
- *
- * If a value has expired and has been removed from the map, the method {@link 
#get} will return null,
- * unless method {@link #getOrCreate} is invoked, which will create a new 
value and map it to the key again.
- */
-public class ExpirableMap<K, T extends ExpirableMap.AccessTimeAware> {
-
-    private final ConcurrentMap<K, Wrapper> map = new ConcurrentHashMap8<>();
-
-    private final ValueFactory<K, T> factory;
-
-    private final long expirationTimeMs;
-
-    private final ReferenceQueue<T> refQueue = new ReferenceQueue<>();
-
-    public ExpirableMap(ValueFactory<K, T> factory, final long 
expirationTimeMs) {
-        this.factory = factory;
-
-        this.expirationTimeMs = expirationTimeMs;
-
-        // Expiration checker thread:
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-                while (true) {
-                    System.out.println("checking expiration.");
-                    updateExpiration();
-                    removeStale();
-                    try {
-                        Thread.sleep(expirationTimeMs);
-                    } catch (InterruptedException ie) {
-                        break;
-                    }
-                }
-            }
-        });
-        t.setName("ExpirableMap expiration checker " + 
Integer.toHexString(this.hashCode()));
-        t.setDaemon(true);
-        t.start();
-    }
-
-    void updateExpiration() {
-        for (Wrapper w: map.values()) {
-            if (w != null)
-                w.checkExpired(expirationTimeMs);
-        }
-    }
-
-    /**
-     * Gets cached or creates a new value of V.
-     * @param k the key to associate the value with.
-     * @return the cached or newly created value, never null.
-     * @throws IgniteCheckedException on error
-     */
-    public T getOrCreate(K k) throws IgniteCheckedException {
-        final Wrapper w = new Wrapper(k);
-
-        try {
-            while (true) {
-                Wrapper wOld = map.putIfAbsent(k, w);
-
-                if (wOld == null) {
-                    // new wrapper 'w' has been put:
-                    w.init();
-
-                    return w.getValue();
-                }
-                else {
-                    // get the value from existing wrapper:
-                    T v = wOld.getValue();
-
-                    if (v != null)
-                        return v; // value found in the old wrapper.
-
-                    // The value expired and possibly destroyed.
-                    // We have to replace the wrapper with a new one:
-                    if (map.replace(k, wOld, w)) {
-                        w.init();
-
-                        return w.getValue();
-                    }
-                    // Somebody already replaced the wrapper, loop again.
-                }
-            }
-        }
-        catch (InterruptedException ie) {
-            throw new IgniteException(ie);
-        }
-    }
-
-    public @Nullable T get(K k) {
-        Wrapper w = map.get(k);
-
-        if (w == null)
-            return null;
-
-        try {
-            return w.getValue();
-        } catch (InterruptedException ie) {
-            throw new IgniteException(ie);
-        }
-    }
-
-    public Set<K> keySet() {
-        return map.keySet();
-    }
-
-    private class Wrapper {
-
-        private final CountDownLatch latch = new CountDownLatch(1);
-
-        private final K key;
-
-        private volatile T v;
-
-        private DataWeakReference<Wrapper, T> weakRef;
-
-        private Wrapper(K key) {
-            this.key = key;
-        }
-
-        private void init() throws IgniteCheckedException {
-            final T v0 = factory.createValue(key);
-
-            if (v0 == null)
-                throw new IgniteCheckedException("Failed to create value. 
[key=" + key + ']');
-
-            weakRef = new DataWeakReference<>(this, v0, refQueue);
-
-            v = v0;
-
-            latch.countDown();
-        }
-
-        /**
-         * Blocks until the value is initialized.
-         * @return
-         * @throws InterruptedException
-         */
-        @Nullable T getValue() throws InterruptedException {
-            latch.await();
-
-            T v0 = v;
-
-            if (v0 != null)
-                return v0;
-
-            // Value may be not reachable strongly (expired), but may still be 
reachable weakly:
-            return weakRef.get();
-        }
-
-        void checkExpired(long expirationTimeMs) {
-            T v0 = v;
-
-            if (v0 == null) // The value is already expired:
-                return;
-
-            long a = v0.accessTimeMs();
-
-            long usedAgo = System.currentTimeMillis() - a;
-
-            if (usedAgo >= expirationTimeMs) {
-                v = null; // null the strong reference; 'v' remains only 
weakly reachable.
-
-                System.out.println("expired: " + v0 );
-            }
-        }
-    }
-
-    void removeStale() {
-        DataWeakReference<Wrapper,T> ref;
-
-        while ((ref = (DataWeakReference<Wrapper,T>)refQueue.poll()) != null) {
-            Wrapper w = ref.getData();
-
-            K key = w.key;
-
-            boolean removed = map.remove(key, w);
-
-            System.out.println("dequeued: " + ref + " -> " + ref.get() + " 
removed: " + removed);
-        }
-    }
-
-    public static interface AccessTimeAware {
-        public long accessTimeMs();
-    }
-
-    /**
-     * Interface representing the factory that creates map values.
-     * @param <K>
-     * @param <V>
-     */
-    public interface ValueFactory <K, V> {
-        /**
-         * Creates the new value.
-         * @param key
-         * @return
-         * @throws IgniteCheckedException
-         */
-        public V createValue(K key) throws IgniteCheckedException;
-    }
-
-    /**
-     * Weak reference with an associated data object.
-     * @param <D> type of the data object.
-     * @param <V> type of the Reference referent.
-     */
-    private static class DataWeakReference <D, V> extends WeakReference<V> {
-        /** The data object. */
-        private final D data;
-
-        /**
-         * Guess, what is this?? Yes, this is Constructor!
-         * @param data
-         * @param referent
-         * @param q the reference refQueue to refQueue the reference into.
-         */
-        DataWeakReference(D data, V referent, ReferenceQueue q) {
-            super(referent, q);
-            this.data = data;
-        }
-
-        /**
-         * Getter for the data object.
-         */
-        D getData() {
-            return data;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 3c9cc8a..64b54e0 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.hadoop.igfs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.jetbrains.annotations.*;
+import org.apache.ignite.hadoop.fs.LazyConcurrentMap.*;
 
 import java.io.*;
 import java.net.*;
@@ -36,19 +37,41 @@ import java.util.*;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 /**
- * Adapter to use any Hadoop file system {@link FileSystem} as  {@link 
IgfsSecondaryFileSystem}.
+ * Adapter to use any Hadoop file system {@link FileSystem} as {@link 
IgfsSecondaryFileSystem}.
+ * In fact, this class deals with different FileSystems depending on the user 
context,
+ * see {@link IgfsUserContext#currentUser()}.
  */
 public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSystem, AutoCloseable {
-    /** Hadoop file system. */
-    private final FileSystem fileSys;
-
     /** Properties of file system, see {@link #properties()}
-     * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
+     *
      * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}
      * See {@link IgfsEx#SECONDARY_FS_URI}
+     * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
      * */
     private final Map<String, String> props = new HashMap<>();
 
+    /** Secondary file system provider. */
+    private final SecondaryFileSystemProvider secProvider;
+
+    /** The default user name. It is used if no user context is set. */
+    private final String dfltUserName;
+
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in 
#close() method. */
+    private volatile LazyConcurrentMap<String, FileSystem> fileSysLazyMap = 
new LazyConcurrentMap<>(
+        new ValueFactory<String, FileSystem>() {
+            @Override public FileSystem createValue(String key) {
+                try {
+                    assert !F.isEmpty(key);
+
+                    return secProvider.createFileSystem(key);
+                }
+                catch (IOException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
     /**
      * Simple constructor that is to be used by default.
      *
@@ -92,27 +115,32 @@ public class IgniteHadoopIgfsSecondaryFileSystem 
implements IgfsSecondaryFileSys
         if (F.isEmpty(userName))
             userName = null;
 
+        this.dfltUserName = IgfsUserContext.fixUserName(userName);
+
         try {
-            SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(uri, cfgPath, userName);
+            this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
 
-            fileSys = secProvider.createFileSystem();
+        // Test filesystem creation for the default user name.
+        // The value is stored in the 'fileSysLazyMap' cache.
+        FileSystem fileSys = fileSysLazyMap.getOrCreate(dfltUserName);
 
-            uri = secProvider.uri().toString();
+        assert fileSys != null;
 
-            if (!uri.endsWith("/"))
-                uri += "/";
+        uri = secProvider.uri().toString();
 
-            if (cfgPath != null)
-                props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        if (!uri.endsWith("/"))
+            uri += "/";
 
-            if (userName != null)
-                props.put(SECONDARY_FS_USER_NAME, userName);
+        if (cfgPath != null)
+            props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
 
-            props.put(SECONDARY_FS_URI, uri);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
+        props.put(SECONDARY_FS_URI, uri);
+
+        props.put(SECONDARY_FS_USER_NAME, dfltUserName);
     }
 
     /**
@@ -122,7 +150,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
      * @return Hadoop path.
      */
     private Path convert(IgfsPath path) {
-        URI uri = fileSys.getUri();
+        URI uri = fileSysForUser().getUri();
 
         return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
     }
@@ -176,7 +204,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean exists(IgfsPath path) {
         try {
-            return fileSys.exists(convert(path));
+            return fileSysForUser().exists(convert(path));
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to check file existence 
[path=" + path + "]");
@@ -187,6 +215,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, 
String> props) {
         HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
 
+        final FileSystem fileSys = fileSysForUser();
+
         try {
             if (props0.userName() != null || props0.groupName() != null)
                 fileSys.setOwner(convert(path), props0.userName(), 
props0.groupName());
@@ -206,7 +236,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     @Override public void rename(IgfsPath src, IgfsPath dest) {
         // Delegate to the secondary file system.
         try {
-            if (!fileSys.rename(convert(src), convert(dest)))
+            if (!fileSysForUser().rename(convert(src), convert(dest)))
                 throw new IgfsException("Failed to rename (secondary file 
system returned false) " +
                     "[src=" + src + ", dest=" + dest + ']');
         }
@@ -218,7 +248,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean delete(IgfsPath path, boolean recursive) {
         try {
-            return fileSys.delete(convert(path), recursive);
+            return fileSysForUser().delete(convert(path), recursive);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to delete file [path=" + 
path + ", recursive=" + recursive + "]");
@@ -228,7 +258,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path) {
         try {
-            if (!fileSys.mkdirs(convert(path)))
+            if (!fileSysForUser().mkdirs(convert(path)))
                 throw new IgniteException("Failed to make directories [path=" 
+ path + "]");
         }
         catch (IOException e) {
@@ -239,7 +269,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> 
props) {
         try {
-            if (!fileSys.mkdirs(convert(path), new 
HadoopIgfsProperties(props).permission()))
+            if (!fileSysForUser().mkdirs(convert(path), new 
HadoopIgfsProperties(props).permission()))
                 throw new IgniteException("Failed to make directories [path=" 
+ path + ", props=" + props + "]");
         }
         catch (IOException e) {
@@ -250,7 +280,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files 
(path not found): " + path);
@@ -273,7 +303,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files 
(path not found): " + path);
@@ -300,13 +330,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem 
implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath 
path, int bufSize) {
-        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, 
convert(path), bufSize);
+        return new 
HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), 
convert(path), bufSize);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, boolean overwrite) {
         try {
-            return fileSys.create(convert(path), overwrite);
+            return fileSysForUser().create(convert(path), overwrite);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + 
path + ", overwrite=" + overwrite + "]");
@@ -320,7 +350,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
             new HadoopIgfsProperties(props != null ? props : 
Collections.<String, String>emptyMap());
 
         try {
-            return fileSys.create(convert(path), props0.permission(), 
overwrite, bufSize, (short)replication, blockSize,
+            return fileSysForUser().create(convert(path), props0.permission(), 
overwrite, bufSize, (short)replication, blockSize,
                 null);
         }
         catch (IOException e) {
@@ -334,7 +364,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     @Override public OutputStream append(IgfsPath path, int bufSize, boolean 
create,
         @Nullable Map<String, String> props) {
         try {
-            return fileSys.append(convert(path), bufSize);
+            return fileSysForUser().append(convert(path), bufSize);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to append file [path=" + 
path + ", bufSize=" + bufSize + "]");
@@ -344,7 +374,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public IgfsFile info(final IgfsPath path) {
         try {
-            final FileStatus status = fileSys.getFileStatus(convert(path));
+            final FileStatus status = 
fileSysForUser().getFileStatus(convert(path));
 
             if (status == null)
                 return null;
@@ -419,7 +449,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
         try {
             // We don't use FileSystem#getUsed() since it counts only the files
             // in the filesystem root, not all the files recursively.
-            return fileSys.getContentSummary(new Path("/")).getSpaceConsumed();
+            return fileSysForUser().getContentSummary(new 
Path("/")).getSpaceConsumed();
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to get used space size of 
file system.");
@@ -427,51 +457,63 @@ public class IgniteHadoopIgfsSecondaryFileSystem 
implements IgfsSecondaryFileSys
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<String, String> properties() {
+    @Override public Map<String, String> properties() {
         return props;
     }
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        try {
-            fileSys.close();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
+        final LazyConcurrentMap<String,FileSystem> map = fileSysLazyMap;
+
+        if (map == null)
+            return; // already cleared.
+
+        fileSysLazyMap = null; // 'this' will be unusable after #close().
+
+        IOException err = null; // First failure; the rest get suppressed.
+
+        for (String key: map.keySet()) {
+            FileSystem fs = map.get(key);
+
+            try {
+                if (fs != null)
+                    fs.close();
+            }
+            catch (IOException ioe) {
+                if (err == null)
+                    err = ioe;
+                else
+                    err.addSuppressed(ioe);
+            }
         }
+
+        map.clear();
+
+        if (err != null)
+            throw new IgniteCheckedException(err);
     }
 
     /**
      * Gets the underlying {@link FileSystem}.
+     * This method is used solely for testing.
      * @return the underlying Hadoop {@link FileSystem}.
      */
     public FileSystem fileSystem() {
-        return fileSys;
+        return fileSysForUser();
     }
 
     /**
-     * TODO
-     * @param user2
-     * @return
-     * @throws IgniteCheckedException
+     * Gets the FileSystem for the current context user.
+     * @return the FileSystem instance, never null.
      */
-    @Override public IgfsSecondaryFileSystem forUser(String user2) throws 
IgniteCheckedException {
-        String user = props.get(SECONDARY_FS_USER_NAME);
-        if (F.eq(user, user2))
-            return this;
-        else {
-            String uri = props.get(SECONDARY_FS_URI);
-            String cfgPath = props.get(SECONDARY_FS_CONFIG_PATH);
+    private FileSystem fileSysForUser() {
+        String user = IgfsUserContext.currentUser();
 
-            return new IgniteHadoopIgfsSecondaryFileSystem(uri, cfgPath, 
user2);
-        }
-    }
+        if (F.isEmpty(user))
+            user = dfltUserName; // default is never empty.
 
-    /**
-     * TODO
-     * @return
-     */
-    @Override public String getUser() {
-        return props.get(SECONDARY_FS_USER_NAME);
+        assert !F.isEmpty(user);
+
+        return fileSysLazyMap.getOrCreate(user);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
new file mode 100644
index 0000000..f2951f4
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
@@ -0,0 +1,202 @@
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Concurrent map whose values are created lazily, on first request, by a
+ * client-supplied {@link ValueFactory}.
+ * <p>
+ * For each key the factory is invoked by the single thread that managed to
+ * register the key first; threads asking for the same key meanwhile block
+ * until that creation attempt finishes. If the attempt fails, the key is
+ * unregistered again, so a later {@link #getOrCreate} call retries.
+ */
+public class LazyConcurrentMap<K, V> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map =
+        new ConcurrentHashMap8<>();
+
+    /** Client-supplied factory used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /**
+     * Constructor.
+     *
+     * @param factory the factory to create new values lazily.
+     */
+    public LazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Gets the cached value for the key, creating it if necessary.
+     *
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteException if the value could not be created, or if
+     *      the calling thread was interrupted while waiting for it.
+     */
+    public V getOrCreate(K k) {
+        final ValueWrapper wNew = new ValueWrapper(k);
+
+        ValueWrapper w = map.putIfAbsent(k, wNew);
+
+        if (w == null) {
+            // Our wrapper has been registered, so this thread creates
+            // the value. A failure propagates to the caller from here.
+            wNew.init();
+
+            w = wNew;
+        }
+
+        final V v;
+
+        try {
+            v = w.getValue();
+        }
+        catch (InterruptedException ie) {
+            // Keep the interruption visible to the callers up the stack.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteException(ie);
+        }
+
+        // Null is only possible if the creating thread has failed.
+        if (v == null)
+            throw new IgniteException("Failed to create value. [key=" + k +
+                ']', w.err);
+
+        return v;
+    }
+
+    /**
+     * Gets the value without any attempt to create a new one. Blocks if
+     * the value for the key is being created by another thread.
+     *
+     * @param k the key
+     * @return the value, or null if there is no value for this key.
+     * @throws IgniteException if the calling thread was interrupted.
+     */
+    public @Nullable V get(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null)
+            return null;
+
+        try {
+            return w.getValue();
+        }
+        catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Gets the keySet of this map,
+     * the contract is as per {@link ConcurrentMap#keySet()}
+     *
+     * @return the set of keys, never null.
+     */
+    public Set<K> keySet() {
+        return map.keySet();
+    }
+
+    /**
+     * Clears the map.
+     * Follows the contract of {@link ConcurrentMap#clear()}
+     */
+    public void clear() {
+        map.clear();
+    }
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Released once the creation attempt has finished. */
+        private final CountDownLatch valCrtLatch = new CountDownLatch(1);
+
+        /** The key. */
+        private final K key;
+
+        /** The value; stays null if the creation attempt failed. */
+        private volatile V v;
+
+        /** The creation failure, or null if there was none. */
+        private volatile Throwable err;
+
+        /**
+         * Creates new wrapper.
+         *
+         * @param key the key the value is to be created for.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory. Whatever the outcome,
+         * the latch is released, so waiting threads never block forever.
+         *
+         * @throws IgniteException if the factory returned null.
+         */
+        private void init() {
+            try {
+                final V v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null " +
+                        "value. [key=" + key + ']');
+
+                v = v0;
+            }
+            catch (RuntimeException | Error e) {
+                err = e;
+
+                // Unregister the failed wrapper to let later calls retry.
+                map.remove(key, this);
+
+                throw e;
+            }
+            finally {
+                valCrtLatch.countDown();
+            }
+        }
+
+        /**
+         * Blocks until the creation attempt has finished.
+         *
+         * @return the value, or null if the creation attempt failed.
+         * @throws InterruptedException if interrupted while waiting.
+         */
+        @Nullable V getValue() throws InterruptedException {
+            valCrtLatch.await();
+
+            return v;
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     *
+     * @param <K> the type of the key.
+     * @param <V> the type of the value.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value. Must never return null.
+         *
+         * @param key the key to create value for
+         * @return the value.
+         * @throws IgniteException on failure.
+         */
+        public V createValue(K key);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 2dd7ddb..0a02d88 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -194,26 +194,14 @@ public class IgniteHadoopFileSystem extends FileSystem {
     public static String getHadoopUser(@Nullable Configuration cfg) throws 
IOException {
         String user = null;
 
-        // First, try to get the user from MR Job configuration:
-        if (cfg != null)
-            user = cfg.get(MRJobConfig.USER_NAME);
-
-        // 2nd, try to get it from UserGroupInformation (may return any result 
if we're
-        // inside UserGroupInformation.doAs(...) closure):
-        if (user == null) {
-            UserGroupInformation currentUgi = 
UserGroupInformation.getCurrentUser();
-            if (currentUgi != null)
-                user = currentUgi.getShortUserName();
-        }
+        UserGroupInformation currentUgi = 
UserGroupInformation.getCurrentUser();
+        if (currentUgi != null)
+             user = currentUgi.getShortUserName();
 
-        // 3rd, get the default system (process owner) user name (defaults to 
"anonymous" in case of null):
-        if (user == null)
-            user = DFLT_USER_NAME;
+        user = IgfsUserContext.fixUserName(user);
 
         assert user != null;
 
-        user = U.fixUserName(user);
-
         return user;
     }
 
@@ -259,8 +247,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             user = getHadoopUser(cfg);
 
-            //setUser(user);
-
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, 
PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
 
@@ -329,10 +315,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath,
-                        user);
+                    SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createFileSystem(user);
 
-                    secondaryFs = secProvider.createFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -894,19 +880,19 @@ public class IgniteHadoopFileSystem extends FileSystem {
         return path.makeQualified(getUri(), null);
     }
 
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    @Deprecated // TODO: remove this method.
-    public void setUser(String userName) {
-        //System.out.println(this + ": ##### setting user = " + userName + ", 
thread = " + Thread.currentThread());
-        assert F.eq(user, userName);
-        //this.userName.set(userName);
-
-        //setWorkingDirectory(null);
-    }
+//    /**
+//     * Set user name and default working directory for current thread.
+//     *
+//     * @param userName User name.
+//     */
+//    @Deprecated // TODO: remove this method.
+//    public void setUser(String userName) {
+//        //System.out.println(this + ": ##### setting user = " + userName + 
", thread = " + Thread.currentThread());
+//        assert F.eq(user, userName);
+//        //this.userName.set(userName);
+//
+//        //setWorkingDirectory(null);
+//    }
 
     /** {@inheritDoc} */
     @Override public void setWorkingDirectory(Path newPath) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 71413f5..fd97ed6 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -291,10 +291,9 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath,
-                        user);
+                    SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath);
 
-                    secondaryFs = secProvider.createAbstractFileSystem();
+                    secondaryFs = secProvider.createAbstractFileSystem(user);
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index ca4da3c..e49da8e 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.security.*;
+import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
@@ -38,8 +39,8 @@ public class SecondaryFileSystemProvider {
     /** The secondary filesystem URI, never null. */
     private final URI uri;
 
-    /** Optional user name to log into secondary filesystem with. */
-    private @Nullable final String userName;
+//    /** Optional user name to log into secondary filesystem with. */
+//    private @Nullable final String userName;
 
     /**
      * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
@@ -49,12 +50,12 @@ public class SecondaryFileSystemProvider {
      * property in the provided configuration.
      * @param secConfPath the secondary Fs path (file path on the local file system, optional).
      * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
-     * @param userName User name.
+     //* @param userName User name.
      * @throws IOException
      */
     public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath, @Nullable String userName) throws IOException {
-        this.userName = userName;
+        final @Nullable String secConfPath/*, @Nullable String userName*/) throws IOException {
+        //this.userName = userName;
 
         if (secConfPath != null) {
             URL url = U.resolveIgniteUrl(secConfPath);
@@ -90,12 +91,14 @@ public class SecondaryFileSystemProvider {
      * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
      * @throws IOException
      */
-    public FileSystem createFileSystem() throws IOException {
+    public FileSystem createFileSystem(String userName) throws IOException {
+        userName = IgfsUserContext.fixUserName(userName);
+
         final FileSystem fileSys;
 
-        if (userName == null)
-            fileSys = FileSystem.get(uri, cfg);
-        else {
+//        if (userName == null)
+//            fileSys = FileSystem.get(uri, cfg);
+//        else {
             try {
                 fileSys = FileSystem.get(uri, cfg, userName);
             }
@@ -104,7 +107,7 @@ public class SecondaryFileSystemProvider {
 
                 throw new IOException("Failed to create file system due to interrupt.", e);
             }
-        }
+//        }
 
         return fileSys;
     }
@@ -113,10 +116,12 @@ public class SecondaryFileSystemProvider {
      * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
      * @throws IOException in case of error.
      */
-    public AbstractFileSystem createAbstractFileSystem() throws IOException {
-        if (userName == null)
-            return AbstractFileSystem.get(uri, cfg);
-        else {
+    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+        userName = IgfsUserContext.fixUserName(userName);
+
+//        if (userName == null)
+//            return AbstractFileSystem.get(uri, cfg);
+//        else {
             String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
 
             UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
@@ -132,7 +137,7 @@ public class SecondaryFileSystemProvider {
 
                 throw new IOException("Failed to create file system due to interrupt.", ie);
             }
-        }
+//        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index df17198..2b1d836 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -58,9 +58,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
         this.user = userName;
 
-        this.igfs = igfs.forUser(userName);
+        this.igfs = igfs; //.forUser(userName);
 
-        assert this.user == this.igfs.user();
+        //assert this.user == this.igfs.user();
 
         this.log = log;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 9cee867..061eed7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -152,7 +152,6 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
     @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
         final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
 
-        req.userName(userName);
         req.gridName(grid);
         req.igfsName(igfs);
         req.logDirectory(logDir);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 9e84c51..b089995 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
             primaryConfFullPath = null;
 
         SecondaryFileSystemProvider provider =
-            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null);
+            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
 
-        primaryFs = provider.createFileSystem();
+        primaryFs = provider.createFileSystem(null);
 
         primaryFsUri = provider.uri();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 63c0b21..5972294 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -916,16 +916,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends 
HadoopAbstractSelfTes
         @Override public IgfsSecondaryFileSystem asSecondary() {
             return null;
         }
-
-        /** {@inheritDoc} */
-        @Override public IgfsEx forUser(String userName) throws IgniteCheckedException {
-            return this;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String user() {
-            return null;
-        }
     }
 
     /**

Reply via email to