Repository: incubator-ignite Updated Branches: refs/heads/ignite-43 [created] dc3faefe4
# ignite-43 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc3faefe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc3faefe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc3faefe Branch: refs/heads/ignite-43 Commit: dc3faefe4984b329c324ce0f2d17f14a9cc17403 Parents: 180720f Author: sboikov <sboi...@gridgain.com> Authored: Tue Dec 30 15:23:52 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Dec 30 17:54:54 2014 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/CacheEntryEvent.java | 56 +++- .../processors/cache/IgniteCacheProxy.java | 325 ++++++++++++++++++- .../grid/kernal/GridEventConsumeHandler.java | 4 +- .../IgniteCacheEntryListenerAbstractTest.java | 110 +++++++ 4 files changed, 476 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java index 42c6158..b3a4f52 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java @@ -10,27 +10,61 @@ package org.apache.ignite.cache; import org.apache.ignite.*; +import org.apache.ignite.events.*; import javax.cache.event.*; -import java.util.*; /** - * TODO: Add class description. 
* - * @author @java.author - * @version @java.version */ -public abstract class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> { +public class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> { /** */ - private UUID nodeId; + private final IgniteCacheEvent evt; - protected CacheEntryEvent(IgniteCache source, EventType eventType, UUID nodeId) { - super(source, eventType); + /** + * @param src Cache. + * @param type Event type. + * @param evt Ignite event. + */ + public CacheEntryEvent(IgniteCache src, EventType type, IgniteCacheEvent evt) { + super(src, type); - this.nodeId = nodeId; + this.evt = evt; } - public UUID getNodeId() { - return nodeId; + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public V getOldValue() { + return (V)evt.oldValue(); + } + + /** {@inheritDoc} */ + @Override public boolean isOldValueAvailable() { + return evt.hasOldValue(); + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return evt.key(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public V getValue() { + return (V)evt.newValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> cls) { + if (cls.equals(IgniteCacheEvent.class)) + return (T)evt; + + throw new IllegalArgumentException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheEntryEvent [evtType=" + getEventType() + ", evt=" + evt + ']'; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index a985fde..47d0722 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -11,8 +11,11 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.CacheEntryEvent; import org.apache.ignite.cache.query.*; +import org.apache.ignite.events.*; import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -23,6 +26,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; +import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.integration.*; import javax.cache.processor.*; @@ -30,6 +34,8 @@ import java.io.*; import java.util.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.events.IgniteEventType.*; + /** * Cache proxy. */ @@ -765,15 +771,322 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** {@inheritDoc} */ - @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryLsnrConfiguration) { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + A.notNull(lsnrCfg, "lsnrCfg"); + + Factory<CacheEntryListener<? super K, ? 
super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); + + A.notNull(factory, "cacheEntryListenerFactory"); + + CacheEntryListener lsnr = factory.create(); + + A.notNull(lsnr, "lsnr"); + + EventCallback cb = new EventCallback(lsnr); + + Set<Integer> types = new HashSet<>(); + + if (cb.create() || cb.update()) + types.add(EVT_CACHE_OBJECT_PUT); + + if (cb.remove()) + types.add(EVT_CACHE_OBJECT_REMOVED); + + if (cb.expire()) + types.add(EVT_CACHE_OBJECT_EXPIRED); + + if (types.isEmpty()) + throw new IllegalArgumentException(); + + int[] types0 = new int[types.size()]; + + int i = 0; + + for (Integer type : types) + types0[i++] = type; + + EventFilter fltr = new EventFilter(cb.create(), + cb.update(), + lsnrCfg.getCacheEntryEventFilterFactory(), + ignite(), + ctx.name()); + + IgniteFuture<UUID> fut = ctx.kernalContext().continuous().startRoutine( + new GridEventConsumeHandler(cb, fltr, types0), + 1, + 0, + true, + null); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + finally { + gate.leave(prev); + } + } + + /** + * Event predicate that drops put events the listener did not subscribe to and then applies the optional user {@link CacheEntryEventFilter}. + */ + static class EventFilter implements IgnitePredicate<IgniteEvent>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@code True} if update events (put with an old value) pass the filter. */ + private boolean update; + + /** {@code True} if create events (put without an old value) pass the filter. */ + private boolean create; + + /** Factory of the optional user filter, may be {@code null}; written by {@link #writeExternal}. */ + private Factory<CacheEntryEventFilter> fltrFactory; + + /** User filter created from {@link #fltrFactory}, {@code null} if no factory was given. */ + private CacheEntryEventFilter fltr; + + /** Ignite instance used to resolve the cache by name. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Cache used as the event source; resolved in {@link #apply} when still {@code null}. */ + private IgniteCache cache; + + /** Cache name; written by {@link #writeExternal}. */ + private String cacheName; + + /** + * Public no-arg constructor required by {@link Externalizable}. + */ + public EventFilter() { + // No-op. + } + + /** + * @param create {@code True} if listens for create event. + * @param update {@code True} if listens for update event. + * @param fltrFactory Filter factory. + * @param ignite Ignite instance. + * @param cacheName Cache name. 
+ */ + EventFilter( + boolean create, + boolean update, + Factory<CacheEntryEventFilter> fltrFactory, + Ignite ignite, + @Nullable String cacheName) { + this.update = update; + this.create = create; + this.fltrFactory = fltrFactory; + this.ignite = ignite; + this.cacheName = cacheName; + + if (fltrFactory != null) + fltr = fltrFactory.create(); + + cache = ignite.jcache(cacheName); + + assert cache != null : cacheName; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(IgniteEvent evt) { + assert evt instanceof IgniteCacheEvent : evt; + + IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt; + + EventType evtType; + + switch (cacheEvt.type()) { + case EVT_CACHE_OBJECT_REMOVED: { + evtType = EventType.REMOVED; + + break; + } + + case EVT_CACHE_OBJECT_PUT: { + assert update || create; + + if (cacheEvt.hasOldValue()) { + if (!update) + return false; + + evtType = EventType.UPDATED; + } + else { + if (!create) + return false; + + evtType = EventType.CREATED; + } + + break; + } + + case EVT_CACHE_OBJECT_EXPIRED: { + evtType = EventType.EXPIRED; + + break; + } + + default: + assert false : cacheEvt; + + throw new IgniteException("Unexpected event: " + cacheEvt); + } + + if (cache == null) { + cache = ignite.jcache(cacheName); + + assert cache != null : cacheName; + } + + return fltr == null || fltr.evaluate(new CacheEntryEvent(cache, evtType, cacheEvt)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(create); + + out.writeBoolean(update); + + U.writeString(out, cacheName); + + out.writeObject(fltrFactory); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + create = in.readBoolean(); + + update = in.readBoolean(); + + cacheName = U.readString(in); + + fltrFactory = (Factory<CacheEntryEventFilter>)in.readObject(); + + if (fltrFactory != null) + 
fltr = fltrFactory.create(); + } + } + + /** + * Callback that wraps a cache event into a {@link CacheEntryEvent} and passes it to the listener interface matching the event type. + */ + class EventCallback implements IgniteBiPredicate<UUID, IgniteEvent> { + /** Listener as created-listener, {@code null} if it does not implement that interface. */ + private final CacheEntryCreatedListener createLsnr; + + /** Listener as updated-listener, {@code null} if it does not implement that interface. */ + private final CacheEntryUpdatedListener updateLsnr; + + /** Listener as removed-listener, {@code null} if it does not implement that interface. */ + private final CacheEntryRemovedListener rmvLsnr; + + /** Listener as expired-listener, {@code null} if it does not implement that interface. */ + private final CacheEntryExpiredListener expireLsnr; + + /** + * @param lsnr Listener; checked against each of the four listener interfaces. + */ + EventCallback(CacheEntryListener lsnr) { + createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; + updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; + rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; + expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; + } + + /** + * @return {@code True} if listens for create event. + */ + boolean create() { + return createLsnr != null; + } + + /** + * @return {@code True} if listens for update event. + */ + boolean update() { + return updateLsnr != null; + } + + /** + * @return {@code True} if listens for remove event. + */ + boolean remove() { + return rmvLsnr != null; + } + + /** + * @return {@code True} if listens for expire event. 
+ */ + boolean expire() { + return expireLsnr != null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(UUID uuid, IgniteEvent evt) { + assert evt instanceof IgniteCacheEvent : evt; + + IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt; + + switch (cacheEvt.type()) { + case EVT_CACHE_OBJECT_REMOVED: { + assert rmvLsnr != null; + + CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.REMOVED, cacheEvt); + + rmvLsnr.onRemoved(Collections.singleton(evt0)); + + break; + } + + case EVT_CACHE_OBJECT_PUT: { + if (cacheEvt.hasOldValue()) { + assert updateLsnr != null; + + CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.UPDATED, cacheEvt); + + updateLsnr.onUpdated(Collections.singleton(evt0)); + } + else { + assert createLsnr != null; + + CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.CREATED, cacheEvt); + + createLsnr.onCreated(Collections.singleton(evt0)); + } + + break; + } + + case EVT_CACHE_OBJECT_EXPIRED: { + assert expireLsnr != null; + + CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.EXPIRED, cacheEvt); + + expireLsnr.onExpired(Collections.singleton(evt0)); + + break; + } + } + + return false; + } } /** {@inheritDoc} */ - @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration cacheEntryLsnrConfiguration) { - // TODO IGNITE-1. 
- throw new UnsupportedOperationException(); + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java index a48f6e1..49fbb81 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java @@ -31,7 +31,7 @@ import static org.apache.ignite.events.IgniteEventType.*; /** * Continuous routine handler for remote event listening. */ -class GridEventConsumeHandler implements GridContinuousHandler { +public class GridEventConsumeHandler implements GridContinuousHandler { /** */ private static final long serialVersionUID = 0L; @@ -76,7 +76,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { * @param filter Filter. * @param types Types. */ - GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter, + public GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter, @Nullable int[] types) { this.cb = cb == null ? 
DFLT_CALLBACK : cb; this.filter = filter; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc3faefe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java new file mode 100644 index 0000000..949b5c6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -0,0 +1,110 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.gridgain.grid.cache.*; + +import javax.cache.configuration.*; +import javax.cache.event.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * Tests cache entry listener registration via {@link IgniteCache#registerCacheEntryListener}. + */ +public class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest { + @Override protected int gridCount() { + return 3; + } + + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + 
@Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } + + @Override + protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setIncludeEventTypes(IgniteEventType.EVT_CACHE_OBJECT_PUT); + + return cfg; + } + + @Override + protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration ccfg = super.cacheConfiguration(gridName); + + //ccfg.setBackups(1); + + return ccfg; + } + + public void testEvent() throws Exception { + Ignite ignite = ignite(0); + + /* + ignite.events().remoteListen(new IgniteBiPredicate<UUID, IgniteEvent>() { + @Override public boolean apply(UUID uuid, IgniteEvent e) { + IgniteCacheEvent evt0 = (IgniteCacheEvent)e; + + System.out.println("Event: " + uuid + " " + evt0.eventNode() + " " + e); + + return false; + } + }, null, IgniteEventType.EVT_CACHE_OBJECT_PUT); + */ + + IgniteCache<Integer, Integer> cache = jcache(); + + final CacheEntryCreatedListener<Integer, Integer> lsnr = new CacheEntryCreatedListener<Integer, Integer>() { + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) throws CacheEntryListenerException { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + System.out.println("Event: " + evt.getEventType() + " " + evt.getKey() + " " + evt.getOldValue() + " " + evt.getValue()); + } + } + }; + + CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration( + new Factory<CacheEntryListener>() { + @Override public CacheEntryListener create() { + return lsnr; + } + }, + null, + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + + ignite(1).cache(null).put(1, 1); + ignite(0).cache(null).put(1, 2); + + Thread.sleep(2000); + } +}