[IGNITE-218]: 1. IgfsUserContext and its usages. This fixes the issue with #withReconnect tests.
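For readers skimming the diff below, the core of the change is the user-context switch in IgfsUserContext.doAs(): the closure variant no longer wraps exceptions in IgniteException, and a new overload accepts a Callable and lets checked exceptions propagate unwrapped. The following is a minimal standalone sketch of that thread-local pattern, under the assumption that java.util.function.Supplier stands in for IgniteOutClosure; it is an illustration of the idea, not the actual Ignite class.

import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class UserContextSketch {
    /** Holds the user name for the current thread; null outside of doAs(). */
    private static final ThreadLocal<String> USER = new ThreadLocal<>();

    /** Runs the closure with the given user set; never wraps exceptions. */
    public static <T> T doAs(String user, Supplier<T> clo) {
        if (user == null || user.isEmpty())
            throw new NullPointerException("Failed to use null or empty user name.");

        String prev = USER.get();

        if (user.equals(prev))
            return clo.get(); // Correct context is already set.

        USER.set(user);

        try {
            return clo.get();
        }
        finally {
            USER.set(prev); // Restore the outer context.
        }
    }

    /** Same contract, but the callable may throw checked exceptions, which propagate as-is. */
    public static <T> T doAs(String user, Callable<T> clbl) throws Exception {
        if (user == null || user.isEmpty())
            throw new NullPointerException("Failed to use null or empty user name.");

        String prev = USER.get();

        if (user.equals(prev))
            return clbl.call();

        USER.set(user);

        try {
            return clbl.call();
        }
        finally {
            USER.set(prev);
        }
    }

    /** @return the user of the innermost enclosing doAs(), or null if none. */
    public static String currentUser() {
        return USER.get();
    }

    public static void main(String[] args) {
        String res = doAs("alice", () -> "ran as " + currentUser());
        System.out.println(res);           // ran as alice
        System.out.println(currentUser()); // null, outer context restored
    }
}

Saving and restoring the previous thread-local value (rather than clearing it) is what makes nested doAs() calls with different users behave like a stack, which is the contract the javadoc in the diff describes.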
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/57d8056b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/57d8056b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/57d8056b Branch: refs/heads/ignite-218 Commit: 57d8056bbf51973875f936c33ead87409317f4cd Parents: 4240ce0 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Sat Apr 18 01:35:33 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Sat Apr 18 01:35:33 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 94 +++++++++++---- .../processors/igfs/IgfsIpcHandler.java | 7 +- .../hadoop/igfs/HadoopIgfsInProc.java | 117 +++++++++---------- 3 files changed, 128 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d8056b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java index e48507f..567fae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -20,6 +20,7 @@ package org.apache.ignite.igfs; import org.apache.ignite.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.util.concurrent.*; @@ -34,44 +35,87 @@ public abstract class IgfsUserContext { /** * Executes given callable in the given user context. * The main contract of this method is that {@link #currentUser()} method invoked - * inside 'cllbl' callable always returns 'user' this callable executed with. - * @param user the user name - * @param cllbl the callable to execute - * @param <T> The type of callable result. - * @return the result of - * @throws NullPointerException if callable is null - * @throws IllegalArgumentException if user name is null or empty String. - * @throws IgniteException if any Exception thrown from callable.call(). - * The contract is that if this method throws IgniteException, this IgniteException getCause() method - * must return exactly the Exception thrown from the callable. + * inside closure always returns 'user' this callable executed with. + * @param user the user name to invoke closure on behalf of. + * @param clo the closure to execute + * @param <T> The type of closure result. + * @return the result of closure execution. + * @throws NullPointerException if user name is null or empty String or if the closure is null. 
*/ - public static <T> T doAs(String user, final Callable<T> cllbl) { - A.ensure(!F.isEmpty(user), "Failed to use null or empty user name."); + public static <T> T doAs(String user, final IgniteOutClosure<T> clo) { + if (F.isEmpty(user)) + // use NPE to ensure that #doAs() caller will not treat this exception + // as the one thrown from the closure: + throw new NullPointerException("Failed to use null or empty user name."); final String ctxUser = userStackThreadLocal.get(); + if (F.eq(ctxUser, user)) + return clo.apply(); // correct context is already there + + userStackThreadLocal.set(user); + try { - if (F.eq(ctxUser, user)) - return cllbl.call(); // correct context is already there + return clo.apply(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts + * callable that throws checked Exception. + * The Exception is not ever wrapped anyhow. + * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is: + * <pre name="code" class="java"> + * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 { + * try { + * return IgfsUserContext.doAs(user, new Callable<Foo>() { + * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 { + * return makeSomeFoo(); // do the job + * } + * }); + * } + * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) { + * throw e; + * } + * catch (Exception e) { + * throw new AssertionError("Must never go there."); + * } + * } + * </pre> + * @param user the user name to invoke closure on behalf of. + * @param clbl the Callable to execute + * @param <T> The type of callable result. + * @return the result of closure execution. + * @throws NullPointerException if user name is null or empty String or if the closure is null. + */ + public static <T> T doAs(String user, final Callable<T> clbl) throws Exception { + if (F.isEmpty(user)) + // use NPE to ensure that #doAs() caller will not treat this exception + // as the one thrown from the closure: + throw new NullPointerException("Failed to use null or empty user name."); - userStackThreadLocal.set(user); + final String ctxUser = userStackThreadLocal.get(); - try { - return cllbl.call(); - } - finally { - userStackThreadLocal.set(ctxUser); - } + if (F.eq(ctxUser, user)) + return clbl.call(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clbl.call(); } - catch (Exception e) { - throw new IgniteException(e); + finally { + userStackThreadLocal.set(ctxUser); } } /** * Gets the current context user. - * If this method is invoked outside of any {@link #doAs(String, Callable)} on the call stack, it will return null. - * Otherwise it will return the user name set in the most lower {@link #doAs(String, Callable)} call + * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will return null. + * Otherwise it will return the user name set in the most lower {@link #doAs(String, IgniteOutClosure)} call * on the call stack. * @return the current user, may be null. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d8056b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 3ba99fc..cfe6ed4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -31,7 +31,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** @@ -256,8 +255,8 @@ class IgfsIpcHandler implements IgfsServerHandler { assert userName != null; try { - IgfsUserContext.doAs(userName, new Callable<Void>() { - @Override public Void call() throws Exception { + IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() { + @Override public Void apply() { switch (cmd) { case EXISTS: res.response(igfs.exists(req.path())); @@ -386,7 +385,7 @@ class IgfsIpcHandler implements IgfsServerHandler { }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d8056b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index ed7f296..47ba0e8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.io.*; @@ -67,23 +68,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsHandshakeResponse handshake(final String logDir) { - try { - return IgfsUserContext.doAs(user, new Callable<IgfsHandshakeResponse>() { - @Override public IgfsHandshakeResponse call() throws Exception { - igfs.clientLogDirectory(logDir); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply() { + igfs.clientLogDirectory(logDir); - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); } - }); - } catch (IgniteException e) { - Throwable t = e.getCause(); - - if (t instanceof RuntimeException) - throw (RuntimeException)t; - - throw e; - } + }); } /** {@inheritDoc} */ @@ -103,14 +95,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { try { - return 
IgfsUserContext.doAs(user, new Callable<IgfsFile>() { - @Override public IgfsFile call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { return igfs.info(path); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path); @@ -120,14 +112,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<IgfsFile>() { - @Override public IgfsFile call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { return igfs.update(path, props); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path); @@ -137,8 +129,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { try { - IgfsUserContext.doAs(user, new Callable<Void>() { - @Override public Void call() throws Exception { + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { igfs.setTimes(path, accessTime, modificationTime); return null; @@ -148,7 +140,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { return true; } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " + @@ -159,8 +151,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { try { - IgfsUserContext.doAs(user, new Callable<Void>() { - @Override public Void call() throws Exception { + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { igfs.rename(src, dest); return null; @@ -170,7 +162,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { return true; } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src); @@ -180,14 +172,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<Boolean>() { - @Override public Boolean call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() { + @Override public Boolean apply() { return igfs.delete(path, recursive); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new 
HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path); @@ -198,31 +190,34 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public IgfsStatus fsStatus() throws IgniteCheckedException { try { return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() { - @Override public IgfsStatus call() throws Exception { + @Override public IgfsStatus call() throws IgniteCheckedException { return igfs.globalSpace(); } }); } - catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); - } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + "stopping."); } + catch (IgniteCheckedException | RuntimeException | Error e) { + throw e; + } + catch (Exception e) { + throw new AssertionError("Must never go there."); + } } /** {@inheritDoc} */ @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<Collection<IgfsPath>>() { - @Override public Collection<IgfsPath> call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply() { return igfs.listPaths(path); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path); @@ -232,14 +227,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<Collection<IgfsFile>>() { - @Override public Collection<IgfsFile> call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply() { return igfs.listFiles(path); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path); @@ -249,8 +244,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - IgfsUserContext.doAs(user, new Callable<Void>() { - @Override public Void call() throws Exception { + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { igfs.mkdirs(path, props); return null; @@ -260,7 +255,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { return true; } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " + @@ -271,14 +266,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<IgfsPathSummary>() { - @Override public IgfsPathSummary call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() { + 
@Override public IgfsPathSummary apply() { return igfs.summary(path); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " + @@ -290,14 +285,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<Collection<IgfsBlockLocation>>() { - @Override public Collection<IgfsBlockLocation> call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply() { return igfs.affinity(path, start, len); } }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path); @@ -307,8 +302,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { IgfsInputStreamAdapter stream = igfs.open(path, bufSize); return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); @@ -316,7 +311,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); @@ -327,8 +322,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); @@ -336,7 +331,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); @@ -347,8 +342,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new 
Callable<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); @@ -357,7 +352,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path); @@ -368,8 +363,8 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate call() throws Exception { + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { IgfsOutputStream stream = igfs.append(path, bufSize, create, props); return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); @@ -377,7 +372,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { }); } catch (IgniteException e) { - throw new IgniteCheckedException(e.getCause()); + throw new IgniteCheckedException(e); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path);
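The fsStatus() hunk above illustrates the caller-side pattern the new doAs(Callable) javadoc recommends: rethrow the checked exceptions the callable can actually throw (plus RuntimeException and Error) and treat anything else as unreachable. Below is a hedged, self-contained sketch of that pattern, reusing the UserContextSketch class shown earlier and a hypothetical MyCheckedException that is not part of the Ignite API.

import java.util.concurrent.Callable;

public class DoAsCallerSketch {
    /** Hypothetical checked exception, used only for illustration. */
    static class MyCheckedException extends Exception {
        MyCheckedException(String msg) { super(msg); }
    }

    /** Runs a job on behalf of 'user'; only MyCheckedException can escape as a checked exception. */
    static String runJob(String user, final boolean fail) throws MyCheckedException {
        try {
            return UserContextSketch.doAs(user, new Callable<String>() {
                @Override public String call() throws MyCheckedException {
                    if (fail)
                        throw new MyCheckedException("job failed for " + UserContextSketch.currentUser());

                    return "ok, ran as " + UserContextSketch.currentUser();
                }
            });
        }
        catch (MyCheckedException | RuntimeException | Error e) {
            throw e; // Declared checked exceptions and unchecked throwables pass through unchanged.
        }
        catch (Exception e) {
            throw new AssertionError("Must never go there."); // doAs() adds no other checked exceptions.
        }
    }

    public static void main(String[] args) throws MyCheckedException {
        System.out.println(runJob("bob", false));
    }
}

This mirrors the multi-catch/AssertionError structure that the new fsStatus() implementation in HadoopIgfsInProc uses, while the other methods in that class switch to the IgniteOutClosure overload and simply wrap any IgniteException in IgniteCheckedException.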