[IGNITE-218]: 1. IgfsUserContext and its usages; This fixes the issue with 
#withReconnect tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/57d8056b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/57d8056b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/57d8056b

Branch: refs/heads/ignite-218
Commit: 57d8056bbf51973875f936c33ead87409317f4cd
Parents: 4240ce0
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Sat Apr 18 01:35:33 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Sat Apr 18 01:35:33 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java |  94 +++++++++++----
 .../processors/igfs/IgfsIpcHandler.java         |   7 +-
 .../hadoop/igfs/HadoopIgfsInProc.java           | 117 +++++++++----------
 3 files changed, 128 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d8056b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java 
b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index e48507f..567fae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.igfs;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.util.concurrent.*;
@@ -34,44 +35,87 @@ public abstract class IgfsUserContext {
     /**
      * Executes given callable in the given user context.
      * The main contract of this method is that {@link #currentUser()} method 
invoked
-     * inside 'cllbl' callable always returns 'user' this callable executed 
with.
-     * @param user the user name
-     * @param cllbl the callable to execute
-     * @param <T> The type of callable result.
-     * @return the result of
-     * @throws NullPointerException if callable is null
-     * @throws IllegalArgumentException if user name is null or empty String.
-     * @throws IgniteException if any Exception thrown from callable.call().
-     * The contract is that if this method throws IgniteException, this 
IgniteException getCause() method
-     * must return exactly the Exception thrown from the callable.
+     * inside closure always returns 'user' this callable executed with.
+     * @param user the user name to invoke closure on behalf of.
+     * @param clo the closure to execute
+     * @param <T> The type of closure result.
+     * @return the result of closure execution.
+     * @throws NullPointerException if user name is null or empty String or if 
the closure is null.
      */
-    public static <T> T doAs(String user, final Callable<T> cllbl) {
-        A.ensure(!F.isEmpty(user), "Failed to use null or empty user name.");
+    public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+        if (F.isEmpty(user))
+            // use NPE to ensure that #doAs() caller will not treat this 
exception
+            // as the one thrown from the closure:
+            throw new NullPointerException("Failed to use null or empty user 
name.");
 
         final String ctxUser = userStackThreadLocal.get();
 
+        if (F.eq(ctxUser, user))
+            return clo.apply(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
         try {
-            if (F.eq(ctxUser, user))
-                return cllbl.call(); // correct context is already there
+            return clo.apply();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but 
accepts
+     * callable that throws checked Exception.
+     * The Exception is not ever wrapped anyhow.
+     * If your Callable throws Some specific checked Exceptions, the 
recommended usage pattern is:
+     * <pre name="code" class="java">
+     *  public Foo myOperation() throws MyCheckedException1, 
MyCheckedException2 {
+     *      try {
+     *          return IgfsUserContext.doAs(user, new Callable<Foo>() {
+     *              @Override public Foo call() throws MyCheckedException1, 
MyCheckedException2 {
+     *                  return makeSomeFoo(); // do the job
+     *              }
+     *          });
+     *      }
+     *      catch (MyCheckedException1 | MyCheckedException2 | 
RuntimeException | Error e) {
+     *          throw e;
+     *      }
+     *      catch (Exception e) {
+     *          throw new AssertionError("Must never go there.");
+     *      }
+     *  }
+     * </pre>
+     * @param user the user name to invoke closure on behalf of.
+     * @param clbl the Callable to execute
+     * @param <T> The type of callable result.
+     * @return the result of closure execution.
+     * @throws NullPointerException if user name is null or empty String or if 
the closure is null.
+     */
+    public static <T> T doAs(String user, final Callable<T> clbl) throws 
Exception {
+        if (F.isEmpty(user))
+            // use NPE to ensure that #doAs() caller will not treat this 
exception
+            // as the one thrown from the closure:
+            throw new NullPointerException("Failed to use null or empty user 
name.");
 
-            userStackThreadLocal.set(user);
+        final String ctxUser = userStackThreadLocal.get();
 
-            try {
-                return cllbl.call();
-            }
-            finally {
-                userStackThreadLocal.set(ctxUser);
-            }
+        if (F.eq(ctxUser, user))
+            return clbl.call(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clbl.call();
         }
-        catch (Exception e) {
-            throw new IgniteException(e);
+        finally {
+            userStackThreadLocal.set(ctxUser);
         }
     }
 
     /**
      * Gets the current context user.
-     * If this method is invoked outside of any {@link #doAs(String, 
Callable)} on the call stack, it will return null.
-     * Otherwise it will return the user name set in the most lower {@link 
#doAs(String, Callable)} call
+     * If this method is invoked outside of any {@link #doAs(String, 
IgniteOutClosure)} on the call stack, it will return null.
+     * Otherwise it will return the user name set in the most lower {@link 
#doAs(String, IgniteOutClosure)} call
      * on the call stack.
      * @return the current user, may be null.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d8056b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 3ba99fc..cfe6ed4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -31,7 +31,6 @@ import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
@@ -256,8 +255,8 @@ class IgfsIpcHandler implements IgfsServerHandler {
         assert userName != null;
 
         try {
-            IgfsUserContext.doAs(userName, new Callable<Void>() {
-                @Override public Void call() throws Exception {
+            IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+                @Override public Void apply() {
                     switch (cmd) {
                         case EXISTS:
                             res.response(igfs.exists(req.path()));
@@ -386,7 +385,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d8056b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index ed7f296..47ba0e8 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -67,23 +68,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
 
     /** {@inheritDoc} */
     @Override public IgfsHandshakeResponse handshake(final String logDir) {
-        try {
-            return IgfsUserContext.doAs(user, new 
Callable<IgfsHandshakeResponse>() {
-                @Override public IgfsHandshakeResponse call() throws Exception 
{
-                    igfs.clientLogDirectory(logDir);
+        return IgfsUserContext.doAs(user, new 
IgniteOutClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply() {
+                igfs.clientLogDirectory(logDir);
 
-                    return new IgfsHandshakeResponse(igfs.name(), 
igfs.proxyPaths(), igfs.groupBlockSize(),
-                        igfs.globalSampling());
+                return new IgfsHandshakeResponse(igfs.name(), 
igfs.proxyPaths(), igfs.groupBlockSize(),
+                    igfs.globalSampling());
                 }
-            });
-        } catch (IgniteException e) {
-            Throwable t = e.getCause();
-
-            if (t instanceof RuntimeException)
-                throw (RuntimeException)t;
-
-            throw e;
-        }
+         });
     }
 
     /** {@inheritDoc} */
@@ -103,14 +95,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsFile info(final IgfsPath path) throws 
IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new Callable<IgfsFile>() {
-                @Override public IgfsFile call() throws Exception {
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() 
{
+                @Override public IgfsFile apply() {
                     return igfs.info(path);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file 
info because Grid is stopping: " + path);
@@ -120,14 +112,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsFile update(final IgfsPath path, final Map<String, 
String> props) throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new Callable<IgfsFile>() {
-                @Override public IgfsFile call() throws Exception {
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() 
{
+                @Override public IgfsFile apply() {
                     return igfs.update(path, props);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to update file 
because Grid is stopping: " + path);
@@ -137,8 +129,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public Boolean setTimes(final IgfsPath path, final long 
accessTime, final long modificationTime) throws IgniteCheckedException {
         try {
-            IgfsUserContext.doAs(user, new Callable<Void>() {
-                @Override public Void call() throws Exception {
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
                     igfs.setTimes(path, accessTime, modificationTime);
 
                     return null;
@@ -148,7 +140,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             return true;
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to set path 
times because Grid is stopping: " +
@@ -159,8 +151,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) 
throws IgniteCheckedException {
         try {
-            IgfsUserContext.doAs(user, new Callable<Void>() {
-                @Override public Void call() throws Exception {
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
                     igfs.rename(src, dest);
 
                     return null;
@@ -170,7 +162,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             return true;
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to rename path 
because Grid is stopping: " + src);
@@ -180,14 +172,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public Boolean delete(final IgfsPath path, final boolean 
recursive) throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+                @Override public Boolean apply() {
                     return igfs.delete(path, recursive);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to delete path 
because Grid is stopping: " + path);
@@ -198,31 +190,34 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
         try {
             return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
-                @Override public IgfsStatus call() throws Exception {
+                @Override public IgfsStatus call() throws 
IgniteCheckedException {
                     return igfs.globalSpace();
                 }
             });
         }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
-        }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file 
system status because Grid is " +
                 "stopping.");
         }
+        catch (IgniteCheckedException | RuntimeException | Error e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new AssertionError("Must never go there.");
+        }
     }
 
     /** {@inheritDoc} */
     @Override public Collection<IgfsPath> listPaths(final IgfsPath path) 
throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<Collection<IgfsPath>>() {
-                @Override public Collection<IgfsPath> call() throws Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<Collection<IgfsPath>>() {
+                @Override public Collection<IgfsPath> apply() {
                     return igfs.listPaths(path);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to list paths 
because Grid is stopping: " + path);
@@ -232,14 +227,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public Collection<IgfsFile> listFiles(final IgfsPath path) 
throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<Collection<IgfsFile>>() {
-                @Override public Collection<IgfsFile> call() throws Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<Collection<IgfsFile>>() {
+                @Override public Collection<IgfsFile> apply() {
                     return igfs.listFiles(path);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to list files 
because Grid is stopping: " + path);
@@ -249,8 +244,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public Boolean mkdirs(final IgfsPath path, final Map<String, 
String> props) throws IgniteCheckedException {
         try {
-            IgfsUserContext.doAs(user, new Callable<Void>() {
-                @Override public Void call() throws Exception {
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
                     igfs.mkdirs(path, props);
 
                     return null;
@@ -260,7 +255,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             return true;
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to create 
directory because Grid is stopping: " +
@@ -271,14 +266,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsPathSummary contentSummary(final IgfsPath path) 
throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new Callable<IgfsPathSummary>() {
-                @Override public IgfsPathSummary call() throws Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<IgfsPathSummary>() {
+                @Override public IgfsPathSummary apply() {
                     return igfs.summary(path);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get content 
summary because Grid is stopping: " +
@@ -290,14 +285,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath 
path, final long start, final long len)
         throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<Collection<IgfsBlockLocation>>() {
-                @Override public Collection<IgfsBlockLocation> call() throws 
Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> apply() {
                     return igfs.affinity(path, start, len);
                 }
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get affinity 
because Grid is stopping: " + path);
@@ -307,8 +302,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws 
IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate call() throws 
Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
                     IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
 
                     return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, 
stream, stream.fileInfo().length());
@@ -316,7 +311,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to open file 
because Grid is stopping: " + path);
@@ -327,8 +322,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final 
int seqReadsBeforePrefetch)
         throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate call() throws 
Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
                     IgfsInputStreamAdapter stream = igfs.open(path, bufSize, 
seqReadsBeforePrefetch);
 
                     return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, 
stream, stream.fileInfo().length());
@@ -336,7 +331,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to open file 
because Grid is stopping: " + path);
@@ -347,8 +342,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, 
final boolean overwrite, final boolean colocate,
         final int replication, final long blockSize, final @Nullable 
Map<String, String> props) throws IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate call() throws 
Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
                     IgfsOutputStream stream = igfs.create(path, bufSize, 
overwrite,
                         colocate ? igfs.nextAffinityKey() : null, replication, 
blockSize, props);
 
@@ -357,7 +352,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to create file 
because Grid is stopping: " + path);
@@ -368,8 +363,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, 
final boolean create,
         final @Nullable Map<String, String> props) throws 
IgniteCheckedException {
         try {
-            return IgfsUserContext.doAs(user, new 
Callable<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate call() throws 
Exception {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
                     IgfsOutputStream stream = igfs.append(path, bufSize, 
create, props);
 
                     return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, 
stream);
@@ -377,7 +372,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
             });
         }
         catch (IgniteException e) {
-            throw new IgniteCheckedException(e.getCause());
+            throw new IgniteCheckedException(e);
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to append file 
because Grid is stopping: " + path);

Reply via email to