[IGNITE-494]: 2 new classes committed for review.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58a5bff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58a5bff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58a5bff8

Branch: refs/heads/ignite-218
Commit: 58a5bff89e477f74c1cefc75f5b7c42f78dd8622
Parents: 2f6826c
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Wed Apr 15 21:10:00 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Wed Apr 15 21:10:00 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgniteUserContext.java      | 168 +++++++++++++
 .../apache/ignite/hadoop/fs/ExpirableMap.java   | 247 +++++++++++++++++++
 2 files changed, 415 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58a5bff8/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
new file mode 100644
index 0000000..97b0631
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgniteUserContext.java
@@ -0,0 +1,168 @@
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.security.auth.*;
+import java.security.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public abstract class IgniteUserContext {
+
+    /** Shared context implementation that the static {@code doAs} and {@code getContextUser} methods delegate to. */
+    private static final IgniteUserContext instance = new 
AccessControllerIgniteUserContext();
+
+    /**
+     * Runs the given callable with {@code user} established as the context user.
+     * Implementation hook behind the static {@code doAs(String, Callable)} method.
+     *
+     * @param user Name of the user on whose behalf the callable runs.
+     * @param callable Action to execute.
+     * @param <T> Type of the result produced by the callable.
+     * @return Value returned by the callable.
+     * @throws IgniteCheckedException Declared failure type; the implementations in this file
+     *      convert any exception raised while running the callable via {@code U.cast(..)}.
+     */
+    public abstract <T> T doAs0 (String user, final Callable<T> callable) 
throws IgniteCheckedException;
+
+    /**
+     * Returns the user established by the enclosing {@code doAs0} call, if any.
+     * Implementation hook behind the static {@code getContextUser()} method.
+     *
+     * @return Name of the current context user; may be {@code null} when no user is set.
+     */
+    public abstract String getContextUser0();
+
+    /**
+     * Executes {@code callable} on behalf of {@code user} by delegating to the shared context instance.
+     *
+     * @param user Name of the user to run as.
+     * @param callable Action to execute.
+     * @param <T> Result type of the action.
+     * @return Whatever the delegate's {@code doAs0} returns.
+     * @throws IgniteCheckedException If thrown by the delegate.
+     */
+    public static <T> T doAs(String user, final Callable<T> callable) throws IgniteCheckedException {
+        final IgniteUserContext ctx = instance;
+
+        return ctx.doAs0(user, callable);
+    }
+
+    /**
+     * Reports the current context user as seen by the shared context instance.
+     *
+     * @return Result of the delegate's {@code getContextUser0()}.
+     */
+    public static String getContextUser() {
+        final String ctxUser = instance.getContextUser0();
+
+        return ctxUser;
+    }
+
+    private static final class IgnitePrincipal implements Principal {
+
+        private final String name;
+
+        public IgnitePrincipal(String name) {
+            this.name = name.intern();
+        }
+
+        @Override public String getName() {
+            return name;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            else if (o == null || getClass() != o.getClass())
+                return false;
+            else
+                //noinspection StringEquality
+                return name == ((IgnitePrincipal)o).name;
+        }
+
+        @Override public int hashCode() {
+            return name.hashCode();
+        }
+
+        @Override public String toString() {
+            return name;
+        }
+    }
+
+    /**
+     * Context implementation that keeps the user in the JAAS {@link Subject} associated with
+     * the current {@link AccessControlContext}.
+     */
+    static class AccessControllerIgniteUserContext extends IgniteUserContext {
+        /** {@inheritDoc} */
+        @Override public <T> T doAs0(String user, final Callable<T> callable) throws IgniteCheckedException {
+            // Principal names are interned as well, so the identity comparison below is valid.
+            user = user.intern();
+
+            try {
+                //noinspection StringEquality
+                if (getContextUser0() == user)
+                    return callable.call(); // Already running as the requested user.
+
+                Subject subject = new Subject();
+
+                subject.getPrincipals().add(new IgnitePrincipal(user));
+
+                return Subject.doAs(subject, new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return callable.call();
+                    }
+                });
+            }
+            catch (Exception e) {
+                throw U.cast(e);
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return Name of the first {@code IgnitePrincipal} of the current subject, or {@code null}
+         *      if there is no current subject or it carries no such principal.
+         */
+        @Override public String getContextUser0() {
+            Subject subject = Subject.getSubject(AccessController.getContext());
+
+            // No subject is bound to the access control context outside of Subject.doAs(..);
+            // without this check the very first doAs0() call would fail with an NPE.
+            if (subject == null)
+                return null;
+
+            Set<IgnitePrincipal> principals = subject.getPrincipals(IgnitePrincipal.class);
+
+            return principals.isEmpty() ? null : principals.iterator().next().getName();
+        }
+    }
+
+    /**
+     * Context implementation that tracks the user in a per-thread stack, so that nested
+     * {@code doAs0} calls restore the previous user when they complete.
+     */
+    static class ThreadLocalIgniteUserContext extends IgniteUserContext {
+        /** Per-thread stack of users; the head is the current context user. */
+        private final ThreadLocal<Deque<String>> userStackThreadLocal = new ThreadLocal<Deque<String>>() {
+            @Override protected Deque<String> initialValue() {
+                // Thread-confined, so the unsynchronized ArrayDeque replaces the legacy Stack.
+                return new ArrayDeque<>();
+            }
+        };
+
+        /** {@inheritDoc} */
+        @Override public <T> T doAs0(String user, Callable<T> callable) throws IgniteCheckedException {
+            // Interned so that the identity comparisons below are valid.
+            user = user.intern();
+
+            final Deque<String> stack = userStackThreadLocal.get();
+
+            try {
+                // peek() yields null for an empty deque, which never matches the non-null user.
+                //noinspection StringEquality
+                if (stack.peek() == user)
+                    return callable.call(); // Correct context is already there.
+
+                stack.push(user);
+
+                try {
+                    return callable.call();
+                }
+                finally {
+                    String userPopped = stack.pop();
+
+                    //noinspection StringEquality
+                    assert user == userPopped;
+                }
+            }
+            catch (Exception e) {
+                throw U.cast(e);
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return User on top of the calling thread's stack, or {@code null} if the stack is empty.
+         */
+        @Override public String getContextUser0() {
+            return userStackThreadLocal.get().peek();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58a5bff8/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
new file mode 100644
index 0000000..ed0cb97
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/ExpirableMap.java
@@ -0,0 +1,247 @@
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.lang.ref.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Maps values by keys.
+ * Uses an effective object-caching strategy:
+ * Values are lazily created via special {@link ValueFactory};
+ *
+ * When a value is not used longer than 'expirationTime', the value gets held 
by {@link WeakReference}, and may
+ * disappear from the map, if it is no longer strongly reachable from the 
client code.
+ * The values must implement {@link AccessTimeAware} interface in order to 
give information about last access
+ * time to a value object.
+ *
+ * If a value has expired and has been removed from the map, the method {@link 
#get} will return null,
+ * unless method {@link #getOrCreate} is invoked, which will create a new 
value and map it to the key again.
+ */
+public class ExpirableMap<K, T extends ExpirableMap.AccessTimeAware> {
+
+    /** Key-to-wrapper mapping; each wrapper holds the (possibly expired) value for its key. */
+    private final ConcurrentMap<K, Wrapper> map = new ConcurrentHashMap8<>();
+
+    /** Factory used to create values on demand. */
+    private final ValueFactory<K, T> factory;
+
+    /** Idle time in milliseconds after which a value expires; also the sleep period of the checker thread. */
+    private final long expirationTimeMs;
+
+    /** Reference queue that the weak references to the values are registered with. */
+    private final ReferenceQueue<T> refQueue = new ReferenceQueue<>();
+
+    public ExpirableMap(ValueFactory<K, T> factory, final long 
expirationTimeMs) {
+        this.factory = factory;
+
+        this.expirationTimeMs = expirationTimeMs;
+
+        // Expiration checker thread:
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+                while (true) {
+                    System.out.println("checking expiration.");
+                    updateExpiration();
+                    removeStale();
+                    try {
+                        Thread.sleep(expirationTimeMs);
+                    } catch (InterruptedException ie) {
+                        break;
+                    }
+                }
+            }
+        });
+        t.setName("ExpirableMap expiration checker " + 
Integer.toHexString(this.hashCode()));
+        t.setDaemon(true);
+        t.start();
+    }
+
+    /**
+     * Walks over all wrappers currently in the map and lets each one check
+     * whether its value has been idle for at least {@code expirationTimeMs}.
+     */
+    void updateExpiration() {
+        Iterator<Wrapper> it = map.values().iterator();
+
+        while (it.hasNext()) {
+            Wrapper wrapper = it.next();
+
+            if (wrapper == null)
+                continue;
+
+            wrapper.checkExpired(expirationTimeMs);
+        }
+    }
+
+    /**
+     * Gets cached or creates a new value of V.
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteCheckedException on error
+     */
+    public T getOrCreate(K k) throws IgniteCheckedException {
+        final Wrapper w = new Wrapper(k);
+
+        try {
+            while (true) {
+                Wrapper wOld = map.putIfAbsent(k, w);
+
+                if (wOld == null) {
+                    // new wrapper 'w' has been put:
+                    w.init();
+
+                    return w.getValue();
+                }
+                else {
+                    // get the value from existing wrapper:
+                    T v = wOld.getValue();
+
+                    if (v != null)
+                        return v; // value found in the old wrapper.
+
+                    // The value expired and possibly destroyed.
+                    // We have to replace the wrapper with a new one:
+                    if (map.replace(k, wOld, w)) {
+                        w.init();
+
+                        return w.getValue();
+                    }
+                    // Somebody already replaced the wrapper, loop again.
+                }
+            }
+        }
+        catch (InterruptedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    public @Nullable T get(K k) {
+        Wrapper w = map.get(k);
+
+        if (w == null)
+            return null;
+
+        try {
+            return w.getValue();
+        } catch (InterruptedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Exposes the keys of the underlying map.
+     *
+     * @return The key set view of the backing concurrent map (not a copy).
+     */
+    public Set<K> keySet() {
+        final Set<K> keys = map.keySet();
+
+        return keys;
+    }
+
+    /**
+     * Holder of a single lazily created value. The value is referenced strongly until it
+     * expires and weakly afterwards.
+     */
+    private class Wrapper {
+        /** Released once {@link #init()} has finished, whether it succeeded or failed. */
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        /** Key this wrapper is created for. */
+        private final K key;
+
+        /** Strong reference to the value; {@code null} before initialization and after expiration. */
+        private volatile T v;
+
+        /** Weak reference to the value; stays {@code null} if initialization failed. */
+        private DataWeakReference<Wrapper, T> weakRef;
+
+        /**
+         * @param key Key this wrapper is created for.
+         */
+        private Wrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Creates the value through the factory and publishes it to waiting threads.
+         *
+         * @throws IgniteCheckedException If the factory fails or returns {@code null}.
+         */
+        private void init() throws IgniteCheckedException {
+            try {
+                final T v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteCheckedException("Failed to create value. [key=" + key + ']');
+
+                weakRef = new DataWeakReference<>(this, v0, refQueue);
+
+                v = v0;
+            }
+            finally {
+                // Always release the waiters: a failed initialization must not block them forever.
+                latch.countDown();
+            }
+        }
+
+        /**
+         * Blocks until initialization of this wrapper has finished.
+         *
+         * @return The value, or {@code null} if initialization failed or the value has expired
+         *      and is no longer reachable through the weak reference.
+         * @throws InterruptedException If interrupted while waiting for initialization.
+         */
+        @Nullable T getValue() throws InterruptedException {
+            latch.await();
+
+            T v0 = v;
+
+            if (v0 != null)
+                return v0;
+
+            // Value may be not reachable strongly (expired), but may still be reachable weakly.
+            // The weak reference is absent altogether if initialization failed.
+            DataWeakReference<Wrapper, T> ref = weakRef;
+
+            return ref == null ? null : ref.get();
+        }
+
+        /**
+         * Drops the strong reference if the value has not been accessed for at least the given time.
+         *
+         * @param expirationTimeMs Maximum allowed idle time in milliseconds.
+         */
+        void checkExpired(long expirationTimeMs) {
+            T v0 = v;
+
+            if (v0 == null) // Not initialized yet, or already expired.
+                return;
+
+            long usedAgo = System.currentTimeMillis() - v0.accessTimeMs();
+
+            if (usedAgo >= expirationTimeMs)
+                v = null; // Null the strong reference; the value remains only weakly reachable.
+        }
+    }
+
+    /**
+     * Drains the reference queue and unmaps every wrapper whose reference has been enqueued.
+     * A wrapper is removed only if its key is still mapped to that very wrapper.
+     */
+    void removeStale() {
+        Reference<? extends T> ref;
+
+        while ((ref = refQueue.poll()) != null) {
+            // Only DataWeakReference instances are registered with this queue.
+            @SuppressWarnings("unchecked")
+            DataWeakReference<Wrapper, T> dataRef = (DataWeakReference<Wrapper, T>)ref;
+
+            Wrapper w = dataRef.getData();
+
+            map.remove(w.key, w);
+        }
+    }
+
+    /**
+     * Contract for values stored in the map: reports when the value was last accessed.
+     */
+    public interface AccessTimeAware {
+        /**
+         * @return Last access time; the expiration check compares it with {@code System.currentTimeMillis()}.
+         */
+        long accessTimeMs();
+    }
+
+    /**
+     * Factory that creates map values on demand.
+     *
+     * @param <K> Key type.
+     * @param <V> Type of the created values.
+     */
+    public interface ValueFactory<K, V> {
+        /**
+         * Creates a new value for the given key.
+         *
+         * @param key Key the value is created for.
+         * @return The newly created value.
+         * @throws IgniteCheckedException If the value cannot be created.
+         */
+        V createValue(K key) throws IgniteCheckedException;
+    }
+
+    /**
+     * Weak reference with an associated data object.
+     *
+     * @param <D> Type of the data object.
+     * @param <V> Type of the reference referent.
+     */
+    private static class DataWeakReference<D, V> extends WeakReference<V> {
+        /** The data object. */
+        private final D data;
+
+        /**
+         * Creates a weak reference to {@code referent} that carries {@code data} along.
+         *
+         * @param data Data object to associate with the reference.
+         * @param referent Object the weak reference refers to.
+         * @param q Queue the reference is registered with.
+         */
+        DataWeakReference(D data, V referent, ReferenceQueue<? super V> q) {
+            super(referent, q);
+
+            this.data = data;
+        }
+
+        /**
+         * Getter for the data object.
+         *
+         * @return The data object passed to the constructor.
+         */
+        D getData() {
+            return data;
+        }
+    }
+}

Reply via email to