[IGNITE-218]: intermediate commit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3efeeaef Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3efeeaef Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3efeeaef Branch: refs/heads/ignite-218 Commit: 3efeeaefb18c15e2b51c0099ee56dc4b157851b0 Parents: 399743c Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Thu Apr 16 17:13:14 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Thu Apr 16 17:13:14 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 40 +-- .../igfs/secondary/IgfsSecondaryFileSystem.java | 14 -- .../igfs/common/IgfsHandshakeRequest.java | 14 -- .../internal/igfs/common/IgfsMarshaller.java | 4 - .../igfs/common/IgfsPathControlRequest.java | 2 +- .../internal/processors/igfs/IgfsAsyncImpl.java | 11 - .../ignite/internal/processors/igfs/IgfsEx.java | 23 +- .../internal/processors/igfs/IgfsImpl.java | 175 +------------ .../processors/igfs/IgfsIpcHandler.java | 4 +- .../processors/igfs/IgniteUserContext.java | 168 ------------- .../ignite/internal/util/IgniteUtils.java | 12 - .../apache/ignite/hadoop/fs/ExpirableMap.java | 247 ------------------- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 162 +++++++----- .../ignite/hadoop/fs/LazyConcurrentMap.java | 156 ++++++++++++ .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 54 ++-- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 5 +- .../hadoop/SecondaryFileSystemProvider.java | 35 +-- .../hadoop/igfs/HadoopIgfsInProc.java | 4 +- .../hadoop/igfs/HadoopIgfsOutProc.java | 1 - ...oopSecondaryFileSystemConfigurationTest.java | 4 +- .../HadoopDefaultMapReducePlannerSelfTest.java | 10 - 21 files changed, 345 insertions(+), 800 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java index 83b6691..ebbe1c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -2,22 +2,17 @@ package org.apache.ignite.igfs; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + import java.util.concurrent.*; /** * Provides ability to execute IGFS code in a context of a specific user. */ public abstract class IgfsUserContext { - /** - * Thread local to hold the current user context. - */ - private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<String>() { - @Override protected String initialValue() { - String dfltUser = FileSystemConfiguration.DFLT_USER_NAME; - - return dfltUser.intern(); - } - }; + /** Thread local to hold the current user context. */ + private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>(); /** * Executes given callable in the given user context. @@ -27,12 +22,16 @@ public abstract class IgfsUserContext { * @param cllbl the callable to execute * @param <T> The type of callable result. * @return the result of - * @throws NullPointerException if 'user' or 'callble' is null + * @throws NullPointerException if callable is null + * @throws IllegalArgumentException if user name is null or empty String. * @throws IgniteException if any Exception thrown from callable.call(). * The contract is that if this method throws IgniteException, this IgniteException getCause() method * must return exactly the Exception thrown from the callable. */ public static <T> T doAs(String user, final Callable<T> cllbl) { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + user = user.intern(); final String ctxUser = userStackThreadLocal.get(); @@ -57,13 +56,26 @@ public abstract class IgfsUserContext { /** * Gets the current context user. - * If this method is invoked outside of any doAs(), it will return the default user - * name as defined in {@link FileSystemConfiguration#DFLT_USER_NAME}. + * If this method is invoked outside of any doAs() on call stack, it will return null. * Note that the returned user name is always interned, so * you may compare the names using '==' reference equality. * @return the current user, never null. */ - public static String currentUser() { + @Nullable public static String currentUser() { return userStackThreadLocal.get(); } + + /** + * Provides non-null interned user name. + * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME}, + * which is the current process owner user. + * @param user a user name to be fixed. + * @return non-null interned user name. + */ + public static String fixUserName(@Nullable String user) { + if (F.isEmpty(user)) + user = FileSystemConfiguration.DFLT_USER_NAME; + + return user.intern(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index c2c610c..9026eac 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -198,18 +198,4 @@ public interface IgfsSecondaryFileSystem { * @return Map of properties. */ public Map<String,String> properties(); - - /** - * Gets an instance of 2ndary Fs for the specified user (may be 'this'). - * @param userName the user name - * @return the instance - * @throws IgniteCheckedException - */ - public IgfsSecondaryFileSystem forUser(String userName) throws IgniteCheckedException; - - /** - * The user name this 2ndary Fs works on behalf of. - * @return the user name. - */ - public String getUser(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java index bf5892b..ec8ef6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java @@ -31,10 +31,6 @@ public class IgfsHandshakeRequest extends IgfsMessage { /** Expected IGFS name. */ private String igfsName; - // TODO: Remove. - /** User name the request is done on behalf of. */ - private String userName; - /** Logger directory. */ private String logDir; @@ -94,14 +90,4 @@ public class IgfsHandshakeRequest extends IgfsMessage { @Override public String toString() { return S.toString(IgfsHandshakeRequest.class, this); } - - public final String userName() { - assert userName != null; - - return userName; - } - - public final void userName(String userName) { - this.userName = U.fixUserName(userName); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java index 748fd7e..a4c7830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java @@ -90,8 +90,6 @@ public class IgfsMarshaller { IgfsHandshakeRequest req = (IgfsHandshakeRequest)msg; - // TODO: Remove. - U.writeString(out, req.userName()); U.writeString(out, req.gridName()); U.writeString(out, req.igfsName()); U.writeString(out, req.logDirectory()); @@ -209,8 +207,6 @@ public class IgfsMarshaller { case HANDSHAKE: { IgfsHandshakeRequest req = new IgfsHandshakeRequest(); - // TODO: Remove. - req.userName(U.readString(in)); req.gridName(U.readString(in)); req.igfsName(U.readString(in)); req.logDirectory(U.readString(in)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index 99a946a..55495d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -245,6 +245,6 @@ public class IgfsPathControlRequest extends IgfsMessage { } public final void userName(String userName) { - this.userName = U.fixUserName(userName); + this.userName = IgfsUserContext.fixUserName(userName); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java index 5fb9410..8099985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -314,15 +314,4 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme @Override public IgfsSecondaryFileSystem asSecondary() { return igfs.asSecondary(); } - - // TODO: Remove. - /** {@inheritDoc} */ - @Override public IgfsEx forUser(String userName) throws IgniteCheckedException { - return igfs.forUser(userName); - } - - /** {@inheritDoc} */ - @Override public String user() { - return igfs.user(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index c69d144..361f75f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem { /** Property name for URI of file system. */ public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; - /** Property name for user name of file system. */ - public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; + /** Property name for default user name of file system. + * NOTE: for secondary file system this is just a default user name, which is used + * when the 2ndary filesystem is used outside of any user context. + * If another user name is set in the context, 2ndary file system will work on behalf + * of that user, which is different from the default. */ + public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; /** * Stops IGFS cleaning all used resources. @@ -171,19 +175,4 @@ public interface IgfsEx extends IgniteFileSystem { * @return Secondary file system wrapper. */ public IgfsSecondaryFileSystem asSecondary(); - - // TODO: REmove. - /** - * TODO - * @param userName - * @return - * @throws IgniteCheckedException - */ - public IgfsEx forUser(@Nullable String userName) throws IgniteCheckedException; - - /** - * Getter for user name. - * @return user name. - */ - public String user(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 36030a4..a0f1c3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -43,7 +43,6 @@ import org.jetbrains.annotations.*; import org.jsr166.*; import java.io.*; -import java.lang.ref.*; import java.net.*; import java.util.*; import java.util.concurrent.*; @@ -57,6 +56,7 @@ import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*; /** * Cache-based IGFS implementation. + * This is a singleton: only 1 IgfsImpl exists in Ignite node. */ public class IgfsImpl implements IgfsEx { /** Default permissions for file system entry. */ @@ -77,9 +77,6 @@ public class IgfsImpl implements IgfsEx { /** FS configuration. */ private final FileSystemConfiguration cfg; - /** The user name this Fs works on behalf of. */ - private final String user; - /** IGFS context. */ private IgfsContext igfsCtx; @@ -125,8 +122,6 @@ public class IgfsImpl implements IgfsEx { /** Eviction policy (if set). */ private IgfsPerBlockLruEvictionPolicy evictPlc; - private final ConcurrentMap<String, Reference<IgfsImpl>> userToIgfsExMap = new ConcurrentHashMap<>(); - /** Pool for threads working in DUAL mode. */ private IgniteThreadPoolExecutor dualPool; @@ -148,63 +143,7 @@ public class IgfsImpl implements IgfsEx { meta = igfsCtx.meta(); data = igfsCtx.data(); - user = U.fixUserName(getSecondaryFsUser()); - - assert user != null; - - initIgfsSecondaryFileSystem(); - - // Check whether IGFS LRU eviction policy is set on data cache. - String dataCacheName = igfsCtx.configuration().getDataCacheName(); - - for (CacheConfiguration cacheCfg : igfsCtx.kernalContext().config().getCacheConfiguration()) { - if (F.eq(dataCacheName, cacheCfg.getName())) { - EvictionPolicy evictPlc = cacheCfg.getEvictionPolicy(); - - if (evictPlc != null & evictPlc instanceof IgfsPerBlockLruEvictionPolicy) - this.evictPlc = (IgfsPerBlockLruEvictionPolicy)evictPlc; - - break; - } - } - - topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name()); - - igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); - igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - } - - /** - * Gets secondary file system user, or null, if no secondary file system is present. - * @return the secondary file system user. - */ - @Nullable protected String getSecondaryFsUser() { - FileSystemConfiguration fsCfg = igfsCtx.configuration(); - - if (fsCfg == null) - return null; - - IgfsSecondaryFileSystem sec = fsCfg.getSecondaryFileSystem(); - - if (sec == null) - return null; - - return sec.getUser(); - } - - /** - * Initializes 2ndary file system. - * @throws IgniteCheckedException on error - */ - private void initIgfsSecondaryFileSystem() throws IgniteCheckedException { - // This is the "prototype" (in terms of GOF pattern) of the 2ndary Fs to be cloned - // for the specified user: - final IgfsSecondaryFileSystem secondaryFs0 = cfg.getSecondaryFileSystem(); - - if (secondaryFs0 == null) - secondaryFs = null; - else - secondaryFs = secondaryFs0.forUser(user); + secondaryFs = cfg.getSecondaryFileSystem(); /* Default IGFS mode. */ final IgfsMode dfltMode; @@ -263,6 +202,9 @@ public class IgfsImpl implements IgfsEx { secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, modeRslvr.modesOrdered()); + dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, + new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; + // Check whether IGFS LRU eviction policy is set on data cache. String dataCacheName = igfsCtx.configuration().getDataCacheName(); @@ -281,9 +223,6 @@ public class IgfsImpl implements IgfsEx { igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - - dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, - new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; } /** @@ -2169,87 +2108,6 @@ public class IgfsImpl implements IgfsEx { throw new IllegalStateException("Failed to perform IGFS action because grid is stopping."); } - /** {@inheritDoc} */ - @Override public IgfsEx forUser(String userName) throws IgniteCheckedException { - -// Wrapper w = map.get(userName); -// -// if (w != null) -// return w.get(); -// else { -// Wrapper newWrapper = new Wrapper(); -// Wrapper old = map.putIfAbsent(userName, newWrapper); -// -// if (old != null) -// return old.get(); -// else -// return newWrapper.init(); -// } - - - final String userFixed = U.fixUserName(userName); - - if (this.user == userFixed) - return this; // same user - - Reference<IgfsImpl> ref = userToIgfsExMap.get(userFixed); - - IgfsImpl val = (ref == null) ? null : ref.get(); - - if (val == null) { - // Create impl from the same context but with different user: - IgfsImpl newVal = new IgfsImpl(igfsCtx) { - @Override protected String getSecondaryFsUser() { - return userFixed; - } - }; - - - - final Reference<IgfsImpl> newRef = new WeakReference<>(newVal); - - while (true) { - ref = userToIgfsExMap.get(userFixed); - - if (ref == null) { - ref = userToIgfsExMap.putIfAbsent(userFixed, newRef); - - if (ref == null) { - ref = newRef; - val = newVal; - - break; // ref replaced with newRef - } - } - - val = ref.get(); - - if (val != null) - break; // there is an existing value - - boolean replaced = userToIgfsExMap.replace(userFixed, ref, newRef); - - if (replaced) { - ref = newRef; - val = newVal; - - break; - } - } - } - - assert val == ref.get(); - assert val != null; - assert F.eq(val.user(), userFixed); - - return val; - } - - /** {@inheritDoc} */ - @Override public String user() { - return user; - } - /** * IGFS thread factory. */ @@ -2280,27 +2138,4 @@ public class IgfsImpl implements IgfsEx { return t; } } - -// private ConcurrentHashMap8<String, Wrapper> map; - -// private static class Wrapper { -// -// private IgfsSecondaryFileSystem fs; -// -// private CountDownLatch latch; -// -// public IgfsSecondaryFileSystem init() { -// // fs = ... -// -// latch.countDown(); -// -// return fs; -// } -// -// public IgfsSecondaryFileSystem get() { -// latch.await(); -// -// return fs; -// } -// } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 4812c95..d8a8bdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -254,7 +254,9 @@ class IgfsIpcHandler implements IgfsServerHandler { assert userName != null; - final IgfsEx userIgfs = igfs.forUser(userName); + final IgfsEx userIgfs = igfs; //.forUser(userName); + + //IgfsUtils.setContextUser(userName); try { switch (cmd) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java deleted file mode 100644 index 97b0631..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java +++ /dev/null @@ -1,168 +0,0 @@ -package org.apache.ignite.internal.processors.igfs; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import javax.security.auth.*; -import java.security.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * - */ -public abstract class IgniteUserContext { - - private static final IgniteUserContext instance = new AccessControllerIgniteUserContext(); - - /** - * - * @param user - * @param callable - * @param <T> - * @return - * @throws IgniteCheckedException - */ - public abstract <T> T doAs0 (String user, final Callable<T> callable) throws IgniteCheckedException; - - /** - * - * @return - */ - public abstract String getContextUser0(); - - /** - * - * @param user - * @param callable - * @param <T> - * @return - * @throws IgniteCheckedException - */ - public static <T> T doAs(String user, final Callable<T> callable) throws IgniteCheckedException { - return instance.doAs0(user, callable); - } - - /** - * - * @return - */ - public static String getContextUser() { - return instance.getContextUser0(); - } - - private static final class IgnitePrincipal implements Principal { - - private final String name; - - public IgnitePrincipal(String name) { - this.name = name.intern(); - } - - @Override public String getName() { - return name; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - else if (o == null || getClass() != o.getClass()) - return false; - else - //noinspection StringEquality - return name == ((IgnitePrincipal)o).name; - } - - @Override public int hashCode() { - return name.hashCode(); - } - - @Override public String toString() { - return name; - } - } - - static class AccessControllerIgniteUserContext extends IgniteUserContext { - /** {@inheritDoc} */ - @Override public <T> T doAs0(String user, final Callable<T> callable) throws IgniteCheckedException { - user = user.intern(); - - try { - //noinspection StringEquality - if (getContextUser0() == user) - return callable.call(); - - Subject subject = new Subject(); - - subject.getPrincipals().add(new IgnitePrincipal(user)); - - return Subject.doAs(subject, new PrivilegedExceptionAction<T>() { - @Override public T run() throws Exception { - return callable.call(); - } - }); - } catch (Exception pae) { - throw U.cast(pae); - } - } - - /** {@inheritDoc} */ - @Override public String getContextUser0() { - AccessControlContext context = AccessController.getContext(); - - Subject subject = Subject.getSubject(context); - - Set<IgnitePrincipal> set = subject.getPrincipals(IgnitePrincipal.class); - - if (set.isEmpty()) - return null; - else - return set.iterator().next().getName(); - } - } - - static class ThreadLocalIgniteUserContext extends IgniteUserContext { - private final ThreadLocal<Stack<String>> userStackThreadLocal = new ThreadLocal<Stack<String>>() { - @Override protected Stack<String> initialValue() { - return new Stack<>(); - } - }; - - /** {@inheritDoc} */ - @Override public <T> T doAs0(String user, Callable<T> callable) throws IgniteCheckedException { - user = user.intern(); - - final Stack<String> stack = userStackThreadLocal.get(); - - try { - //noinspection StringEquality - if (!stack.isEmpty() && stack.peek() == user) - return callable.call(); // correct context is already there - - stack.push(user); - - try { - return callable.call(); - } - finally { - String userPopped = stack.pop(); - //noinspection StringEquality - assert user == userPopped; - } - } catch (Exception e) { - throw U.cast(e); - } - } - - /** {@inheritDoc} */ - @Override public String getContextUser0() { - Stack<String> stack = userStackThreadLocal.get(); - - if (stack.isEmpty()) - return null; - - return stack.peek(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 55785e0..1aac985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9005,16 +9005,4 @@ public abstract class IgniteUtils { return hasShmem; } - - /** - * Provides non-null interned user name - * @param user a user name. - * @return non-null interned user name - */ - public static String fixUserName(@Nullable String user) { - if (user == null) - return ""; - - return user.intern(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java deleted file mode 100644 index ed0cb97..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java +++ /dev/null @@ -1,247 +0,0 @@ -package org.apache.ignite.hadoop.fs; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; - -import java.lang.ref.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Maps values by keys. - * Uses affective strategy of object caching: - * Values are lazily created via special {@link ValueFactory}; - * - * When a value is not used longer than 'expirationTime', the value gets held by {@link WeakReference}, and may - * disappear from the map, if it is no longer strongly reachable from the client code. - * The values must implement {@link AccessTimeAware} interface in order to give information about last access - * time to a value object. - * - * If a value has expired and has been removed from the map, the method {@link #get} will return null, - * unless method {@link #getOrCreate} is invoked, which will create a new value and map it to the key again. - */ -public class ExpirableMap<K, T extends ExpirableMap.AccessTimeAware> { - - private final ConcurrentMap<K, Wrapper> map = new ConcurrentHashMap8<>(); - - private final ValueFactory<K, T> factory; - - private final long expirationTimeMs; - - private final ReferenceQueue<T> refQueue = new ReferenceQueue<>(); - - public ExpirableMap(ValueFactory<K, T> factory, final long expirationTimeMs) { - this.factory = factory; - - this.expirationTimeMs = expirationTimeMs; - - // Expiration checker thread: - Thread t = new Thread(new Runnable() { - @Override public void run() { - while (true) { - System.out.println("checking expiration."); - updateExpiration(); - removeStale(); - try { - Thread.sleep(expirationTimeMs); - } catch (InterruptedException ie) { - break; - } - } - } - }); - t.setName("ExpirableMap expiration checker " + Integer.toHexString(this.hashCode())); - t.setDaemon(true); - t.start(); - } - - void updateExpiration() { - for (Wrapper w: map.values()) { - if (w != null) - w.checkExpired(expirationTimeMs); - } - } - - /** - * Gets cached or creates a new value of V. - * @param k the key to associate the value with. - * @return the cached or newly created value, never null. - * @throws IgniteCheckedException on error - */ - public T getOrCreate(K k) throws IgniteCheckedException { - final Wrapper w = new Wrapper(k); - - try { - while (true) { - Wrapper wOld = map.putIfAbsent(k, w); - - if (wOld == null) { - // new wrapper 'w' has been put: - w.init(); - - return w.getValue(); - } - else { - // get the value from existing wrapper: - T v = wOld.getValue(); - - if (v != null) - return v; // value found in the old wrapper. - - // The value expired and possibly destroyed. - // We have to replace the wrapper with a new one: - if (map.replace(k, wOld, w)) { - w.init(); - - return w.getValue(); - } - // Somebody already replaced the wrapper, loop again. - } - } - } - catch (InterruptedException ie) { - throw new IgniteException(ie); - } - } - - public @Nullable T get(K k) { - Wrapper w = map.get(k); - - if (w == null) - return null; - - try { - return w.getValue(); - } catch (InterruptedException ie) { - throw new IgniteException(ie); - } - } - - public Set<K> keySet() { - return map.keySet(); - } - - private class Wrapper { - - private final CountDownLatch latch = new CountDownLatch(1); - - private final K key; - - private volatile T v; - - private DataWeakReference<Wrapper, T> weakRef; - - private Wrapper(K key) { - this.key = key; - } - - private void init() throws IgniteCheckedException { - final T v0 = factory.createValue(key); - - if (v0 == null) - throw new IgniteCheckedException("Failed to create value. [key=" + key + ']'); - - weakRef = new DataWeakReference<>(this, v0, refQueue); - - v = v0; - - latch.countDown(); - } - - /** - * Blocks until the value is initialized. - * @return - * @throws InterruptedException - */ - @Nullable T getValue() throws InterruptedException { - latch.await(); - - T v0 = v; - - if (v0 != null) - return v0; - - // Value may be not reachable strongly (expired), but may still be reachable weakly: - return weakRef.get(); - } - - void checkExpired(long expirationTimeMs) { - T v0 = v; - - if (v0 == null) // The value is already expired: - return; - - long a = v0.accessTimeMs(); - - long usedAgo = System.currentTimeMillis() - a; - - if (usedAgo >= expirationTimeMs) { - v = null; // null the strong reference; 'v' remains only weakly reachable. - - System.out.println("expired: " + v0 ); - } - } - } - - void removeStale() { - DataWeakReference<Wrapper,T> ref; - - while ((ref = (DataWeakReference<Wrapper,T>)refQueue.poll()) != null) { - Wrapper w = ref.getData(); - - K key = w.key; - - boolean removed = map.remove(key, w); - - System.out.println("dequeued: " + ref + " -> " + ref.get() + " removed: " + removed); - } - } - - public static interface AccessTimeAware { - public long accessTimeMs(); - } - - /** - * Interface representing the factory that creates map values. - * @param <K> - * @param <V> - */ - public interface ValueFactory <K, V> { - /** - * Creates the new value. - * @param key - * @return - * @throws IgniteCheckedException - */ - public V createValue(K key) throws IgniteCheckedException; - } - - /** - * Weak reference with an associated data object. - * @param <D> type of the data object. - * @param <V> type of the Reference referent. - */ - private static class DataWeakReference <D, V> extends WeakReference<V> { - /** The data object. */ - private final D data; - - /** - * Guess, what is this?? Yes, this is Constructor! - * @param data - * @param referent - * @param q the reference refQueue to refQueue the reference into. - */ - DataWeakReference(D data, V referent, ReferenceQueue q) { - super(referent, q); - this.data = data; - } - - /** - * Getter for the data object. - */ - D getData() { - return data; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 3c9cc8a..64b54e0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; +import org.apache.ignite.hadoop.fs.LazyConcurrentMap.*; import java.io.*; import java.net.*; @@ -36,19 +37,41 @@ import java.util.*; import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; /** - * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. + * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. + * In fact, this class deals with different FileSystems depending on the user context, + * see {@link IgfsUserContext#currentUser()}. */ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable { - /** Hadoop file system. */ - private final FileSystem fileSys; - /** Properties of file system, see {@link #properties()} - * See {@link IgfsEx#SECONDARY_FS_USER_NAME} + * * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} * See {@link IgfsEx#SECONDARY_FS_URI} + * See {@link IgfsEx#SECONDARY_FS_USER_NAME} * */ private final Map<String, String> props = new HashMap<>(); + /** Secondary file system provider. */ + private final SecondaryFileSystemProvider secProvider; + + /** The default user name. It is used if no user context is set. */ + private final String dfltUserName; + + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private volatile LazyConcurrentMap<String, FileSystem> fileSysLazyMap = new LazyConcurrentMap<>( + new ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) { + try { + assert !F.isEmpty(key); + + return secProvider.createFileSystem(key); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + /** * Simple constructor that is to be used by default. * @@ -92,27 +115,32 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys if (F.isEmpty(userName)) userName = null; + this.dfltUserName = IgfsUserContext.fixUserName(userName); + try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName); + this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } - fileSys = secProvider.createFileSystem(); + // Test filesystem creation for the default user name. + // The value is stored in the 'fileSysLazyMap' cache. + FileSystem fileSys = fileSysLazyMap.getOrCreate(dfltUserName); - uri = secProvider.uri().toString(); + assert fileSys != null; - if (!uri.endsWith("/")) - uri += "/"; + uri = secProvider.uri().toString(); - if (cfgPath != null) - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + if (!uri.endsWith("/")) + uri += "/"; - if (userName != null) - props.put(SECONDARY_FS_USER_NAME, userName); + if (cfgPath != null) + props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); - props.put(SECONDARY_FS_URI, uri); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } + props.put(SECONDARY_FS_URI, uri); + + props.put(SECONDARY_FS_USER_NAME, dfltUserName); } /** @@ -122,7 +150,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @return Hadoop path. */ private Path convert(IgfsPath path) { - URI uri = fileSys.getUri(); + URI uri = fileSysForUser().getUri(); return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); } @@ -176,7 +204,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { try { - return fileSys.exists(convert(path)); + return fileSysForUser().exists(convert(path)); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); @@ -187,6 +215,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); + final FileSystem fileSys = fileSysForUser(); + try { if (props0.userName() != null || props0.groupName() != null) fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); @@ -206,7 +236,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public void rename(IgfsPath src, IgfsPath dest) { // Delegate to the secondary file system. try { - if (!fileSys.rename(convert(src), convert(dest))) + if (!fileSysForUser().rename(convert(src), convert(dest))) throw new IgfsException("Failed to rename (secondary file system returned false) " + "[src=" + src + ", dest=" + dest + ']'); } @@ -218,7 +248,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean delete(IgfsPath path, boolean recursive) { try { - return fileSys.delete(convert(path), recursive); + return fileSysForUser().delete(convert(path), recursive); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); @@ -228,7 +258,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path) { try { - if (!fileSys.mkdirs(convert(path))) + if (!fileSysForUser().mkdirs(convert(path))) throw new IgniteException("Failed to make directories [path=" + path + "]"); } catch (IOException e) { @@ -239,7 +269,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { try { - if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); } catch (IOException e) { @@ -250,7 +280,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection<IgfsPath> listPaths(IgfsPath path) { try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); + FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ -273,7 +303,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection<IgfsFile> listFiles(IgfsPath path) { try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); + FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ -300,13 +330,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize); + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize); } /** {@inheritDoc} */ @Override public OutputStream create(IgfsPath path, boolean overwrite) { try { - return fileSys.create(convert(path), overwrite); + return fileSysForUser().create(convert(path), overwrite); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); @@ -320,7 +350,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); try { - return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, + return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, null); } catch (IOException e) { @@ -334,7 +364,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable Map<String, String> props) { try { - return fileSys.append(convert(path), bufSize); + return fileSysForUser().append(convert(path), bufSize); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); @@ -344,7 +374,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsFile info(final IgfsPath path) { try { - final FileStatus status = fileSys.getFileStatus(convert(path)); + final FileStatus status = fileSysForUser().getFileStatus(convert(path)); if (status == null) return null; @@ -419,7 +449,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys try { // We don't use FileSystem#getUsed() since it counts only the files // in the filesystem root, not all the files recursively. - return fileSys.getContentSummary(new Path("/")).getSpaceConsumed(); + return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed(); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to get used space size of file system."); @@ -427,51 +457,63 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys } /** {@inheritDoc} */ - @Nullable @Override public Map<String, String> properties() { + @Override public Map<String, String> properties() { return props; } /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - try { - fileSys.close(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); + final LazyConcurrentMap<String,FileSystem> map = fileSysLazyMap; + + if (map == null) + return; // already cleared. + + fileSysLazyMap = null; // 'this' will be unusable after #close(). + + List<IOException> ioExs = new LinkedList<>(); + + Set<String> keySet = map.keySet(); + + for (String key: keySet) { + FileSystem fs = map.get(key); + + if (fs != null) { + try { + fs.close(); + } + catch (IOException ioe) { + ioExs.add(ioe); + } + } } + + map.clear(); + + if (!ioExs.isEmpty()) + throw new IgniteCheckedException(ioExs.get(0)); } /** * Gets the underlying {@link FileSystem}. + * This method is used solely for testing. * @return the underlying Hadoop {@link FileSystem}. */ public FileSystem fileSystem() { - return fileSys; + return fileSysForUser(); } /** - * TODO - * @param user2 - * @return - * @throws IgniteCheckedException + * Gets the FileSystem for the current context user. + * @return the FileSystem instance, never null. */ - @Override public IgfsSecondaryFileSystem forUser(String user2) throws IgniteCheckedException { - String user = props.get(SECONDARY_FS_USER_NAME); - if (F.eq(user, user2)) - return this; - else { - String uri = props.get(SECONDARY_FS_URI); - String cfgPath = props.get(SECONDARY_FS_CONFIG_PATH); + private FileSystem fileSysForUser() { + String user = IgfsUserContext.currentUser(); - return new IgniteHadoopIgfsSecondaryFileSystem(uri, cfgPath, user2); - } - } + if (F.isEmpty(user)) + user = dfltUserName; // default is never empty. - /** - * TODO - * @return - */ - @Override public String getUser() { - return props.get(SECONDARY_FS_USER_NAME); + assert !F.isEmpty(user); + + return fileSysLazyMap.getOrCreate(user); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java new file mode 100644 index 0000000..f2951f4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java @@ -0,0 +1,156 @@ +package org.apache.ignite.hadoop.fs; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + */ +public class LazyConcurrentMap<K, V> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public LazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. + * @throws IgniteException on error + */ + public V getOrCreate(K k) { + final ValueWrapper wNew = new ValueWrapper(k); + + ValueWrapper w = map.putIfAbsent(k, wNew); + + if (w == null) { + // new wrapper 'w' has been put, so init the value: + wNew.init(); + + w = wNew; + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (InterruptedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Gets the value without any attempt to create a new one. + * @param k the key + * @return the value, or null if there is no value for this key. + */ + public @Nullable V get(K k) { + ValueWrapper w = map.get(k); + + if (w == null) + return null; + + try { + return w.getValue(); + } catch (InterruptedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Gets the keySet of this map, + * the contract is as per {@link ConcurrentMap#keySet()} + * @return the set of keys, never null. + */ + public Set<K> keySet() { + return map.keySet(); + } + + /** + * Clears the map. + * Follows the contract of {@link ConcurrentMap#clear()} + */ + public void clear() { + map.clear(); + } + + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Value creation latch */ + private final CountDownLatch vlueCrtLatch = new CountDownLatch(1); + + /** the key */ + private final K key; + + /** the value */ + private volatile V v; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + v = v0; + + vlueCrtLatch.countDown(); + } + + /** + * Blocks until the value is initialized. + * @return the value + * @throws InterruptedException + */ + @Nullable V getValue() throws InterruptedException { + vlueCrtLatch.await(); + + return v; + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Must never return null. + * @param key the key to create value for + * @return the value. + * @throws IgniteException on failure. + */ + public V createValue(K key); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 2dd7ddb..0a02d88 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -194,26 +194,14 @@ public class IgniteHadoopFileSystem extends FileSystem { public static String getHadoopUser(@Nullable Configuration cfg) throws IOException { String user = null; - // First, try to get the user from MR Job configuration: - if (cfg != null) - user = cfg.get(MRJobConfig.USER_NAME); - - // 2nd, try to get it from UserGroupInformation (may return any result if we're - // inside UserGroupInformation.doAs(...) closure): - if (user == null) { - UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser(); - if (currentUgi != null) - user = currentUgi.getShortUserName(); - } + UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser(); + if (currentUgi != null) + user = currentUgi.getShortUserName(); - // 3rd, get the default system (process owner) user name (defaults to "anonymous" in case of null): - if (user == null) - user = DFLT_USER_NAME; + user = IgfsUserContext.fixUserName(user); assert user != null; - user = U.fixUserName(user); - return user; } @@ -259,8 +247,6 @@ public class IgniteHadoopFileSystem extends FileSystem { user = getHadoopUser(cfg); - //setUser(user); - // Override sequential reads before prefetch if needed. seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); @@ -329,10 +315,10 @@ public class IgniteHadoopFileSystem extends FileSystem { String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - user); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + + secondaryFs = secProvider.createFileSystem(user); - secondaryFs = secProvider.createFileSystem(); secondaryUri = secProvider.uri(); } catch (IOException e) { @@ -894,19 +880,19 @@ public class IgniteHadoopFileSystem extends FileSystem { return path.makeQualified(getUri(), null); } - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - @Deprecated // TODO: remove this method. - public void setUser(String userName) { - //System.out.println(this + ": ##### setting user = " + userName + ", thread = " + Thread.currentThread()); - assert F.eq(user, userName); - //this.userName.set(userName); - - //setWorkingDirectory(null); - } +// /** +// * Set user name and default working directory for current thread. +// * +// * @param userName User name. +// */ +// @Deprecated // TODO: remove this method. +// public void setUser(String userName) { +// //System.out.println(this + ": ##### setting user = " + userName + ", thread = " + Thread.currentThread()); +// assert F.eq(user, userName); +// //this.userName.set(userName); +// +// //setWorkingDirectory(null); +// } /** {@inheritDoc} */ @Override public void setWorkingDirectory(Path newPath) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 71413f5..fd97ed6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -291,10 +291,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - user); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - secondaryFs = secProvider.createAbstractFileSystem(); + secondaryFs = secProvider.createAbstractFileSystem(user); secondaryUri = secProvider.uri(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index ca4da3c..e49da8e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.security.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -38,8 +39,8 @@ public class SecondaryFileSystemProvider { /** The secondary filesystem URI, never null. */ private final URI uri; - /** Optional user name to log into secondary filesystem with. */ - private @Nullable final String userName; +// /** Optional user name to log into secondary filesystem with. */ +// private @Nullable final String userName; /** * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be @@ -49,12 +50,12 @@ public class SecondaryFileSystemProvider { * property in the provided configuration. * @param secConfPath the secondary Fs path (file path on the local file system, optional). * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. - * @param userName User name. + //* @param userName User name. * @throws IOException */ public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath, @Nullable String userName) throws IOException { - this.userName = userName; + final @Nullable String secConfPath/*, @Nullable String userName*/) throws IOException { + //this.userName = userName; if (secConfPath != null) { URL url = U.resolveIgniteUrl(secConfPath); @@ -90,12 +91,14 @@ public class SecondaryFileSystemProvider { * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. * @throws IOException */ - public FileSystem createFileSystem() throws IOException { + public FileSystem createFileSystem(String userName) throws IOException { + userName = IgfsUserContext.fixUserName(userName); + final FileSystem fileSys; - if (userName == null) - fileSys = FileSystem.get(uri, cfg); - else { +// if (userName == null) +// fileSys = FileSystem.get(uri, cfg); +// else { try { fileSys = FileSystem.get(uri, cfg, userName); } @@ -104,7 +107,7 @@ public class SecondaryFileSystemProvider { throw new IOException("Failed to create file system due to interrupt.", e); } - } +// } return fileSys; } @@ -113,10 +116,12 @@ public class SecondaryFileSystemProvider { * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. * @throws IOException in case of error. */ - public AbstractFileSystem createAbstractFileSystem() throws IOException { - if (userName == null) - return AbstractFileSystem.get(uri, cfg); - else { + public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { + userName = IgfsUserContext.fixUserName(userName); + +// if (userName == null) +// return AbstractFileSystem.get(uri, cfg); +// else { String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); @@ -132,7 +137,7 @@ public class SecondaryFileSystemProvider { throw new IOException("Failed to create file system due to interrupt.", ie); } - } +// } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index df17198..2b1d836 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -58,9 +58,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { this.user = userName; - this.igfs = igfs.forUser(userName); + this.igfs = igfs; //.forUser(userName); - assert this.user == this.igfs.user(); + //assert this.user == this.igfs.user(); this.log = log; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 9cee867..061eed7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -152,7 +152,6 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); - req.userName(userName); req.gridName(grid); req.igfsName(igfs); req.logDirectory(logDir); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 9e84c51..b089995 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra primaryConfFullPath = null; SecondaryFileSystemProvider provider = - new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null); + new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); - primaryFs = provider.createFileSystem(); + primaryFs = provider.createFileSystem(null); primaryFsUri = provider.uri(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3efeeaef/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index 63c0b21..5972294 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -916,16 +916,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes @Override public IgfsSecondaryFileSystem asSecondary() { return null; } - - /** {@inheritDoc} */ - @Override public IgfsEx forUser(String userName) throws IgniteCheckedException { - return this; - } - - /** {@inheritDoc} */ - @Override public String user() { - return null; - } } /**