http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
 
                         jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
                     }

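The hunk above only swaps the class loader name, but the surrounding logic is the classic double-checked locking idiom for lazily loading HadoopV2Job through a dedicated class loader. A minimal standalone sketch of the idiom, assuming a volatile field (simplified, not the committed code):

    private static volatile Class<?> jobCls;

    static Class<?> jobClass() throws ClassNotFoundException {
        Class<?> cls = jobCls;

        if (cls == null) { // First check avoids locking on the hot path.
            synchronized (HadoopDefaultJobInfo.class) {
                cls = jobCls;

                if (cls == null) { // Second check under the lock.
                    ClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");

                    jobCls = cls = ldr.loadClass(HadoopV2Job.class.getName());
                }
            }
        }

        return cls;
    }
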
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..68a9ef6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 
 /**
@@ -57,6 +63,41 @@ public class HadoopUtils {
     /** Old reducer class attribute. */
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+            @Override public FileSystem createValue(FsCacheKey key) {
+                try {
+                    assert key != null;
+
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
+
+                    String scheme = uri.getScheme();
+
+                    // Copy the configuration to avoid altering the external object.
+                    Configuration cfg = new Configuration(key.configuration());
+
+                    String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+                    cfg.setBoolean(prop, true);
+
+                    return FileSystem.get(uri, cfg, key.user());
+                }
+                catch (IOException | InterruptedException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
+    /**
+     * Constructor.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+
     /**
      * Wraps native split.
      *
@@ -126,11 +167,13 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                assert status.totalReducerCnt() > 0;
-
                 setupProgress = 1;
                 mapProgress = 1;
-                reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+                if (status.totalReducerCnt() > 0)
+                    reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+                else
+                    reduceProgress = 1f;
 
                 break;
 
@@ -300,9 +343,242 @@ public class HadoopUtils {
     }
 
     /**
-     * Constructor.
+     * Creates {@link Configuration} in a correct class loader context to avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link Configuration}.
      */
-    private HadoopUtils() {
-        // No-op.
+    public static Configuration safeCreateConfiguration() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+        try {
+            return new Configuration();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Creates {@link JobConf} in a correct class loader context to avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link JobConf}.
+     */
+    public static JobConf safeCreateJobConf() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+
+        try {
+            return new JobConf();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, may be null.
+     * @return the user name, never null.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It creates the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * @param uri the file system uri.
+     * @param cfg the configuration.
+     * @param doCacheFs whether to cache the file system instance for the given user.
+     * @return the file system.
+     * @throws IOException in case of error.
+     */
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        if (doCacheFs) {
+            try {
+                fs = getWithCaching(uri, cfg, usr);
+            }
+            catch (IgniteException ie) {
+                throw new IOException(ie);
+            }
+        }
+        else {
+            try {
+                fs = FileSystem.get(uri, cfg, usr);
+            }
+            catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+
+                throw new IOException(ie);
+            }
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         * Constructor
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase());
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase());
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+
+    /**
+     * Gets FileSystem caching it in static Ignite cache. The cache is a singleton
+     * for each class loader.
+     *
+     * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+     * The Configuration is not a part of the key. This means that for the given key the file system is
+     * initialized only once with the Configuration passed in upon the file system creation.
+     *
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param usr The user to create file system for.
+     * @return The file system: either created, or taken from the cache.
+     */
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+        FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+        return fileSysLazyMap.getOrCreate(key);
+    }
+
+    /**
+     * Gets the property name to disable file system cache.
+     * @param scheme The file system URI scheme.
+     * @return The property name. If scheme is null,
+     * returns "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
+        return String.format("fs.%s.impl.disable.cache", scheme);
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    public static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
     }
 }

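Taken together, the HadoopUtils changes introduce a per-user FileSystem cache keyed by {scheme, authority, user}, with Hadoop's own FileSystem cache disabled per scheme so the two caches do not both claim ownership of instances. A hedged usage sketch of the new entry points (the IGFS URI is a placeholder):

    // Create a Configuration without capturing the caller's context class loader.
    Configuration cfg = HadoopUtils.safeCreateConfiguration();

    // Resolve the file system for the user named by MRJobConfig.USER_NAME;
    // 'true' routes the lookup through the per-user fileSysLazyMap cache.
    FileSystem fs = HadoopUtils.fileSystemForMrUser(URI.create("igfs://igfs@localhost"), cfg, true);
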
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,26 +19,26 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 
 /**
  * Encapsulates logic of secondary filesystem creation.
  */
 public class SecondaryFileSystemProvider {
     /** Configuration of the secondary filesystem, never null. */
-    private final Configuration cfg = new Configuration();
+    private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
 
     /** The secondary filesystem URI, never null. */
     private final URI uri;
 
-    /** Optional user name to log into secondary filesystem with. */
-    private @Nullable final String userName;
-
     /**
     * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
      * specified either explicitly or in the configuration provided.
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider {
      * property in the provided configuration.
     * @param secConfPath the secondary Fs path (file path on the local file system, optional).
     * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path is resolved.
-     * @param userName User name.
      * @throws IOException
      */
     public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath, @Nullable String userName) throws IOException {
-        this.userName = userName;
-
+        final @Nullable String secConfPath) throws IOException {
         if (secConfPath != null) {
             URL url = U.resolveIgniteUrl(secConfPath);
 
@@ -79,7 +76,7 @@ public class SecondaryFileSystemProvider {
         }
 
         // Disable caching:
-        String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+        String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
 
         cfg.setBoolean(prop, true);
     }
@@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider {
     * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
      * @throws IOException
      */
-    public FileSystem createFileSystem() throws IOException {
+    public FileSystem createFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
         final FileSystem fileSys;
 
-        if (userName == null)
-            fileSys = FileSystem.get(uri, cfg);
-        else {
-            try {
-                fileSys = FileSystem.get(uri, cfg, userName);
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+        try {
+           fileSys = FileSystem.get(uri, cfg, userName);
+        }
+        catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
 
-                throw new IOException("Failed to create file system due to interrupt.", e);
-            }
+           throw new IOException("Failed to create file system due to interrupt.", e);
         }
 
         return fileSys;
@@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider {
 
     /**
     * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
-     * @throws IOException
+     * @throws IOException in case of error.
      */
-    public AbstractFileSystem createAbstractFileSystem() throws IOException {
-        return AbstractFileSystem.get(uri, cfg);
+    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
+        String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
+                @Override public AbstractFileSystem run() throws IOException {
+                    return AbstractFileSystem.get(uri, cfg);
+                }
+            });
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException("Failed to create file system due to interrupt.", ie);
+        }
     }
 
     /**

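createAbstractFileSystem() now resolves a UserGroupInformation for the requested user and performs the lookup inside ugi.doAs(...), the standard Hadoop mechanism for attaching a user identity to an operation. The same pattern in isolation (a sketch with a placeholder URI and user; exception handling trimmed):

    UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, "someUser");

    AbstractFileSystem afs = ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
        @Override public AbstractFileSystem run() throws IOException {
            // Executes with "someUser" as the effective Hadoop user.
            return AbstractFileSystem.get(URI.create("hdfs://example-host:9000"), new Configuration());
        }
    });
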
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
deleted file mode 100644
index 509f443..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
 
 /**
  * Utilities for configuring file systems to support the separate working directory per each thread.
@@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils {
      public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
 
     /**
-     * Set user name and default working directory for current thread if it's supported by file system.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgniteHadoopFileSystem)
-            ((IgniteHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof HadoopDistributedFileSystem)
-            ((HadoopDistributedFileSystem)fs).setUser(userName);
-    }
-
-    /**
      * Setup wrappers of filesystems to support the separate working directory.
      *
      * @param cfg Config for setup.
@@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", 
HadoopLocalFileSystemV1.class.getName());
         cfg.set("fs.AbstractFileSystem." + 
FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
-
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", 
HadoopDistributedFileSystem.class.getName());
     }
 }

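With the HDFS wrapper deleted, setupFileSystems() only re-maps the local scheme ("file", per FsConstants.LOCAL_FS_URI) onto Ignite's thread-aware wrappers and leaves "fs.hdfs.impl" at the stock implementation. A sketch of the resulting configuration state:

    Configuration cfg = new Configuration();

    HadoopFileSystemsUtils.setupFileSystems(cfg);

    // cfg now maps:
    //   fs.file.impl                    -> HadoopLocalFileSystemV1
    //   fs.AbstractFileSystem.file.impl -> HadoopLocalFileSystemV2
    // and no longer overrides fs.hdfs.impl.
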
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..71b38c4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ *
+ * Despite the name, this class does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+    /** The factory passed in by the client. Will be used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /** Lock used to close the objects. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     * @param factory the factory to create new values lazily.
+     */
+    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Gets cached or creates a new value of V.
+     * Never returns null.
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteException on error
+     */
+    public V getOrCreate(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null) {
+            closeLock.readLock().lock();
+
+            try {
+                if (closed)
+                    throw new IllegalStateException("Failed to create value for key [" + k
+                        + "]: the map is already closed.");
+
+                final ValueWrapper wNew = new ValueWrapper(k);
+
+                w = map.putIfAbsent(k, wNew);
+
+                if (w == null) {
+                    wNew.init();
+
+                    w = wNew;
+                }
+            }
+            finally {
+                closeLock.readLock().unlock();
+            }
+        }
+
+        try {
+            V v = w.getValue();
+
+            assert v != null;
+
+            return v;
+        }
+        catch (IgniteCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Clears the map and closes all the values.
+     */
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
+
+        try {
+            closed = true;
+
+            Exception err = null;
+
+            Set<K> keySet = map.keySet();
+
+            for (K key : keySet) {
+                V v = null;
+
+                try {
+                    v = map.get(key).getValue();
+                }
+                catch (IgniteCheckedException ignore) {
+                    // No-op.
+                }
+
+                if (v != null) {
+                    try {
+                        v.close();
+                    }
+                    catch (Exception err0) {
+                        if (err == null)
+                            err = err0;
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (err != null)
+                throw new IgniteCheckedException(err);
+        }
+        finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Future. */
+        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+        /** the key */
+        private final K key;
+
+        /**
+         * Creates new wrapper.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory.
+         */
+        private void init() {
+            try {
+                final V v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+
+                fut.onDone(v0);
+            }
+            catch (Throwable e) {
+                fut.onDone(e);
+            }
+        }
+
+        /**
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
+         */
+        V getValue() throws IgniteCheckedException {
+            return fut.get();
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     * @param <K> the type of the key.
+     * @param <V> the type of the value.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value. Should never return null.
+         *
+         * @param key the key to create value for
+         * @return the value.
+         * @throws IgniteException on failure.
+         */
+        public V createValue(K key);
+    }
+}
\ No newline at end of file

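The wrapper-plus-future design means the factory runs at most once per key: the thread whose putIfAbsent wins calls init(), and every other caller blocks in getValue() on the same GridFutureAdapter. A minimal usage sketch (the String/Socket types here are illustrative, not from the commit):

    HadoopLazyConcurrentMap<String, Socket> conns = new HadoopLazyConcurrentMap<>(
        new HadoopLazyConcurrentMap.ValueFactory<String, Socket>() {
            @Override public Socket createValue(String host) {
                try {
                    return new Socket(host, 11211); // Invoked once per distinct host.
                }
                catch (IOException e) {
                    throw new IgniteException(e);
                }
            }
        });

    Socket sock = conns.getOrCreate("example.org"); // Blocks if another thread is mid-creation.

    conns.close(); // Closes every cached value; subsequent getOrCreate() calls fail.
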
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
      * @throws IOException If failed.
      */
     public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * The user this Igfs instance works on behalf of.
+     * @return the user name.
+     */
+    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..47ba0e8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** Logger. */
     private final Log log;
 
+    /** The user this Igfs works on behalf of. */
+    private final String user;
+
     /**
      * Constructor.
      *
      * @param igfs Target IGFS.
      * @param log Log.
      */
-    public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+        this.user = IgfsUtils.fixUserName(userName);
+
         this.igfs = igfs;
+
         this.log = log;
 
         bufSize = igfs.configuration().getBlockSize() * 2;
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) {
-        igfs.clientLogDirectory(logDir);
+    @Override public IgfsHandshakeResponse handshake(final String logDir) {
+        return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply() {
+                igfs.clientLogDirectory(logDir);
 
-        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-            igfs.globalSampling());
+                return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+                    igfs.globalSampling());
+                }
+         });
     }
 
     /** {@inheritDoc} */
@@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.info(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.info(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            return igfs.update(path, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.update(path, props);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
         try {
-            igfs.setTimes(path, accessTime, modificationTime);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.setTimes(path, accessTime, modificationTime);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
         try {
-            igfs.rename(src, dest);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.rename(src, dest);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
         try {
-            return igfs.delete(path, recursive);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+                @Override public Boolean apply() {
+                    return igfs.delete(path, recursive);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
         try {
-            return igfs.globalSpace();
+            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+                @Override public IgfsStatus call() throws IgniteCheckedException {
+                    return igfs.globalSpace();
+                }
+            });
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
                 "stopping.");
         }
+        catch (IgniteCheckedException | RuntimeException | Error e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new AssertionError("Must never go there.");
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listPaths(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+                @Override public Collection<IgfsPath> apply() {
+                    return igfs.listPaths(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listFiles(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+                @Override public Collection<IgfsFile> apply() {
+                    return igfs.listFiles(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            igfs.mkdirs(path, props);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.mkdirs(path, props);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.summary(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+                @Override public IgfsPathSummary apply() {
+                    return igfs.summary(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
         throws IgniteCheckedException {
         try {
-            return igfs.affinity(path, start, len);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> apply() {
+                    return igfs.affinity(path, start, len);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
         throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+        final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+                        colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
         if (lsnr0 != null && log.isDebugEnabled())
             log.debug("Removed stream event listener [delegate=" + delegate + 
']');
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
 }

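Every delegating method above now funnels its IGFS call through IgfsUserContext.doAs(user, ...), so the embedded file system executes on behalf of the client's user; this is also why the parameters became final, as the anonymous closures capture them. For operations with no meaningful result the diff uses a Void-returning closure, e.g.:

    IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
        @Override public Void apply() {
            igfs.mkdirs(path, props); // Runs with 'user' installed in the IGFS user context.

            return null; // Nothing to return; the caller yields Boolean.TRUE afterwards.
        }
    });
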
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 0264e7b..3561e95 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public class HadoopIgfsIpcIo implements HadoopIgfsIo {
     /** Logger. */
-    private Log log;
+    private final Log log;
 
     /** Request futures map. */
     private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 7dca049..f23c62c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
     /** IGFS name. */
     private final String igfs;
 
+    /** The user this out proc is performing on behalf of. */
+    private final String userName;
+
     /** Client log. */
     private final Log log;
 
@@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
-        this(host, port, grid, igfs, false, log);
+    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(host, port, grid, igfs, false, log, user);
     }
 
     /**
@@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
-        this(null, port, grid, igfs, true, log);
+    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(null, port, grid, igfs, true, log, user);
     }
 
     /**
@@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
+    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
         throws IOException {
         assert host != null && !shmem || host == null && shmem :
             "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
@@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         this.grid = grid;
         this.igfs = igfs;
         this.log = log;
+        this.userName = IgfsUtils.fixUserName(user);
 
         io = HadoopIgfsIpcIo.get(log, endpoint);
 
@@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(INFO);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(UPDATE);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.accessTime(accessTime);
         msg.modificationTime(modificationTime);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(RENAME);
         msg.path(src);
         msg.destinationPath(dest);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(DELETE);
         msg.path(path);
         msg.flag(recursive);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.start(start);
         msg.length(len);
+        msg.userName(userName);
 
         return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
     }
@@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(PATH_SUMMARY);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(SUMMARY_RES).get();
     }
@@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(MAKE_DIRECTORIES);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(LIST_FILES);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_COL_RES).get();
     }
@@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(LIST_PATHS);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(PATH_COL_RES).get();
     }
@@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(OPEN_READ);
         msg.path(path);
         msg.flag(false);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(true);
         msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -321,6 +337,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.properties(props);
         msg.replication(replication);
         msg.blockSize(blockSize);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(create);
         msg.properties(props);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
             }
         };
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return userName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1dada21..7d0db49 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
     /** Logger. */
     private final Log log;
 
+    /** The user name this wrapper works on behalf of. */
+    private final String userName;
+
     /**
      * Constructor.
      *
@@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
      * @param conf Configuration.
      * @param log Current logger.
      */
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+            throws IOException {
         try {
             this.authority = authority;
             this.endpoint = new HadoopIgfsEndpoint(authority);
             this.logDir = logDir;
             this.conf = conf;
             this.log = log;
+            this.userName = user;
         }
         catch (IgniteCheckedException e) {
             throw new IOException("Failed to parse endpoint: " + authority, e);
@@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsInProc(igfs, log);
+                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
                 catch (IOException | IgniteCheckedException e) {
                     if (e instanceof HadoopIgfsCommunicationException)
-                        hadoop.close(true);
+                        if (hadoop != null)
+                            hadoop.close(true);
 
                     if (log.isDebugEnabled())
                         log.debug("Failed to connect to in-proc IGFS, fallback 
to IPC mode.", e);
@@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsOutProc(endpoint.port(), 
endpoint.grid(), endpoint.igfs(), log);
+                    hadoop = new HadoopIgfsOutProc(endpoint.port(), 
endpoint.grid(), endpoint.igfs(), log, userName);
 
                     curDelegate = new Delegate(hadoop, 
hadoop.handshake(logDir));
                 }
@@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
                 try {
                     hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), 
endpoint.grid(), endpoint.igfs(),
-                        log);
+                        log, userName);
 
                     curDelegate = new Delegate(hadoop, 
hadoop.handshake(logDir));
                 }
@@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
             HadoopIgfsEx hadoop = null;
 
             try {
-                hadoop = new HadoopIgfsOutProc(endpoint.host(), 
endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), 
endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
 
                 curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
             }
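
With the extra constructor parameter, callers now create the wrapper for a concrete user, and that user is threaded into every in-proc and out-of-proc delegate. A hedged usage sketch, assuming illustrative values for the authority, log directory, and user name (none of these come from the patch):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;

    public class WrapperUsageSketch {
        public static void main(String[] args) throws Exception {
            Log log = LogFactory.getLog(WrapperUsageSketch.class);

            // The wrapper acts on behalf of "alice" for every delegate it creates.
            HadoopIgfsWrapper wrapper = new HadoopIgfsWrapper(
                "igfs@localhost:10500", "/tmp/igfs-log", new Configuration(), log, "alice");
        }
    }

Note also the small bug fix in the in-proc branch above: hadoop.close(true) is now guarded by a null check, since the HadoopIgfsInProc constructor may throw before hadoop is assigned.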

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2e04ac1..b170125 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements 
Callable<Void> {
 
     /** {@inheritDoc} */
     @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Implements the actual task execution.
+     * @throws IgniteCheckedException If the task execution fails.
+     */
+    void call0() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
 
         Throwable err = null;
@@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements 
Callable<Void> {
         HadoopPerformanceCounter perfCntr = null;
 
         try {
-            ctx = job.getTaskContext(info);
-
             perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), 
nodeId);
 
             perfCntr.onTaskSubmit(info, submitTs);
@@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements 
Callable<Void> {
             if (ctx != null)
                 ctx.cleanupTaskEnvironment();
         }
-
-        return null;
     }
 
     /**
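
The split above is a wrap-and-delegate pattern: the public call() resolves the task context once, then runs the entire task body under the job owner's identity. A distilled sketch of that shape, where runAs and body are hypothetical stand-ins for ctx.runAsJobOwner and call0:

    import java.util.concurrent.Callable;

    abstract class OwnerScopedTask implements Callable<Void> {
        /** Public entry point: everything in body() executes as the job owner. */
        @Override public Void call() throws Exception {
            return runAs(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    body();

                    return null;
                }
            });
        }

        /** Stand-in for HadoopTaskContext#runAsJobOwner(Callable). */
        protected abstract <T> T runAs(Callable<T> c) throws Exception;

        /** Stand-in for call0(): the actual task work. */
        protected abstract void body() throws Exception;
    }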

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index d265ca8..d754039 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.hadoop.v2;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.JobID;
@@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob {
         new ConcurrentHashMap8<>();
 
     /** Pooling task context class and thus class loading environment. */
-    private final Queue<Class<?>> taskCtxClsPool = new 
ConcurrentLinkedQueue<>();
+    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = 
new ConcurrentLinkedQueue<>();
 
     /** All created contexts. */
     private final Queue<Class<?>> fullCtxClsQueue = new 
ConcurrentLinkedDeque<>();
@@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
-        HadoopClassLoader clsLdr = 
(HadoopClassLoader)getClass().getClassLoader();
-
-        // Before create JobConf instance we should set new context class 
loader.
-        Thread.currentThread().setContextClassLoader(clsLdr);
-
-        jobConf = new JobConf();
+        jobConf = HadoopUtils.safeCreateJobConf();
 
         HadoopFileSystemsUtils.setupFileSystems(jobConf);
 
@@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+            try {
+                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, 
true);
+
                 JobSplit.TaskSplitMetaInfo[] metaInfos = 
SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 
@@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob {
         if (old != null)
             return old.get();
 
-        Class<?> cls = taskCtxClsPool.poll();
+        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
 
         try {
             if (cls == null) {
@@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob {
                 // Note that the classloader is identified by the task it was 
initially created for,
                 // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber());
+                    "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber());
 
-                cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+                cls = (Class<? extends 
HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
                 fullCtxClsQueue.add(cls);
             }
@@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob {
 
     /** {@inheritDoc} */
     @Override public void cleanupStagingDirectory() {
-        if (rsrcMgr != null)
-            rsrcMgr.cleanupStagingDirectory();
+        rsrcMgr.cleanupStagingDirectory();
+    }
+
+    /**
+     * Getter for job configuration.
+     * @return The job configuration.
+     */
+    public JobConf jobConf() {
+        return jobConf;
     }
 }
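
HadoopUtils.safeCreateJobConf() replaces the inlined class-loader juggling that the hunk above removes. Its likely shape can be inferred from the removed lines; the sketch below is an assumption about the helper's body, not the committed code, and safeCreateConfiguration() (used in the test changes further down) presumably follows the same save-and-restore pattern.

    import org.apache.hadoop.mapred.JobConf;

    final class JobConfSketch {
        /** Creates a JobConf under the given loader, then restores the caller's context loader. */
        static JobConf safeCreateJobConf(ClassLoader hadoopLdr) {
            Thread t = Thread.currentThread();

            ClassLoader prev = t.getContextClassLoader();

            t.setContextClassLoader(hadoopLdr);

            try {
                return new JobConf(); // JobConf resolves Hadoop resources via the context class loader.
            }
            finally {
                t.setContextClassLoader(prev); // Never leak the Hadoop loader to the caller.
            }
        }
    }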

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..2f64e77 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -40,6 +40,9 @@ import java.util.*;
 * files are needed to be placed on the local file system.
  */
 public class HadoopV2JobResourceManager {
+    /** Name of the property that disables caching of 'file'-scheme file 
systems. */
+    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = 
HadoopUtils.disableFsCachePropertyName("file");
+
     /** Hadoop job context. */
     private final JobContextImpl ctx;
 
@@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager {
         try {
             cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, 
dir.getAbsolutePath());
 
-            if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+            if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
                 FileSystem.getLocal(cfg).setWorkingDirectory(new 
Path(dir.getAbsolutePath()));
         }
         finally {
@@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+                    FileSystem fs = 
HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
 
                     if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find 
map-reduce submission directory (does not exist): " +
-                            stagingDir);
+                        throw new IgniteCheckedException("Failed to find 
map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
 
                     if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job 
submission directory contents to local file system " +
-                            "[path=" + stagingDir + ", locDir=" + 
jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                        throw new IgniteCheckedException("Failed to copy job 
submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + 
jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager {
                 }
             }
             else if (!jobLocDir.mkdirs())
-                throw new IgniteCheckedException("Failed to create local job 
directory: " + jobLocDir.getAbsolutePath());
+                throw new IgniteCheckedException("Failed to create local job 
directory: "
+                    + jobLocDir.getAbsolutePath());
 
             setLocalFSWorkingDirectory(jobLocDir);
         }
@@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = srcPath.getFileSystem(cfg);
+            FileSystem srcFs = 
HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), 
".cached-archives");
 
                 if (!archivesPath.exists() && !archivesPath.mkdir())
                     throw new IOException("Failed to create directory " +
-                         "[path=" + archivesPath + ", jobId=" + jobId + ']');
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
 
                 File archiveFile = new File(archivesPath, locName);
 
@@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null)
-                stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, 
true);
+                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), 
ctx.getJobConf(), true).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + 
stagingDir + ", jobId=" + jobId + ']' , e);
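
Each FileSystem.get(...) call in this class is replaced with HadoopUtils.fileSystemForMrUser(uri, cfg, true), so staging-directory I/O runs as the MapReduce user rather than the daemon user. The identity part of that helper can be sketched with the public Hadoop API; the real helper also routes through the per-user cache introduced in HadoopUtils, and the signature below is illustrative only.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    final class FsForUserSketch {
        /** Obtains a file system acting as the named user. */
        static FileSystem fileSystemForUser(URI uri, Configuration cfg, String user) throws Exception {
            // The three-argument overload performs the lookup under the given (proxy) user.
            return FileSystem.get(uri, cfg, user);
        }
    }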

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index e9c859bd..e89feba 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.security.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -239,9 +243,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         
Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            FileSystem fs = FileSystem.get(jobConf());
-
-            HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+            FileSystem.get(jobConf());
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 
@@ -421,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws 
IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), 
false);
             FSDataInputStream in = 
fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());
@@ -450,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext 
{
             throw new IgniteCheckedException(e);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws 
IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = 
UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // If the current UGI context user is the same, do a direct 
call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = 
UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }
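
runAsJobOwner() is the heart of the change set: it short-circuits when the current UserGroupInformation already matches the job owner, and otherwise re-enters the callable via ugi.doAs(...). The same pattern, reduced to a standalone helper (getBestUGI's first argument, the ticket cache path, is left null exactly as in the method above):

    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;

    import org.apache.hadoop.security.UserGroupInformation;

    final class RunAsSketch {
        /** Runs c as the given user, avoiding a doAs when already in that context. */
        static <T> T runAs(String user, final Callable<T> c) throws Exception {
            UserGroupInformation cur = UserGroupInformation.getCurrentUser();

            if (cur.getShortUserName().equals(user))
                return c.call(); // Already the right identity: direct call.

            UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);

            return ugi.doAs(new PrivilegedExceptionAction<T>() {
                @Override public T run() throws Exception {
                    return c.call();
                }
            });
        }
    }

Skipping the doAs when the identities already match saves the cost of a security-context switch on the common path, which matters for tasks issuing many small file-system calls.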

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.hadoop.mapreduce.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends 
HadoopAbstractSelfTest {
      * @return Configuration.
      */
     private Configuration config(int port) {
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtils.safeCreateConfiguration();
 
         setupFileSystems(conf);
 
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends 
HadoopAbstractSelfTest {
             ctx.getCounter(TestCounter.COUNTER2).increment(1);
 
             int sum = 0;
-            for (IntWritable value : values) {
+            for (IntWritable value : values)
                 sum += value.get();
-            }
 
             ctx.write(key, new IntWritable(sum));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index d11cabb..91f926d 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
@@ -39,6 +40,7 @@ import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest 
extends IgfsCommonA
     /** Thread count for multithreaded tests. */
     private static final int THREAD_CNT = 8;
 
+    /** Secondary file system user. */
+    private static final String SECONDARY_FS_USER = "secondary-default";
+
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
 
@@ -255,7 +260,7 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         if (mode != PRIMARY)
             cfg.setSecondaryFileSystem(new 
IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(),
-                secondaryFileSystemConfigPath()));
+                secondaryFileSystemConfigPath(), SECONDARY_FS_USER));
 
         
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
         cfg.setManagementPort(-1);
@@ -278,11 +283,28 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         
primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath()));
 
-        fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, 
getClientFsUser());
+
+        // Create Fs on behalf of the client user:
+        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override public Object run() throws Exception {
+                fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+
+                return null;
+            }
+        });
 
         barrier = new CyclicBarrier(THREAD_CNT);
     }
 
+    /**
+     * Gets the user the Fs client operates on behalf of.
+     * @return The user the Fs client operates on behalf of.
+     */
+    protected String getClientFsUser() {
+        return "foo";
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         try {
@@ -297,14 +319,17 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
     /** @throws Exception If failed. */
     public void testStatus() throws Exception {
+        Path file1 = new Path("/file1");
 
-        try (FSDataOutputStream file = fs.create(new Path("/file1"), 
EnumSet.noneOf(CreateFlag.class),
+        try (FSDataOutputStream file = fs.create(file1, 
EnumSet.noneOf(CreateFlag.class),
             Options.CreateOpts.perms(FsPermission.getDefault()))) {
             file.write(new byte[1024 * 1024]);
         }
 
         FsStatus status = fs.getFsStatus();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner());
+
         assertEquals(4, grid(0).cluster().nodes().size());
 
         long used = 0, max = 0;
@@ -707,6 +732,8 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         os.close();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
         fs.setOwner(file, "aUser", "aGroup");
 
         assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -796,20 +823,20 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         int cnt = 2 * 1024;
 
-        FSDataOutputStream out = fs.create(file, 
EnumSet.noneOf(CreateFlag.class),
-            Options.CreateOpts.perms(FsPermission.getDefault()));
+        try (FSDataOutputStream out = fs.create(file, 
EnumSet.noneOf(CreateFlag.class),
+            Options.CreateOpts.perms(FsPermission.getDefault()))) {
 
-        for (long i = 0; i < cnt; i++)
-            out.writeLong(i);
+            for (long i = 0; i < cnt; i++)
+                out.writeLong(i);
+        }
 
-        out.close();
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
 
-        FSDataInputStream in = fs.open(file, 1024);
+        try (FSDataInputStream in = fs.open(file, 1024)) {
 
-        for (long i = 0; i < cnt; i++)
-            assertEquals(i, in.readLong());
-
-        in.close();
+            for (long i = 0; i < cnt; i++)
+                assertEquals(i, in.readLong());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -1052,7 +1079,9 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
     }
 
     /** @throws Exception If failed. */
-    public void _testRenameDirectoryIfDstPathExists() throws Exception {
+    public void testRenameDirectoryIfDstPathExists() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-825";); 
+        
         Path fsHome = new Path(primaryFsUri);
         Path srcDir = new Path(fsHome, "/tmp/");
         Path dstDir = new Path(fsHome, "/tmpNew/");
@@ -1191,6 +1220,9 @@ public abstract class 
HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
         assertEquals(nestedDirPerm, 
fs.getFileStatus(nestedDir).getPermission());
+
+        assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+        assertEquals(getClientFsUser(), 
fs.getFileStatus(nestedDir).getOwner());
     }
 
     /** @throws Exception If failed. */
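
The test now creates the AbstractFileSystem inside a doAs block, so every file it writes is owned by getClientFsUser(); the new ownership assertions depend on that. A compact sketch of the setup step, assuming an illustrative IGFS URI:

    import java.net.URI;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.AbstractFileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    final class FsAsUserSketch {
        /** Creates an AbstractFileSystem on behalf of the given user. */
        static AbstractFileSystem fsAs(String user, final URI uri, final Configuration conf) throws Exception {
            return UserGroupInformation.getBestUGI(null, user).doAs(
                new PrivilegedExceptionAction<AbstractFileSystem>() {
                    @Override public AbstractFileSystem run() throws Exception {
                        return AbstractFileSystem.get(uri, conf);
                    }
                });
        }
    }

For example, fsAs("foo", new URI("igfs://igfs@localhost:10500/"), conf) would yield a file system whose writes are attributed to user "foo" (URI illustrative).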

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 9e84c51..b089995 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest 
extends IgfsCommonAbstra
             primaryConfFullPath = null;
 
         SecondaryFileSystemProvider provider =
-            new SecondaryFileSystemProvider(primaryFsUriStr, 
primaryConfFullPath, null);
+            new SecondaryFileSystemProvider(primaryFsUriStr, 
primaryConfFullPath);
 
-        primaryFs = provider.createFileSystem();
+        primaryFs = provider.createFileSystem(null);
 
         primaryFsUri = provider.uri();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
index d3440fc..c0f73af 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
@@ -73,10 +73,7 @@ public class IgfsNearOnlyMultiNodeSelfTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
+        cfg.setDiscoverySpi(new 
TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true));
 
         FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
 
