# ignite-43
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f02315 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f02315 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f02315 Branch: refs/heads/ignite-65 Commit: f9f02315992b046106b8fad38b14c37887e6103c Parents: dc3faef Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 31 12:03:37 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 31 15:53:08 2014 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/CacheEntryEvent.java | 29 +- .../processors/cache/IgniteCacheProxy.java | 323 +---------- .../grid/cache/GridCacheConfiguration.java | 1 + .../cache/query/GridCacheContinuousQuery.java | 10 +- .../grid/kernal/GridEventConsumeHandler.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 49 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../GridCacheDataStructuresManager.java | 2 +- .../GridCacheContinuousQueryAdapter.java | 39 +- .../GridCacheContinuousQueryEntry.java | 28 +- .../GridCacheContinuousQueryHandler.java | 23 +- .../GridCacheContinuousQueryHandlerV2.java | 12 +- .../GridCacheContinuousQueryManager.java | 498 ++++++++++++++++- .../service/GridServiceProcessor.java | 4 +- .../IgniteCacheEntryListenerAbstractTest.java | 535 +++++++++++++++++-- ...IgniteCacheEntryListenerAtomicLocalTest.java | 41 ++ ...eCacheEntryListenerAtomicReplicatedTest.java | 24 + .../IgniteCacheEntryListenerAtomicTest.java | 47 ++ .../IgniteCacheEntryListenerTxLocalTest.java | 41 ++ ...gniteCacheEntryListenerTxReplicatedTest.java | 24 + .../cache/IgniteCacheEntryListenerTxTest.java | 41 ++ .../bamboo/GridDataGridTestSuite.java | 7 + 22 files changed, 1341 insertions(+), 443 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java index b3a4f52..b480ca4 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java @@ -10,7 +10,7 @@ package org.apache.ignite.cache; import org.apache.ignite.*; -import org.apache.ignite.events.*; +import org.gridgain.grid.cache.query.*; import javax.cache.event.*; @@ -19,52 +19,49 @@ import javax.cache.event.*; */ public class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> { /** */ - private final IgniteCacheEvent evt; + private final GridCacheContinuousQueryEntry<K, V> e; /** * @param src Cache. * @param type Event type. - * @param evt Ignite event. + * @param e Ignite event. */ - public CacheEntryEvent(IgniteCache src, EventType type, IgniteCacheEvent evt) { + public CacheEntryEvent(IgniteCache src, EventType type, GridCacheContinuousQueryEntry<K, V> e) { super(src, type); - this.evt = evt; + this.e = e; } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public V getOldValue() { - return (V)evt.oldValue(); + return e.getOldValue(); } /** {@inheritDoc} */ @Override public boolean isOldValueAvailable() { - return evt.hasOldValue(); + return e.getOldValue() != null; } /** {@inheritDoc} */ @Override public K getKey() { - return evt.key(); + return e.getKey(); } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public V getValue() { - return (V)evt.newValue(); + return e.getValue(); } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> cls) { - if (cls.equals(IgniteCacheEvent.class)) - return (T)evt; - throw new IllegalArgumentException(); } /** {@inheritDoc} */ @Override public String toString() { - return "CacheEntryEvent [evtType=" + getEventType() + ", evt=" + evt + ']'; + return "CacheEntryEvent [evtType=" + getEventType() + + ", key=" + getKey() + + ", val=" + getValue() + + ", oldVal=" + getOldValue() + ']'; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 47d0722..b526f15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -11,11 +11,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.CacheEntryEvent; import org.apache.ignite.cache.query.*; -import org.apache.ignite.events.*; import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -26,7 +23,6 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; -import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.integration.*; import javax.cache.processor.*; @@ -34,8 +30,6 @@ import java.io.*; import java.util.*; import java.util.concurrent.locks.*; -import static org.apache.ignite.events.IgniteEventType.*; - /** * Cache proxy. */ @@ -771,325 +765,36 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** {@inheritDoc} */ - @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - A.notNull(lsnrCfg, "lsnrCfg"); - - Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); - - A.notNull(factory, "cacheEntryListenerFactory"); - - CacheEntryListener lsnr = factory.create(); - - A.notNull(lsnr, "lsnr"); - - EventCallback cb = new EventCallback(lsnr); - - Set<Integer> types = new HashSet<>(); - - if (cb.create() || cb.update()) - types.add(EVT_CACHE_OBJECT_PUT); - - if (cb.remove()) - types.add(EVT_CACHE_OBJECT_REMOVED); - - if (cb.expire()) - types.add(EVT_CACHE_OBJECT_EXPIRED); - - if (types.isEmpty()) - throw new IllegalArgumentException(); - - int[] types0 = new int[types.size()]; - - int i = 0; - - for (Integer type : types) - types0[i++] = type; - - EventFilter fltr = new EventFilter(cb.create(), - cb.update(), - lsnrCfg.getCacheEntryEventFilterFactory(), - ignite(), - ctx.name()); - - IgniteFuture<UUID> fut = ctx.kernalContext().continuous().startRoutine( - new GridEventConsumeHandler(cb, fltr, types0), - 1, - 0, - true, - null); - - try { - fut.get(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + ctx.continuousQueries().registerCacheEntryListener(lsnrCfg); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { gate.leave(prev); } } - /** - * - */ - static class EventFilter implements IgnitePredicate<IgniteEvent>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean update; - - /** */ - private boolean create; - - /** */ - private Factory<CacheEntryEventFilter> fltrFactory; - - /** */ - private CacheEntryEventFilter fltr; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private IgniteCache cache; - - /** */ - private String cacheName; - - /** - * - */ - public EventFilter() { - // No-op. - } - - /** - * @param create {@code True} if listens for create event. - * @param update {@code True} if listens for create event. - * @param fltrFactory Filter factory. - * @param ignite Ignite instance. - * @param cacheName Cache name. - */ - EventFilter( - boolean create, - boolean update, - Factory<CacheEntryEventFilter> fltrFactory, - Ignite ignite, - @Nullable String cacheName) { - this.update = update; - this.create = create; - this.fltrFactory = fltrFactory; - this.ignite = ignite; - this.cacheName = cacheName; - - if (fltrFactory != null) - fltr = fltrFactory.create(); - - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(IgniteEvent evt) { - assert evt instanceof IgniteCacheEvent : evt; - - IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt; - - EventType evtType; - - switch (cacheEvt.type()) { - case EVT_CACHE_OBJECT_REMOVED: { - evtType = EventType.REMOVED; - - break; - } - - case EVT_CACHE_OBJECT_PUT: { - assert update || create; - - if (cacheEvt.hasOldValue()) { - if (!update) - return false; - - evtType = EventType.UPDATED; - } - else { - if (!create) - return false; - - evtType = EventType.CREATED; - } - - break; - } - - case EVT_CACHE_OBJECT_EXPIRED: { - evtType = EventType.EXPIRED; - - break; - } - - default: - assert false : cacheEvt; - - throw new IgniteException("Unexpected event: " + cacheEvt); - } - - if (cache == null) { - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - } - - return fltr == null || fltr.evaluate(new CacheEntryEvent(cache, evtType, cacheEvt)); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(create); - - out.writeBoolean(update); - - U.writeString(out, cacheName); - - out.writeObject(fltrFactory); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - create = in.readBoolean(); - - update = in.readBoolean(); - - cacheName = U.readString(in); - - fltrFactory = (Factory<CacheEntryEventFilter>)in.readObject(); - - if (fltrFactory != null) - fltr = fltrFactory.create(); - } - } - - /** - * - */ - class EventCallback implements IgniteBiPredicate<UUID, IgniteEvent> { - /** */ - private final CacheEntryCreatedListener createLsnr; - - /** */ - private final CacheEntryUpdatedListener updateLsnr; - - /** */ - private final CacheEntryRemovedListener rmvLsnr; - - /** */ - private final CacheEntryExpiredListener expireLsnr; - - /** - * @param lsnr Listener. - */ - EventCallback(CacheEntryListener lsnr) { - createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; - updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; - rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; - expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; - } - - /** - * @return {@code True} if listens for create event. - */ - boolean create() { - return createLsnr != null; - } - - /** - * @return {@code True} if listens for update event. - */ - boolean update() { - return updateLsnr != null; - } + /** {@inheritDoc} */ + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - /** - * @return {@code True} if listens for remove event. - */ - boolean remove() { - return rmvLsnr != null; + try { + ctx.continuousQueries().deregisterCacheEntryListener(lsnrCfg); } - - /** - * @return {@code True} if listens for expire event. - */ - boolean expire() { - return expireLsnr != null; + catch (IgniteCheckedException e) { + throw cacheException(e); } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(UUID uuid, IgniteEvent evt) { - assert evt instanceof IgniteCacheEvent : evt; - - IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt; - - switch (cacheEvt.type()) { - case EVT_CACHE_OBJECT_REMOVED: { - assert rmvLsnr != null; - - CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.REMOVED, cacheEvt); - - rmvLsnr.onRemoved(Collections.singleton(evt0)); - - break; - } - - case EVT_CACHE_OBJECT_PUT: { - if (cacheEvt.hasOldValue()) { - assert updateLsnr != null; - - CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.UPDATED, cacheEvt); - - updateLsnr.onUpdated(Collections.singleton(evt0)); - } - else { - assert createLsnr != null; - - CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.CREATED, cacheEvt); - - createLsnr.onCreated(Collections.singleton(evt0)); - } - - break; - } - - case EVT_CACHE_OBJECT_EXPIRED: { - assert expireLsnr != null; - - CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.EXPIRED, cacheEvt); - - expireLsnr.onExpired(Collections.singleton(evt0)); - - break; - } - } - - return false; + finally { + gate.leave(prev); } } /** {@inheritDoc} */ - @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { - } - - /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { // TODO IGNITE-1. throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java index 5a9a675..149bee9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java @@ -362,6 +362,7 @@ public class GridCacheConfiguration extends MutableConfiguration { interceptor = cc.getInterceptor(); invalidate = cc.isInvalidate(); keepPortableInStore = cc.isKeepPortableInStore(); + listenerConfigurations = cc.listenerConfigurations; offHeapMaxMem = cc.getOffHeapMaxMemory(); maxConcurrentAsyncOps = cc.getMaxConcurrentAsyncOperations(); maxQryIterCnt = cc.getMaximumQueryIteratorCount(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java index a2675aa..60db85a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java @@ -136,13 +136,13 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable { * can get deadlocks. * * @param cb Local callback. - * @deprecated Deprecated in favor of {@link #localCallback(org.apache.ignite.lang.IgniteBiPredicate)} method. + * @deprecated Deprecated in favor of {@link #localCallback(IgniteBiPredicate)} method. */ @Deprecated public void callback(@Nullable IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> cb); /** - * Gets local callback. See {@link #callback(org.apache.ignite.lang.IgniteBiPredicate)} for more information. + * Gets local callback. See {@link #callback(IgniteBiPredicate)} for more information. * * @return Local callback. * @deprecated Deprecated in favor of {@link #localCallback()} method. @@ -167,7 +167,7 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable { public void filter(@Nullable IgniteBiPredicate<K, V> filter); /** - * Gets key-value filter. See {@link #filter(org.apache.ignite.lang.IgniteBiPredicate)} for more information. + * Gets key-value filter. See {@link #filter(IgniteBiPredicate)} for more information. * * @return Key-value filter. * @deprecated Deprecated in favor of {@link #remoteFilter()} method. @@ -197,7 +197,7 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable { public void localCallback(IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> locCb); /** - * Gets local callback. See {@link #callback(org.apache.ignite.lang.IgniteBiPredicate)} for more information. + * Gets local callback. See {@link #callback(IgniteBiPredicate)} for more information. * * @return Local callback. */ @@ -218,7 +218,7 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable { public void remoteFilter(@Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter); /** - * Gets key-value filter. See {@link #filter(org.apache.ignite.lang.IgniteBiPredicate)} for more information. + * Gets key-value filter. See {@link #filter(IgniteBiPredicate)} for more information. * * @return Key-value filter. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java index 49fbb81..a48f6e1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java @@ -31,7 +31,7 @@ import static org.apache.ignite.events.IgniteEventType.*; /** * Continuous routine handler for remote event listening. */ -public class GridEventConsumeHandler implements GridContinuousHandler { +class GridEventConsumeHandler implements GridContinuousHandler { /** */ private static final long serialVersionUID = 0L; @@ -76,7 +76,7 @@ public class GridEventConsumeHandler implements GridContinuousHandler { * @param filter Filter. * @param types Types. */ - public GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter, + GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter, @Nullable int[] types) { this.cb = cb == null ? DFLT_CALLBACK : cb; this.filter = filter; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index bb493cc..b19e44f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -775,9 +775,23 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> ret = old; } - if (evt && expired && cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { - cctx.events().addEvent(partition(), key, tx, owner, EVT_CACHE_OBJECT_EXPIRED, null, false, expiredVal, - expiredVal != null || hasOldBytes, subjId, null, taskName); + if (evt && expired) { + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { + cctx.events().addEvent(partition(), + key, + tx, + owner, + EVT_CACHE_OBJECT_EXPIRED, + null, + false, + expiredVal, + expiredVal != null || hasOldBytes, + subjId, + null, + taskName); + } + + cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null); // No more notifications. evt = false; @@ -1147,7 +1161,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (mode == GridCacheMode.LOCAL || mode == GridCacheMode.REPLICATED || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1309,7 +1323,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (mode == GridCacheMode.LOCAL || mode == GridCacheMode.REPLICATED || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes); + cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1580,7 +1594,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (metrics) cctx.cache().metrics0().onWrite(); - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, op == DELETE); @@ -2038,7 +2052,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> cctx.cache().metrics0().onWrite(); if (primary || cctx.isReplicated()) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, op == DELETE); @@ -2959,8 +2973,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> /** {@inheritDoc} */ @SuppressWarnings({"RedundantTypeArguments"}) - @Override public boolean initialValue(V val, byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime, - boolean preload, long topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException { + @Override public boolean initialValue(V val, + byte[] valBytes, + GridCacheVersion ver, + long ttl, + long expireTime, + boolean preload, + long topVer, + GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException { if (cctx.isUnmarshalValues() && valBytes != null && val == null && isNewLocked()) val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); @@ -2996,8 +3016,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> drReplicate(drType, val, valBytes, ver); if (!skipQryNtf) { - if (cctx.affinity().primary(cctx.localNode(), key, topVer) || cctx.isReplicated()) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), null, null); + if (cctx.affinity().primary(cctx.localNode(), key, topVer) || cctx.isReplicated()) { + cctx.continuousQueries().onEntryUpdate(this, + key, + val, + valueBytesUnlocked(), + null, + null, + preload); + } cctx.dataStructures().onEntryUpdated(key, false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 8e9cea0..46755c6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -43,6 +43,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.configuration.*; import javax.management.*; import java.util.*; @@ -811,7 +812,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheCtx.cache().start(); - if (log.isInfoEnabled()) log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']'); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java index 35b10b7..db072d6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java @@ -679,7 +679,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager } }); - queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, true); + queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, true, false); } GridCacheQueueProxy queue = queuesMap.get(header.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java index 9b88858..c00c961 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.query.continuous; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry; @@ -24,6 +23,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.util.*; import java.util.concurrent.locks.*; @@ -213,12 +213,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou /** {@inheritDoc} */ @Override public void execute() throws IgniteCheckedException { - execute(null, false); + execute(null, false, false); } /** {@inheritDoc} */ @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException { - execute(prj, false); + execute(prj, false, false); } /** @@ -226,9 +226,10 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou * * @param prj Grid projection. * @param internal If {@code true} then query notified about internal entries updates. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @throws IgniteCheckedException If failed. */ - public void execute(@Nullable ClusterGroup prj, boolean internal) throws IgniteCheckedException { + public void execute(@Nullable ClusterGroup prj, boolean internal, boolean entryLsnr) throws IgniteCheckedException { if (locCb == null) throw new IllegalStateException("Mandatory local callback is not set for the query: " + this); @@ -271,12 +272,32 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou guard.block(); - GridContinuousHandler hnd = ctx.kernalContext().security().enabled() ? - new GridCacheContinuousQueryHandlerV2<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal, - ctx.kernalContext().job().currentTaskNameHash()) : - new GridCacheContinuousQueryHandler<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal); + GridContinuousHandler hnd; + + if (ctx.kernalContext().security().enabled()) { + hnd = new GridCacheContinuousQueryHandlerV2<>(ctx.name(), + topic, + locCb, + rmtFilter, + prjPred, + internal, + entryLsnr, + ctx.kernalContext().job().currentTaskNameHash()); + } + else { + hnd = new GridCacheContinuousQueryHandler<>(ctx.name(), + topic, + locCb, + rmtFilter, + prjPred, + internal, + entryLsnr); + } - routineId = ctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe, + routineId = ctx.kernalContext().continuous().startRoutine(hnd, + bufSize, + timeInterval, + autoUnsubscribe, prj.predicate()).get(); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java index 991573b..3c5265c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java @@ -77,6 +77,9 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> @GridToStringExclude private GridDeploymentInfo depInfo; + /** */ + private boolean expired; + /** * Required by {@link Externalizable}. */ @@ -85,7 +88,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> impl = null; } - /** + /* * @param ctx Cache context. * @param impl Cache entry. * @param key Key. @@ -93,9 +96,16 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> * @param newValBytes Value bytes. * @param oldVal Old value. * @param oldValBytes Old value bytes. + * @param expired {@code True} if created for expired entry. */ - GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx, GridCacheEntry<K, V> impl, K key, @Nullable V newVal, - @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal, @Nullable GridCacheValueBytes oldValBytes) { + GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx, + GridCacheEntry<K, V> impl, + K key, + @Nullable V newVal, + @Nullable GridCacheValueBytes newValBytes, + @Nullable V oldVal, + @Nullable GridCacheValueBytes oldValBytes, + boolean expired) { assert ctx != null; assert impl != null; assert key != null; @@ -107,6 +117,14 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> this.newValBytes = newValBytes; this.oldVal = oldVal; this.oldValBytes = oldValBytes; + this.expired = expired; + } + + /** + * @return {@code True} if entry expired. + */ + public boolean expired() { + return expired; } /** @@ -710,6 +728,8 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> out.writeObject(newVal); out.writeObject(oldVal); } + + out.writeBoolean(expired); } /** {@inheritDoc} */ @@ -734,6 +754,8 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> newVal = (V)in.readObject(); oldVal = (V)in.readObject(); } + + expired = in.readBoolean(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java index 9d810c0..e61b2a2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java @@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.io.*; import java.util.*; @@ -61,6 +62,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Internal flag. */ private boolean internal; + /** Entry listener flag. */ + private boolean entryLsnr; + /** * Required by {@link Externalizable}. */ @@ -75,11 +79,15 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param filter Filter. * @param prjPred Projection predicate. * @param internal If {@code true} then query is notified about internal entries updates. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. */ - GridCacheContinuousQueryHandler(@Nullable String cacheName, Object topic, + GridCacheContinuousQueryHandler(@Nullable String cacheName, + Object topic, IgniteBiPredicate<UUID, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>>> cb, @Nullable IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> filter, - @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal) { + @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, + boolean internal, + boolean entryLsnr) { assert topic != null; assert cb != null; @@ -89,6 +97,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.filter = filter; this.prjPred = prjPred; this.internal = internal; + this.entryLsnr = entryLsnr; } /** {@inheritDoc} */ @@ -183,7 +192,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - if (recordEvt) { + if (!entryLsnr && recordEvt) { ctx.event().record(new IgniteCacheQueryReadEvent<>( ctx.discovery().localNode(), "Continuous query executed.", @@ -241,12 +250,12 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } }; - return manager(ctx).registerListener(nodeId, routineId, lsnr, internal); + return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); } /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - manager(ctx).iterate(internal, routineId); + manager(ctx).iterate(internal, routineId, entryLsnr); } /** {@inheritDoc} */ @@ -371,6 +380,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { out.writeObject(prjPred); out.writeBoolean(internal); + + out.writeBoolean(entryLsnr); } /** {@inheritDoc} */ @@ -394,6 +405,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { prjPred = (IgnitePredicate<GridCacheEntry<K, V>>)in.readObject(); internal = in.readBoolean(); + + entryLsnr = in.readBoolean(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java index 6a185fd..63209ec 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java @@ -14,6 +14,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.io.*; import java.util.*; @@ -41,13 +42,18 @@ public class GridCacheContinuousQueryHandlerV2<K, V> extends GridCacheContinuous * @param filter Filter. * @param prjPred Projection predicate. * @param internal If {@code true} then query is notified about internal entries updates. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @param taskHash Task hash. */ - public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName, Object topic, + public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName, + Object topic, IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb, @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter, - @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, int taskHash) { - super(cacheName, topic, cb, filter, prjPred, internal); + @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, + boolean internal, + boolean entryLsnr, + int taskHash) { + super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr); this.taskHash = taskHash; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java index b67d93c..8c1b70e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java @@ -11,18 +11,25 @@ package org.gridgain.grid.kernal.processors.cache.query.continuous; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.configuration.*; +import javax.cache.event.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static javax.cache.event.EventType.*; import static org.apache.ignite.events.IgniteEventType.*; import static org.gridgain.grid.kernal.GridTopic.*; @@ -48,6 +55,10 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt /** Query sequence number for message topic. */ private final AtomicLong seq = new AtomicLong(); + /** Continues queries created for cache event listeners. */ + private final ConcurrentMap<CacheEntryListenerConfiguration, GridCacheContinuousQuery<K, V>> lsnrQrys = + new ConcurrentHashMap8<>(); + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { // Append cache name to the topic. @@ -55,6 +66,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override protected void onKernalStart0() throws IgniteCheckedException { if (intLsnrCnt.get() > 0 || lsnrCnt.get() > 0) { Collection<ClusterNode> nodes = cctx.discovery().cacheNodes(cctx.name(), -1); @@ -65,6 +77,30 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt "for versions below 6.2.0"); } } + + Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations(); + + if (lsnrCfgs != null) { + IgniteCacheProxy<K, V> cache = cctx.kernalContext().cache().jcache(cctx.name()); + + for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs) + cache.registerCacheEntryListener(cfg); + } + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + super.onKernalStop0(cancel); + + for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) { + try { + deregisterCacheEntryListener(lsnrCfg); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to remove cache entry listener: " + e); + } + } } /** @@ -84,10 +120,16 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param newBytes New value bytes. * @param oldVal Old value. * @param oldBytes Old value bytes. + * @param preload {@code True} if entry is updated during preloading. * @throws IgniteCheckedException In case of error. */ - public void onEntryUpdate(GridCacheEntryEx<K, V> e, K key, @Nullable V newVal, - @Nullable GridCacheValueBytes newBytes, V oldVal, @Nullable GridCacheValueBytes oldBytes) throws IgniteCheckedException { + public void onEntryUpdate(GridCacheEntryEx<K, V> e, + K key, + @Nullable V newVal, + @Nullable GridCacheValueBytes newBytes, + V oldVal, + @Nullable GridCacheValueBytes oldBytes, + boolean preload) throws IgniteCheckedException { assert e != null; assert key != null; @@ -104,25 +146,145 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt oldVal = cctx.unwrapTemporary(oldVal); GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( - cctx, e.wrap(false), key, newVal, newBytes, oldVal, oldBytes); + cctx, + e.wrap(false), + key, + newVal, + newBytes, + oldVal, + oldBytes, + false); e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); boolean recordEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - for (ListenerInfo<K, V> lsnr : lsnrCol.values()) + for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { + if (preload && lsnr.entryListener()) + continue; + lsnr.onEntryUpdate(e0, recordEvt); + } + } + + /** + * @param e Entry. + * @param key Key. + * @param oldVal Old value. + * @param oldBytes Old value bytes. + */ + public void onEntryExpired(GridCacheEntryEx<K, V> e, + K key, + V oldVal, + @Nullable GridCacheValueBytes oldBytes) { + if (e.isInternal()) + return; + + ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol = lsnrs; + + if (F.isEmpty(lsnrCol)) + return; + + GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( + cctx, + e.wrap(false), + key, + null, + null, + oldVal, + oldBytes, + true); + + for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { + if (!lsnr.entryListener()) + continue; + + lsnr.onEntryUpdate(e0, false); + } + } + + /** + * @param lsnrCfg Listener configuration. + * @throws IgniteCheckedException If failed. + */ + public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) + throws IgniteCheckedException { + GridCacheContinuousQueryAdapter<K, V> qry = null; + + try { + A.notNull(lsnrCfg, "lsnrCfg"); + + Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); + + A.notNull(factory, "cacheEntryListenerFactory"); + + CacheEntryListener lsnr = factory.create(); + + A.notNull(lsnr, "lsnr"); + + IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name()); + + EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr); + + if (!(cb.create() || cb.update() || cb.remove() || cb.expire())) + throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces."); + + qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery(); + + GridCacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry); + + if (old != null) + throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg); + + qry.autoUnsubscribe(true); + + qry.bufferSize(1); + + qry.localCallback(cb); + + EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(), + cb.update(), + cb.remove(), + cb.expire(), + lsnrCfg.getCacheEntryEventFilterFactory(), + cctx.kernalContext().grid(), + cctx.name()); + + qry.remoteFilter(fltr); + + qry.execute(null, false, true); + } + catch (IgniteCheckedException e) { + lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. + + throw e; + } + } + + /** + * @param lsnrCfg Listener configuration. + * @throws IgniteCheckedException If failed. + */ + public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException { + A.notNull(lsnrCfg, "lsnrCfg"); + + GridCacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg); + + if (qry != null) + qry.close(); } /** - * @param nodeId Node ID. * @param lsnrId Listener ID. * @param lsnr Listener. * @param internal Internal flag. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @return Whether listener was actually registered. */ - boolean registerListener(UUID nodeId, UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, boolean internal) { - ListenerInfo<K, V> info = new ListenerInfo<>(lsnr); + boolean registerListener(UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, + boolean internal, + boolean entryLsnr) { + ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr); boolean added; @@ -164,25 +326,36 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * Iterates through existing data. * * @param internal Internal flag. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @param id Listener ID. */ - void iterate(boolean internal, UUID id) { + void iterate(boolean internal, UUID id, boolean entryLsnr) { ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id); assert info != null; - Set<GridCacheEntry<K, V>> entries; - - if (cctx.isReplicated()) - entries = internal ? cctx.cache().entrySetx() : - cctx.cache().entrySet(); - else - entries = internal ? cctx.cache().primaryEntrySetx() : - cctx.cache().primaryEntrySet(); - - for (GridCacheEntry<K, V> e : entries) { - info.onIterate(new GridCacheContinuousQueryEntry<>(cctx, e, e.getKey(), e.getValue(), null, null, null), - !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); + if (!entryLsnr) { + Set<GridCacheEntry<K, V>> entries; + + if (cctx.isReplicated()) + entries = internal ? cctx.cache().entrySetx() : + cctx.cache().entrySet(); + else + entries = internal ? cctx.cache().primaryEntrySetx() : + cctx.cache().primaryEntrySet(); + + for (GridCacheEntry<K, V> e : entries) { + GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx, + e, + e.getKey(), + e.getValue(), + null, + null, + null, + false); + + info.onIterate(qryEntry, !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); + } } info.flushPending(); @@ -198,11 +371,16 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt /** Pending entries. */ private Collection<PendingEntry<K, V>> pending = new LinkedList<>(); + /** */ + private boolean entryLsnr; + /** * @param lsnr Listener. + * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}. */ - private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr) { + private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) { this.lsnr = lsnr; + this.entryLsnr = entryLsnr; } /** @@ -247,6 +425,13 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt for (PendingEntry<K, V> e : pending0) lsnr.onEntryUpdate(e.entry, e.recordEvt); } + + /** + * @return {@code True} if listener created for {@link CacheEntryListener}. + */ + boolean entryListener() { + return entryLsnr; + } } /** @@ -268,4 +453,275 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt this.recordEvt = recordEvt; } } + + /** + * + */ + static class EntryListenerFilter<K1, V1> implements + IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K1, V1>>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private boolean create; + + /** */ + private boolean update; + + /** */ + private boolean rmv; + + /** */ + private boolean expire; + + /** */ + private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory; + + /** */ + private CacheEntryEventFilter fltr; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteCache cache; + + /** */ + private String cacheName; + + /** + * + */ + public EntryListenerFilter() { + // No-op. + } + + /** + * @param create {@code True} if listens for create events. + * @param update {@code True} if listens for create events. + * @param rmv {@code True} if listens for remove events. + * @param expire {@code True} if listens for expire events. + * @param fltrFactory Filter factory. + * @param ignite Ignite instance. + * @param cacheName Cache name. + */ + EntryListenerFilter( + boolean create, + boolean update, + boolean rmv, + boolean expire, + Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory, + Ignite ignite, + @Nullable String cacheName) { + this.create = create; + this.update = update; + this.rmv = rmv; + this.expire = expire; + this.fltrFactory = fltrFactory; + this.ignite = ignite; + this.cacheName = cacheName; + + if (fltrFactory != null) + fltr = fltrFactory.create(); + + cache = ignite.jcache(cacheName); + + assert cache != null : cacheName; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K1, V1> entry) { + try { + EventType evtType; + + if (entry.getValue() == null) { + if (((GridCacheContinuousQueryEntry)entry).expired()) { // Expire. + if (!expire) + return false; + + evtType = EXPIRED; + } + else { // Remove. + if (!rmv) + return false; + + evtType = REMOVED; + } + } + else { + if (entry.getOldValue() != null) { // Update. + if (!update) + return false; + + evtType = UPDATED; + } + else { // Create. + if (!create) + return false; + + evtType = CREATED; + } + } + + if (cache == null) { + cache = ignite.jcache(cacheName); + + assert cache != null : cacheName; + } + + return fltr == null || fltr.evaluate(new org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry)); + } + catch (CacheEntryListenerException e) { + LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); + + return false; + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(create); + + out.writeBoolean(update); + + out.writeBoolean(rmv); + + out.writeBoolean(expire); + + U.writeString(out, cacheName); + + out.writeObject(fltrFactory); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + create = in.readBoolean(); + + update = in.readBoolean(); + + rmv = in.readBoolean(); + + expire = in.readBoolean(); + + cacheName = U.readString(in); + + fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject(); + + if (fltrFactory != null) + fltr = fltrFactory.create(); + } + } + + /** + * + */ + private class EntryListenerCallback implements + IgniteBiPredicate<UUID, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>>> { + /** */ + private final IgniteCacheProxy<K, V> cache; + + /** */ + private final CacheEntryCreatedListener createLsnr; + + /** */ + private final CacheEntryUpdatedListener updateLsnr; + + /** */ + private final CacheEntryRemovedListener rmvLsnr; + + /** */ + private final CacheEntryExpiredListener expireLsnr; + + /** + * @param cache Cache to be used as event source. + * @param lsnr Listener. + */ + EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) { + this.cache = cache; + + createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; + updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; + rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; + expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; + } + + /** + * @return {@code True} if listens for create event. + */ + boolean create() { + return createLsnr != null; + } + + /** + * @return {@code True} if listens for update event. + */ + boolean update() { + return updateLsnr != null; + } + + /** + * @return {@code True} if listens for remove event. + */ + boolean remove() { + return rmvLsnr != null; + } + + /** + * @return {@code True} if listens for expire event. + */ + boolean expire() { + return expireLsnr != null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(UUID uuid, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> entries) { + for (org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry entry : entries) { + try { + if (entry.getValue() == null) { // Remove. + if (((GridCacheContinuousQueryEntry)entry).expired()) { // Expire. + assert expireLsnr != null; + + org.apache.ignite.cache.CacheEntryEvent evt0 = + new org.apache.ignite.cache.CacheEntryEvent(cache, EXPIRED, entry); + + expireLsnr.onExpired(Collections.singleton(evt0)); + } + else { + assert rmvLsnr != null; + + org.apache.ignite.cache.CacheEntryEvent evt0 = + new org.apache.ignite.cache.CacheEntryEvent(cache, REMOVED, entry); + + rmvLsnr.onRemoved(Collections.singleton(evt0)); + } + } + else if (entry.getOldValue() != null) { // Update. + assert updateLsnr != null; + + org.apache.ignite.cache.CacheEntryEvent evt0 = + new org.apache.ignite.cache.CacheEntryEvent(cache, UPDATED, entry); + + updateLsnr.onUpdated(Collections.singleton(evt0)); + } + else { // Create. + assert createLsnr != null; + + org.apache.ignite.cache.CacheEntryEvent evt0 = + new org.apache.ignite.cache.CacheEntryEvent(cache, CREATED, entry); + + createLsnr.onCreated(Collections.singleton(evt0)); + } + } + catch (CacheEntryListenerException e) { + LT.warn(log, e, "Cache entry listener error: " + e); + } + } + + return true; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java index b8b5998..d8dfc97 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java @@ -124,13 +124,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { cfgQry.localCallback(new DeploymentListener()); - cfgQry.execute(ctx.grid().forLocal(), true); + cfgQry.execute(ctx.grid().forLocal(), true, false); assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); assignQry.localCallback(new AssignmentListener()); - assignQry.execute(ctx.grid().forLocal(), true); + assignQry.execute(ctx.grid().forLocal(), true, false); } finally { if (ctx.deploy().enabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 949b5c6..6e3221a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -10,88 +10,197 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.event.*; -import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; -import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; -import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import java.util.*; +import java.util.concurrent.*; + +import static javax.cache.event.EventType.*; import static org.gridgain.grid.cache.GridCacheMode.*; /** * */ -public class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest { - @Override protected int gridCount() { - return 3; - } +public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest { + /** */ + private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts; - @Override protected GridCacheMode cacheMode() { - return PARTITIONED; - } + /** */ + private static volatile CountDownLatch evtsLatch; - @Override protected GridCacheAtomicityMode atomicityMode() { - return ATOMIC; - } + /** */ + private Integer lastKey = 0; - @Override protected GridCacheDistributionMode distributionMode() { - return PARTITIONED_ONLY; - } + /** */ + private CacheEntryListenerConfiguration lsnrCfg; - @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { - return PRIMARY; - } + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration cfg = super.cacheConfiguration(gridName); - @Override - protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setIncludeEventTypes(IgniteEventType.EVT_CACHE_OBJECT_PUT); + if (lsnrCfg != null) + cfg.addCacheEntryListenerConfiguration(lsnrCfg); return cfg; } - @Override - protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { - GridCacheConfiguration ccfg = super.cacheConfiguration(gridName); + /** + * @throws Exception If failed. + */ + public void testEvents() throws Exception { + CacheEntryCreatedListener<Integer, Integer> createLsnr = new CacheEntryCreatedListener<Integer, Integer>() { + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } + }; + + CacheEntryUpdatedListener<Integer, Integer> updateLsnr = new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } + }; + + CacheEntryRemovedListener<Integer, Integer> rmvLsnr = new CacheEntryRemovedListener<Integer, Integer>() { + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } + }; + + IgniteCache<Integer, Integer> cache = jcache(); + + Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < 100; i++) + vals.put(i + 1_000_000, i); + + cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries. + + for (Integer key : keys()) { + log.info("Check create event [key=" + key + ']'); + + checkEvents(cache, createLsnr, key, true, false, false); + + log.info("Check update event [key=" + key + ']'); + + checkEvents(cache, updateLsnr, key, false, true, false); + + log.info("Check remove event [key=" + key + ']'); + + checkEvents(cache, rmvLsnr, key, false, false, true); + + log.info("Check create/update events [key=" + key + ']'); + + checkEvents(cache, new CreateUpdateListener(), key, true, true, false); + + log.info("Check create/update/remove events [key=" + key + ']'); + + checkEvents(cache, new CreateUpdateRemoveListener(), key, true, true, true); + } + + CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new CreateUpdateRemoveListener(); + } + }, + new TestFilterFactory(), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + + log.info("Check filter."); + + checkFilter(cache, vals); + + cache.deregisterCacheEntryListener(lsnrCfg); - //ccfg.setBackups(1); + cache.putAll(vals); - return ccfg; + checkListenerOnStart(vals); } - public void testEvent() throws Exception { - Ignite ignite = ignite(0); + /** + * @param vals Values in cache. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new CreateUpdateRemoveListener(); + } + }, + null, + true, + false + ); - /* - ignite.events().remoteListen(new IgniteBiPredicate<UUID, IgniteEvent>() { - @Override public boolean apply(UUID uuid, IgniteEvent e) { - IgniteCacheEvent evt0 = (IgniteCacheEvent)e; + Ignite grid = startGrid(gridCount()); - System.out.println("Event: " + uuid + " " + evt0.eventNode() + " " + e); + IgniteCache<Integer, Integer> cache = grid.jcache(null); - return false; - } - }, null, IgniteEventType.EVT_CACHE_OBJECT_PUT); - */ + Integer key = Integer.MAX_VALUE; - IgniteCache<Integer, Integer> cache = jcache(); + log.info("Check create/update/remove events for listener in configuration [key=" + key + ']'); + + checkEvents(cache, lsnrCfg, key, true, true, true); - final CacheEntryCreatedListener<Integer, Integer> lsnr = new CacheEntryCreatedListener<Integer, Integer>() { - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { - System.out.println("Event: " + evt.getEventType() + " " + evt.getKey() + " " + evt.getOldValue() + " " + evt.getValue()); + stopGrid(gridCount()); + + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new CreateUpdateRemoveListener(); } - } - }; + }, + new TestFilterFactory(), + true, + false + ); - CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration( - new Factory<CacheEntryListener>() { - @Override public CacheEntryListener create() { + grid = startGrid(gridCount()); + + cache = grid.jcache(null); + + log.info("Check filter for listener in configuration."); + + checkFilter(cache, vals); + + stopGrid(gridCount()); + } + + /** + * @param cache Cache. + * @param lsnr Listener. + * @param key Key. + * @param create {@code True} if listens for create events. + * @param update {@code True} if listens for update events. + * @param rmv {@code True} if listens for remove events. + * @throws Exception If failed. + */ + private void checkEvents( + final IgniteCache<Integer, Integer> cache, + final CacheEntryListener<Integer, Integer> lsnr, + Integer key, + boolean create, + boolean update, + boolean rmv) throws Exception { + CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { return lsnr; } }, @@ -102,9 +211,325 @@ public class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTes cache.registerCacheEntryListener(lsnrCfg); - ignite(1).cache(null).put(1, 1); - ignite(0).cache(null).put(1, 2); + checkEvents(cache, lsnrCfg, key, create, update, rmv); + } + + /** + * @param cache Cache. + * @param vals Values in cache. + * @throws Exception If failed. + */ + private void checkFilter(IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception { + evts = new ArrayList<>(); + + final int expEvts = (vals.size() / 2) * 3; // Remove, create and update for half of modified entries. + + evtsLatch = new CountDownLatch(expEvts); + + cache.removeAll(vals.keySet()); + + cache.putAll(vals); + + Map<Integer, Integer> newVals = new HashMap<>(); + + for (Integer key : vals.keySet()) + newVals.put(key, -1); + + cache.putAll(newVals); + + evtsLatch.await(5000, TimeUnit.MILLISECONDS); + + assertEquals(expEvts, evts.size()); + + Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator(); + + for (Integer key : vals.keySet()) { + if (key % 2 == 0) { + CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + + assertTrue(evt.getKey() % 2 == 0); + assertTrue(vals.keySet().contains(evt.getKey())); + assertEquals(REMOVED, evt.getEventType()); + assertNull(evt.getValue()); + assertEquals(vals.get(evt.getKey()), evt.getOldValue()); + + iter.remove(); + } + } + + for (Integer key : vals.keySet()) { + if (key % 2 == 0) { + CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + + assertTrue(evt.getKey() % 2 == 0); + assertTrue(vals.keySet().contains(evt.getKey())); + assertEquals(CREATED, evt.getEventType()); + assertEquals(vals.get(evt.getKey()), evt.getValue()); + assertNull(evt.getOldValue()); + + iter.remove(); + } + } + + for (Integer key : vals.keySet()) { + if (key % 2 == 0) { + CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + + assertTrue(evt.getKey() % 2 == 0); + assertTrue(vals.keySet().contains(evt.getKey())); + assertEquals(UPDATED, evt.getEventType()); + assertEquals(-1, (int) evt.getValue()); + assertEquals(vals.get(evt.getKey()), evt.getOldValue()); + + iter.remove(); + } + } + } + + /** + * @param cache Cache. + * @param lsnrCfg Listener configuration. + * @param key Key. + * @param create {@code True} if listens for create events. + * @param update {@code True} if listens for update events. + * @param rmv {@code True} if listens for remove events. + * @throws Exception If failed. + */ + private void checkEvents( + final IgniteCache<Integer, Integer> cache, + final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg, + Integer key, + boolean create, + boolean update, + boolean rmv) throws Exception { + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.registerCacheEntryListener(lsnrCfg); + + return null; + } + }, IllegalArgumentException.class, null); + + final int UPDATES = 10; + + int expEvts = 0; + + if (create) + expEvts += 2; + + if (update) + expEvts += (UPDATES + 1); + + if (rmv) + expEvts += 2; + + evts = new ArrayList<>(); + + evtsLatch = new CountDownLatch(expEvts); + + cache.put(key, 0); + + for (int i = 0; i < UPDATES; i++) + cache.put(key, i + 1); + + assertFalse(cache.putIfAbsent(key, -1)); + + assertFalse(cache.remove(key, -1)); + + assertTrue(cache.remove(key)); + + IgniteCache<Integer, Integer> cache1 = cache; + + if (gridCount() > 1) + cache1 = jcache(1); // Do updates from another node. + + cache1.put(key, 1); + + cache1.put(key, 2); + + assertTrue(cache1.remove(key)); + + evtsLatch.await(5000, TimeUnit.MILLISECONDS); + + assertEquals(expEvts, evts.size()); + + Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator(); + + if (create) + checkEvent(iter, key, CREATED, 0, null); + + if (update) { + for (int i = 0; i < UPDATES; i++) + checkEvent(iter, key, UPDATED, i + 1, i); + } + + if (rmv) + checkEvent(iter, key, REMOVED, null, UPDATES); + + if (create) + checkEvent(iter, key, CREATED, 1, null); + + if (update) + checkEvent(iter, key, UPDATED, 2, 1); + + if (rmv) + checkEvent(iter, key, REMOVED, null, 2); + + assertEquals(0, evts.size()); + + log.info("Remove listener. "); + + cache.deregisterCacheEntryListener(lsnrCfg); + + cache.put(key, 1); + + cache.put(key, 2); + + assertTrue(cache.remove(key)); + + U.sleep(500); // Sleep some time to ensure listener was really removed. + + assertEquals(0, evts.size()); + + cache.registerCacheEntryListener(lsnrCfg); + + cache.deregisterCacheEntryListener(lsnrCfg); + } + + /** + * @param iter Received events iterator. + * @param expKey Expected key. + * @param expType Expected type. + * @param expVal Expected value. + * @param expOld Expected old value. + */ + private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter, + Integer expKey, + EventType expType, + @Nullable Integer expVal, + @Nullable Integer expOld) { + assertTrue(iter.hasNext()); + + CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + + iter.remove(); + + assertTrue(evt.getSource() instanceof IgniteCacheProxy); + + assertEquals(expKey, evt.getKey()); + + assertEquals(expType, evt.getEventType()); + + assertEquals(expVal, evt.getValue()); + + assertEquals(expOld, evt.getOldValue()); + + if (expOld == null) + assertFalse(evt.isOldValueAvailable()); + else + assertTrue(evt.isOldValueAvailable()); + } + + /** + * @return Test keys. + * @throws Exception If failed. + */ + protected Collection<Integer> keys() throws Exception { + GridCache<Integer, Object> cache = cache(0); + + ArrayList<Integer> keys = new ArrayList<>(); + + keys.add(primaryKeys(cache, 1, lastKey).get(0)); + + if (gridCount() > 1) { + keys.add(backupKeys(cache, 1, lastKey).get(0)); + + if (cache.configuration().getCacheMode() != REPLICATED) + keys.add(nearKeys(cache, 1, lastKey).get(0)); + } + + lastKey = Collections.max(keys) + 1; + + return keys; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + evts = null; + + evtsLatch = null; + } + + /** + * @param evt Event. + */ + private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + //System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']'); + + assert evt != null; + assert evt.getSource() != null : evt; + assert evt.getEventType() != null : evt; + assert evt.getKey() != null : evt; + + evts.add(evt); + + evtsLatch.countDown(); + } + + /** + * + */ + static class TestFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> { + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter<Integer, Integer> create() { + return new TestFilter(); + } + } + + /** + * + */ + static class TestFilter implements CacheEntryEventFilter<Integer, Integer> { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + assert evt != null; + assert evt.getSource() != null : evt; + assert evt.getEventType() != null : evt; + assert evt.getKey() != null : evt; + + return evt.getKey() % 2 == 0; + } + } + + /** + * + */ + static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>, + CacheEntryUpdatedListener<Integer, Integer> { + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } + } - Thread.sleep(2000); + /** + * + */ + static class CreateUpdateRemoveListener extends CreateUpdateListener + implements CacheEntryRemovedListener<Integer, Integer> { + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java new file mode 100644 index 0000000..5c7ec68 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java @@ -0,0 +1,41 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheEntryListenerAtomicLocalTest extends IgniteCacheEntryListenerAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +}