[IGNITE-494]: 2 new classes committed for review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58a5bff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58a5bff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58a5bff8

Branch: refs/heads/ignite-218
Commit: 58a5bff89e477f74c1cefc75f5b7c42f78dd8622
Parents: 2f6826c
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Wed Apr 15 21:10:00 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Wed Apr 15 21:10:00 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgniteUserContext.java | 168 +++++++++++++
 .../apache/ignite/hadoop/fs/ExpirableMap.java | 247 +++++++++++++++++++
 2 files changed, 415 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58a5bff8/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
new file mode 100644
index 0000000..97b0631
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
@@ -0,0 +1,168 @@
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.security.auth.*;
+import java.security.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Associates a user name with the code being executed, so that code further down the call stack can ask
+ * on whose behalf it runs.
+ * <p>
+ * The static methods {@link #doAs(String, Callable)} and {@link #getContextUser()} delegate to one shared
+ * strategy instance. Two strategies are declared here: {@link AccessControllerIgniteUserContext}, which is
+ * the one installed as the shared instance, and {@link ThreadLocalIgniteUserContext}.
+ */
+public abstract class IgniteUserContext {
+    /** Strategy the static entry points delegate to. */
+    private static final IgniteUserContext instance = new AccessControllerIgniteUserContext();
+
+    /**
+     * Runs the callable with the given user installed as the context user.
+     *
+     * @param user Name of the user to run the callable as, must not be {@code null}.
+     * @param callable Code to run.
+     * @param <T> Type of the callable result.
+     * @return The value returned by the callable.
+     * @throws IgniteCheckedException Produced by {@code U.cast} from any exception caught while running
+     *      the callable.
+     */
+    public abstract <T> T doAs0(String user, final Callable<T> callable) throws IgniteCheckedException;
+
+    /**
+     * Gets the user the calling code currently runs as.
+     *
+     * @return Name of the context user, or {@code null} if no user is installed.
+     */
+    public abstract String getContextUser0();
+
+    /**
+     * Runs the callable as the given user, using the shared strategy instance.
+     *
+     * @param user Name of the user to run the callable as, must not be {@code null}.
+     * @param callable Code to run.
+     * @param <T> Type of the callable result.
+     * @return The value returned by the callable.
+     * @throws IgniteCheckedException See {@link #doAs0(String, Callable)}.
+     */
+    public static <T> T doAs(String user, final Callable<T> callable) throws IgniteCheckedException {
+        return instance.doAs0(user, callable);
+    }
+
+    /**
+     * Gets the current context user from the shared strategy instance.
+     *
+     * @return Name of the context user, or {@code null} if no user is installed.
+     */
+    public static String getContextUser() {
+        return instance.getContextUser0();
+    }
+
+    /**
+     * Principal that carries nothing but the user name. Two principals are equal when their names are equal.
+     */
+    private static final class IgnitePrincipal implements Principal {
+        /** User name, never {@code null}. */
+        private final String name;
+
+        /**
+         * @param name User name, must not be {@code null}.
+         */
+        public IgnitePrincipal(String name) {
+            if (name == null)
+                throw new NullPointerException("Principal name must not be null.");
+
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getName() {
+            return name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            // Compare by content: reference comparison would silently depend on every name being interned.
+            return name.equals(((IgnitePrincipal)o).name);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return name.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return name;
+        }
+    }
+
+    /**
+     * Strategy that stores the user as an {@link IgnitePrincipal} of a {@link Subject} and binds that Subject
+     * to the access control context with {@code Subject.doAs()}.
+     */
+    static class AccessControllerIgniteUserContext extends IgniteUserContext {
+        /** {@inheritDoc} */
+        @Override public <T> T doAs0(String user, final Callable<T> callable) throws IgniteCheckedException {
+            if (user == null)
+                throw new NullPointerException("User name must not be null.");
+
+            try {
+                // The requested user is already the context user: no need to nest another Subject.
+                if (user.equals(getContextUser0()))
+                    return callable.call();
+
+                Subject subject = new Subject();
+
+                subject.getPrincipals().add(new IgnitePrincipal(user));
+
+                return Subject.doAs(subject, new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return callable.call();
+                    }
+                });
+            }
+            catch (PrivilegedActionException pae) {
+                // Subject.doAs() wraps checked exceptions thrown by the action. Unwrap, so that the callable's
+                // own exception is converted, exactly as on the same-user path above.
+                throw U.cast(pae.getException());
+            }
+            catch (Exception e) {
+                throw U.cast(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getContextUser0() {
+            Subject subject = Subject.getSubject(AccessController.getContext());
+
+            // Subject.getSubject() returns null when no Subject is associated with the context,
+            // which is the case for any code not running inside Subject.doAs().
+            if (subject == null)
+                return null;
+
+            Set<IgnitePrincipal> principals = subject.getPrincipals(IgnitePrincipal.class);
+
+            return principals.isEmpty() ? null : principals.iterator().next().getName();
+        }
+    }
+
+    /**
+     * Strategy that keeps a per-thread stack of user names; the top of the stack is the context user.
+     */
+    static class ThreadLocalIgniteUserContext extends IgniteUserContext {
+        /** Per-thread stack of nested users, most recent first. */
+        private final ThreadLocal<Deque<String>> userStackThreadLocal = new ThreadLocal<Deque<String>>() {
+            @Override protected Deque<String> initialValue() {
+                return new ArrayDeque<>();
+            }
+        };
+
+        /** {@inheritDoc} */
+        @Override public <T> T doAs0(String user, Callable<T> callable) throws IgniteCheckedException {
+            if (user == null)
+                throw new NullPointerException("User name must not be null.");
+
+            final Deque<String> stack = userStackThreadLocal.get();
+
+            try {
+                // peek() yields null on an empty stack, so this also covers the "no user yet" case.
+                if (user.equals(stack.peek()))
+                    return callable.call(); // Correct context is already there.
+
+                stack.push(user);
+
+                try {
+                    return callable.call();
+                }
+                finally {
+                    String userPopped = stack.pop();
+
+                    assert user.equals(userPopped);
+                }
+            }
+            catch (Exception e) {
+                throw U.cast(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getContextUser0() {
+            // Null when the calling thread has no user on its stack.
+            return userStackThreadLocal.get().peek();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58a5bff8/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
new file mode 100644
index 0000000..ed0cb97
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
@@ -0,0 +1,247 @@
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.lang.ref.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Maps values by keys.
+ * Uses an effective strategy of object caching:
+ * values are lazily created via a special {@link ValueFactory}.
+ * <p>
+ * When a value has not been used for longer than 'expirationTime', the map holds it only through a
+ * {@link WeakReference}, so it may disappear from the map if it is no longer strongly reachable from
+ * the client code.
+ * The values must implement the {@link AccessTimeAware} interface in order to give information about the last
+ * access time of a value object.
+ * <p>
+ * If a value has expired and has been removed from the map, the method {@link #get} will return null,
+ * unless method {@link #getOrCreate} is invoked, which will create a new value and map it to the key again.
+ * <p>
+ * NOTE(review): each map starts its own daemon checker thread and keeps no reference to it, so this class
+ * offers no way to stop that thread.
+ */
+public class ExpirableMap<K, T extends ExpirableMap.AccessTimeAware> {
+    /** Key to value wrapper mapping. */
+    private final ConcurrentMap<K, Wrapper> map = new ConcurrentHashMap8<>();
+
+    /** Creates a value for a key that has no live value. */
+    private final ValueFactory<K, T> factory;
+
+    /** Idle time after which a value is no longer held strongly; also the pause between checker passes. */
+    private final long expirationTimeMs;
+
+    /** Queue the weak references are registered with; polled by {@link #removeStale()}. */
+    private final ReferenceQueue<T> refQueue = new ReferenceQueue<>();
+
+    /**
+     * Creates the map and starts its expiration checker thread.
+     *
+     * @param factory Factory used by {@link #getOrCreate} to create missing values.
+     * @param expirationTimeMs Idle time, in milliseconds, after which a value becomes only weakly held.
+     *      The checker passes this value to {@link Thread#sleep(long)} between passes, so it should be
+     *      positive: a negative value makes that call throw in the checker thread.
+     */
+    public ExpirableMap(ValueFactory<K, T> factory, final long expirationTimeMs) {
+        this.factory = factory;
+
+        this.expirationTimeMs = expirationTimeMs;
+
+        // Expiration checker thread:
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+                while (true) {
+                    updateExpiration();
+                    removeStale();
+
+                    try {
+                        Thread.sleep(expirationTimeMs);
+                    }
+                    catch (InterruptedException ignored) {
+                        // Interruption is the only stop signal for the checker; the thread ends here.
+                        break;
+                    }
+                }
+            }
+        });
+
+        t.setName("ExpirableMap expiration checker " + Integer.toHexString(this.hashCode()));
+        t.setDaemon(true);
+        t.start();
+    }
+
+    /**
+     * Drops the strong reference of every value that has not been accessed for the expiration time.
+     */
+    void updateExpiration() {
+        for (Wrapper w : map.values()) {
+            if (w != null)
+                w.checkExpired(expirationTimeMs);
+        }
+    }
+
+    /**
+     * Gets the cached value or creates a new value of type T.
+     *
+     * @param k The key to associate the value with.
+     * @return The cached or newly created value, never null.
+     * @throws IgniteCheckedException If the factory fails or returns {@code null}. A failed attempt leaves
+     *      no mapping behind, so a later call creates the value again.
+     * @throws IgniteException If the calling thread is interrupted while waiting for a value that another
+     *      thread is creating; the interrupt flag of the thread is restored.
+     */
+    public T getOrCreate(K k) throws IgniteCheckedException {
+        final Wrapper w = new Wrapper(k);
+
+        try {
+            while (true) {
+                Wrapper wOld = map.putIfAbsent(k, w);
+
+                // New wrapper 'w' has been put. Return the value straight from init(): it is still strongly
+                // reachable here, while re-reading it from the wrapper could already observe it expired.
+                if (wOld == null)
+                    return w.init();
+
+                // Get the value from the existing wrapper:
+                T v = wOld.getValue();
+
+                if (v != null)
+                    return v; // Value found in the old wrapper.
+
+                // The value has expired and was possibly destroyed, or its creation failed.
+                // We have to replace the wrapper with a new one:
+                if (map.replace(k, wOld, w))
+                    return w.init();
+
+                // Somebody already replaced or removed the wrapper, loop again.
+            }
+        }
+        catch (InterruptedException ie) {
+            // Keep the interruption visible to the caller's thread.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Gets the value mapped to the key without creating it.
+     *
+     * @param k The key.
+     * @return The value, or {@code null} if the key is not mapped or its value is no longer available.
+     * @throws IgniteException If the calling thread is interrupted while waiting for a value that another
+     *      thread is creating; the interrupt flag of the thread is restored.
+     */
+    public @Nullable T get(K k) {
+        Wrapper w = map.get(k);
+
+        if (w == null)
+            return null;
+
+        try {
+            return w.getValue();
+        }
+        catch (InterruptedException ie) {
+            // Keep the interruption visible to the caller's thread.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * @return The key set of the underlying map, returned as is (not a copy).
+     */
+    public Set<K> keySet() {
+        return map.keySet();
+    }
+
+    /**
+     * Holder of one value. The value is referenced strongly until it expires and weakly for as long as the
+     * wrapper exists.
+     */
+    private class Wrapper {
+        /** Released once {@link #init()} has finished, successfully or not. */
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        /** Key this wrapper is mapped to. */
+        private final K key;
+
+        /** Strong reference to the value; {@code null} before initialization and after expiration. */
+        private volatile T v;
+
+        /** Weak reference to the value; stays {@code null} if the value could not be created. */
+        private DataWeakReference<Wrapper, T> weakRef;
+
+        /**
+         * @param key Key this wrapper is mapped to.
+         */
+        private Wrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Creates the value through the factory and releases the threads waiting in {@link #getValue()}.
+         *
+         * @return The created value, never {@code null}.
+         * @throws IgniteCheckedException If the factory fails or returns {@code null}.
+         */
+        private T init() throws IgniteCheckedException {
+            boolean created = false;
+
+            try {
+                final T v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteCheckedException("Failed to create value. [key=" + key + ']');
+
+                weakRef = new DataWeakReference<>(this, v0, refQueue);
+
+                v = v0;
+
+                created = true;
+
+                return v0;
+            }
+            finally {
+                // A wrapper that failed to initialize must not stay mapped. Remove it before releasing the
+                // latch, so that the released threads do not find it in the map again.
+                if (!created)
+                    map.remove(key, this);
+
+                // Always release the latch, otherwise a failed creation would block all waiters forever.
+                latch.countDown();
+            }
+        }
+
+        /**
+         * Blocks until the value is initialized.
+         *
+         * @return The value, or {@code null} if it has been garbage collected or could not be created.
+         * @throws InterruptedException If the calling thread is interrupted while waiting.
+         */
+        @Nullable T getValue() throws InterruptedException {
+            latch.await();
+
+            T v0 = v;
+
+            if (v0 != null)
+                return v0;
+
+            DataWeakReference<Wrapper, T> ref = weakRef;
+
+            // Value may be not reachable strongly (expired), but may still be reachable weakly:
+            return ref == null ? null : ref.get();
+        }
+
+        /**
+         * Drops the strong reference to the value if it has not been accessed for the given time.
+         *
+         * @param expirationTimeMs Maximum idle time in milliseconds.
+         */
+        void checkExpired(long expirationTimeMs) {
+            T v0 = v;
+
+            if (v0 == null) // Not initialized yet, or already expired.
+                return;
+
+            long usedAgo = System.currentTimeMillis() - v0.accessTimeMs();
+
+            if (usedAgo >= expirationTimeMs)
+                v = null; // Null the strong reference; the value remains only weakly reachable.
+        }
+    }
+
+    /**
+     * Removes from the map the wrappers whose weak references have been put on the reference queue.
+     */
+    @SuppressWarnings("unchecked")
+    void removeStale() {
+        Reference<? extends T> ref;
+
+        while ((ref = refQueue.poll()) != null) {
+            // Safe cast: the only references registered with 'refQueue' are the DataWeakReference instances
+            // created in Wrapper.init().
+            Wrapper w = ((DataWeakReference<Wrapper, T>)ref).getData();
+
+            // Two-argument remove: a newer wrapper mapped to the same key must stay.
+            map.remove(w.key, w);
+        }
+    }
+
+    /**
+     * Implemented by the values, so that the map can find out when a value was last used.
+     */
+    public static interface AccessTimeAware {
+        /**
+         * @return Time of the last access to this object, in milliseconds, compared by the map against
+         *      {@link System#currentTimeMillis()}.
+         */
+        public long accessTimeMs();
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     *
+     * @param <K> Type of the keys.
+     * @param <V> Type of the created values.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value.
+         *
+         * @param key The key the value is created for.
+         * @return The new value; the map treats {@code null} as a failure.
+         * @throws IgniteCheckedException If the value cannot be created.
+         */
+        public V createValue(K key) throws IgniteCheckedException;
+    }
+
+    /**
+     * Weak reference with an associated data object.
+     *
+     * @param <D> Type of the data object.
+     * @param <V> Type of the reference referent.
+     */
+    private static class DataWeakReference <D, V> extends WeakReference<V> {
+        /** The data object. */
+        private final D data;
+
+        /**
+         * @param data The data object; it is referenced strongly.
+         * @param referent The object to reference weakly.
+         * @param q The queue to register the reference with.
+         */
+        DataWeakReference(D data, V referent, ReferenceQueue<? super V> q) {
+            super(referent, q);
+
+            this.data = data;
+        }
+
+        /**
+         * Getter for the data object.
+         *
+         * @return The data object.
+         */
+        D getData() {
+            return data;
+        }
+    }
+}