Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-43 [created] dc3faefe4


# ignite-43


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc3faefe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc3faefe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc3faefe

Branch: refs/heads/ignite-43
Commit: dc3faefe4984b329c324ce0f2d17f14a9cc17403
Parents: 180720f
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Dec 30 15:23:52 2014 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Dec 30 17:54:54 2014 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheEntryEvent.java    |  56 +++-
 .../processors/cache/IgniteCacheProxy.java      | 325 ++++++++++++++++++-
 .../grid/kernal/GridEventConsumeHandler.java    |   4 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 110 +++++++
 4 files changed, 476 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
index 42c6158..b3a4f52 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
@@ -10,27 +10,61 @@
 package org.apache.ignite.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.events.*;
 
 import javax.cache.event.*;
-import java.util.*;
 
 /**
- * TODO: Add class description.
  *
- * @author @java.author
- * @version @java.version
  */
-public abstract class CacheEntryEvent<K, V> extends 
javax.cache.event.CacheEntryEvent<K, V> {
+public class CacheEntryEvent<K, V> extends 
javax.cache.event.CacheEntryEvent<K, V> {
     /** */
-    private UUID nodeId;
+    private final IgniteCacheEvent evt;
 
-    protected CacheEntryEvent(IgniteCache source, EventType eventType, UUID 
nodeId) {
-        super(source, eventType);
+    /**
+     * @param src Cache.
+     * @param type Event type.
+     * @param evt Ignite event.
+     */
+    public CacheEntryEvent(IgniteCache src, EventType type, IgniteCacheEvent 
evt) {
+        super(src, type);
 
-        this.nodeId = nodeId;
+        this.evt = evt;
     }
 
-    public UUID getNodeId() {
-        return nodeId;
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public V getOldValue() {
+        return (V)evt.oldValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isOldValueAvailable() {
+        return evt.hasOldValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public K getKey() {
+        return evt.key();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public V getValue() {
+        return (V)evt.newValue();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> cls) {
+        if (cls.equals(IgniteCacheEvent.class))
+            return (T)evt;
+
+        throw new IllegalArgumentException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "CacheEntryEvent [evtType=" + getEventType() + ", evt=" + evt + 
']';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index a985fde..47d0722 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -11,8 +11,11 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.CacheEntryEvent;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
@@ -23,6 +26,7 @@ import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import javax.cache.configuration.*;
+import javax.cache.event.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
 import javax.cache.processor.*;
@@ -30,6 +34,8 @@ import java.io.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.events.IgniteEventType.*;
+
 /**
  * Cache proxy.
  */
@@ -765,15 +771,322 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     }
 
     /** {@inheritDoc} */
-    @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration 
cacheEntryLsnrConfiguration) {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+    @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            A.notNull(lsnrCfg, "lsnrCfg");
+
+            Factory<CacheEntryListener<? super K, ? super V>> factory = 
lsnrCfg.getCacheEntryListenerFactory();
+
+            A.notNull(factory, "cacheEntryListenerFactory");
+
+            CacheEntryListener lsnr = factory.create();
+
+            A.notNull(lsnr, "lsnr");
+
+            EventCallback cb = new EventCallback(lsnr);
+
+            Set<Integer> types = new HashSet<>();
+
+            if (cb.create() || cb.update())
+                types.add(EVT_CACHE_OBJECT_PUT);
+
+            if (cb.remove())
+                types.add(EVT_CACHE_OBJECT_REMOVED);
+
+            if (cb.expire())
+                types.add(EVT_CACHE_OBJECT_EXPIRED);
+
+            if (types.isEmpty())
+                throw new IllegalArgumentException();
+
+            int[] types0 = new int[types.size()];
+
+            int i = 0;
+
+            for (Integer type : types)
+                types0[i++] = type;
+
+            EventFilter fltr = new EventFilter(cb.create(),
+                cb.update(),
+                lsnrCfg.getCacheEntryEventFilterFactory(),
+                ignite(),
+                ctx.name());
+
+            IgniteFuture<UUID> fut = 
ctx.kernalContext().continuous().startRoutine(
+                new GridEventConsumeHandler(cb, fltr, types0),
+                1,
+                0,
+                true,
+                null);
+
+            try {
+                fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /**
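+     * Event predicate that maps Ignite cache events to JCache event types and applies the optional entry event filter.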
+     */
+    static class EventFilter implements IgnitePredicate<IgniteEvent>, 
Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private boolean update;
+
+        /** */
+        private boolean create;
+
+        /** */
+        private Factory<CacheEntryEventFilter> fltrFactory;
+
+        /** */
+        private CacheEntryEventFilter fltr;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private IgniteCache cache;
+
+        /** */
+        private String cacheName;
+
+        /**
+         *
+         */
+        public EventFilter() {
+            // No-op.
+        }
+
+        /**
+         * @param create {@code True} if listens for create event.
+         * @param update {@code True} if listens for update event.
+         * @param fltrFactory Filter factory.
+         * @param ignite Ignite instance.
+         * @param cacheName Cache name.
+         */
+        EventFilter(
+            boolean create,
+            boolean update,
+            Factory<CacheEntryEventFilter> fltrFactory,
+            Ignite ignite,
+            @Nullable String cacheName) {
+            this.update = update;
+            this.create = create;
+            this.fltrFactory = fltrFactory;
+            this.ignite = ignite;
+            this.cacheName = cacheName;
+
+            if (fltrFactory != null)
+                fltr = fltrFactory.create();
+
+            cache = ignite.jcache(cacheName);
+
+            assert cache != null : cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public boolean apply(IgniteEvent evt) {
+            assert evt instanceof IgniteCacheEvent : evt;
+
+            IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
+
+            EventType evtType;
+
+            switch (cacheEvt.type()) {
+                case EVT_CACHE_OBJECT_REMOVED: {
+                    evtType = EventType.REMOVED;
+
+                    break;
+                }
+
+                case EVT_CACHE_OBJECT_PUT: {
+                    assert update || create;
+
+                    if (cacheEvt.hasOldValue()) {
+                        if (!update)
+                            return false;
+
+                        evtType = EventType.UPDATED;
+                    }
+                    else {
+                        if (!create)
+                            return false;
+
+                        evtType = EventType.CREATED;
+                    }
+
+                    break;
+                }
+
+                case EVT_CACHE_OBJECT_EXPIRED: {
+                    evtType = EventType.EXPIRED;
+
+                    break;
+                }
+
+                default:
+                    assert false : cacheEvt;
+
+                    throw new IgniteException("Unexpected event: " + cacheEvt);
+            }
+
+            if (cache == null) {
+                cache = ignite.jcache(cacheName);
+
+                assert cache != null : cacheName;
+            }
+
+            return fltr == null || fltr.evaluate(new CacheEntryEvent(cache, 
evtType, cacheEvt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeBoolean(create);
+
+            out.writeBoolean(update);
+
+            U.writeString(out, cacheName);
+
+            out.writeObject(fltrFactory);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            create = in.readBoolean();
+
+            update = in.readBoolean();
+
+            cacheName = U.readString(in);
+
+            fltrFactory = (Factory<CacheEntryEventFilter>)in.readObject();
+
+            if (fltrFactory != null)
+                fltr = fltrFactory.create();
+        }
+    }
+
+    /**
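+     * Callback that dispatches Ignite cache events to the created, updated, removed or expired entry listener.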
+     */
+    class EventCallback implements IgniteBiPredicate<UUID, IgniteEvent> {
+        /** */
+        private final CacheEntryCreatedListener createLsnr;
+
+        /** */
+        private final CacheEntryUpdatedListener updateLsnr;
+
+        /** */
+        private final CacheEntryRemovedListener rmvLsnr;
+
+        /** */
+        private final CacheEntryExpiredListener expireLsnr;
+
+        /**
+         * @param lsnr Listener.
+         */
+        EventCallback(CacheEntryListener lsnr) {
+            createLsnr = lsnr instanceof CacheEntryCreatedListener ? 
(CacheEntryCreatedListener)lsnr : null;
+            updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? 
(CacheEntryUpdatedListener)lsnr : null;
+            rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? 
(CacheEntryRemovedListener)lsnr : null;
+            expireLsnr = lsnr instanceof CacheEntryExpiredListener ? 
(CacheEntryExpiredListener)lsnr : null;
+        }
+
+        /**
+         * @return {@code True} if listens for create event.
+         */
+        boolean create() {
+            return createLsnr != null;
+        }
+
+        /**
+         * @return {@code True} if listens for update event.
+         */
+        boolean update() {
+            return updateLsnr != null;
+        }
+
+        /**
+         * @return {@code True} if listens for remove event.
+         */
+        boolean remove() {
+            return rmvLsnr != null;
+        }
+
+        /**
+         * @return {@code True} if listens for expire event.
+         */
+        boolean expire() {
+            return expireLsnr != null;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public boolean apply(UUID uuid, IgniteEvent evt) {
+            assert evt instanceof IgniteCacheEvent : evt;
+
+            IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
+
+            switch (cacheEvt.type()) {
+                case EVT_CACHE_OBJECT_REMOVED: {
+                    assert rmvLsnr != null;
+
+                    CacheEntryEvent evt0 = new 
CacheEntryEvent(IgniteCacheProxy.this, EventType.REMOVED, cacheEvt);
+
+                    rmvLsnr.onRemoved(Collections.singleton(evt0));
+
+                    break;
+                }
+
+                case EVT_CACHE_OBJECT_PUT: {
+                    if (cacheEvt.hasOldValue()) {
+                        assert updateLsnr != null;
+
+                        CacheEntryEvent evt0 = new 
CacheEntryEvent(IgniteCacheProxy.this, EventType.UPDATED, cacheEvt);
+
+                        updateLsnr.onUpdated(Collections.singleton(evt0));
+                    }
+                    else {
+                        assert createLsnr != null;
+
+                        CacheEntryEvent evt0 = new 
CacheEntryEvent(IgniteCacheProxy.this, EventType.CREATED, cacheEvt);
+
+                        createLsnr.onCreated(Collections.singleton(evt0));
+                    }
+
+                    break;
+                }
+
+                case EVT_CACHE_OBJECT_EXPIRED: {
+                    assert expireLsnr != null;
+
+                    CacheEntryEvent evt0 = new 
CacheEntryEvent(IgniteCacheProxy.this, EventType.EXPIRED, cacheEvt);
+
+                    expireLsnr.onExpired(Collections.singleton(evt0));
+
+                    break;
+                }
+            }
+
+            return false;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration 
cacheEntryLsnrConfiguration) {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
index a48f6e1..49fbb81 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
@@ -31,7 +31,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
 /**
  * Continuous routine handler for remote event listening.
  */
-class GridEventConsumeHandler implements GridContinuousHandler {
+public class GridEventConsumeHandler implements GridContinuousHandler {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -76,7 +76,7 @@ class GridEventConsumeHandler implements 
GridContinuousHandler {
      * @param filter Filter.
      * @param types Types.
      */
-    GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, 
@Nullable IgnitePredicate<IgniteEvent> filter,
+    public GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, 
IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter,
         @Nullable int[] types) {
         this.cb = cb == null ? DFLT_CALLBACK : cb;
         this.filter = filter;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
new file mode 100644
index 0000000..949b5c6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -0,0 +1,110 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.gridgain.grid.cache.*;
+
+import javax.cache.configuration.*;
+import javax.cache.event.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerAbstractTest extends 
IgniteCacheAbstractTest {
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+
+    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return PRIMARY;
+    }
+
+    @Override
+    protected IgniteConfiguration getConfiguration(String gridName) throws 
Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setIncludeEventTypes(IgniteEventType.EVT_CACHE_OBJECT_PUT);
+
+        return cfg;
+    }
+
+    @Override
+    protected GridCacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        GridCacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        //ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    public void testEvent() throws Exception {
+        Ignite ignite = ignite(0);
+
+        /*
+        ignite.events().remoteListen(new IgniteBiPredicate<UUID, 
IgniteEvent>() {
+            @Override public boolean apply(UUID uuid, IgniteEvent e) {
+                IgniteCacheEvent evt0 = (IgniteCacheEvent)e;
+
+                System.out.println("Event: " + uuid + " " + evt0.eventNode() + 
" " + e);
+
+                return false;
+            }
+        }, null, IgniteEventType.EVT_CACHE_OBJECT_PUT);
+        */
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        final CacheEntryCreatedListener<Integer, Integer> lsnr = new 
CacheEntryCreatedListener<Integer, Integer>() {
+            @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) throws CacheEntryListenerException {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt 
: evts) {
+                    System.out.println("Event: " + evt.getEventType() + " " + 
evt.getKey() + " " + evt.getOldValue() + " " + evt.getValue());
+                }
+            }
+        };
+
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration(
+            new Factory<CacheEntryListener>() {
+                @Override public CacheEntryListener create() {
+                    return lsnr;
+                }
+            },
+            null,
+            true,
+            false
+        );
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        ignite(1).cache(null).put(1, 1);
+        ignite(0).cache(null).put(1, 2);
+
+        Thread.sleep(2000);
+    }
+}

Reply via email to