Repository: incubator-ignite Updated Branches: refs/heads/ignite-143 bab9e351f -> d0c88b79f
IGNITE-143 - Fixing tests Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/83cfcc12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/83cfcc12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/83cfcc12 Branch: refs/heads/ignite-143 Commit: 83cfcc127a4074dfda48ea840d09b22840c74f7a Parents: bab9e35 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Feb 12 16:50:59 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Feb 12 16:50:59 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 37 ++------ .../continuous/CacheContinuousQueryEntry.java | 14 --- .../continuous/CacheContinuousQueryManager.java | 92 +++++++++++++++----- 3 files changed, 77 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/83cfcc12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index e1906e2..c863d77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1165,11 +1165,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> subjId, null, taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) { - EventType type = old != null || oldBytes != null ? EventType.UPDATED : EventType.CREATED; - - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, type); - } + if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1328,7 +1325,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes, EventType.REMOVED); + cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1637,7 +1634,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, eventType(op)); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2210,8 +2207,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, - eventType(op)); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3218,8 +3214,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (!skipQryNtf) { if (!preload && (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null, - EventType.CREATED); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null); cctx.dataStructures().onEntryUpdated(key, false); } @@ -4383,26 +4378,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return cctx.marshaller().unmarshal(res, ldr); } - /** - * @param op Operation. - * @return Event type. - */ - private EventType eventType(GridCacheOperation op) { - switch (op) { - case CREATE: - return EventType.CREATED; - - case UPDATE: - return EventType.UPDATED; - - case DELETE: - return EventType.REMOVED; - - default: - throw new IllegalStateException("Invalid operation: " + op); - } - } - /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/83cfcc12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index c8d9fec..72269c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -96,20 +96,6 @@ class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externaliz } /** - * Unmarshals value from bytes if needed. - * - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - if (newVal == null && newValBytes != null && !newValBytes.isNull()) - newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); - } - - /** * @param marsh Marshaller. * @throws IgniteCheckedException In case of error. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/83cfcc12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index ac113c9..93fbbe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.security.*; import org.jdk8.backport.*; +import javax.cache.*; import javax.cache.configuration.*; import javax.cache.event.*; import java.io.*; @@ -120,7 +121,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @throws IgniteCheckedException In case of error. */ public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, GridCacheValueBytes newBytes, - V oldVal, GridCacheValueBytes oldBytes, EventType type) throws IgniteCheckedException { + V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException { assert e != null; assert key != null; @@ -134,20 +135,41 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K if (F.isEmpty(lsnrCol)) return; - oldVal = cctx.unwrapTemporary(oldVal); + boolean hasNewVal = newVal != null || (newBytes != null && !newBytes.isNull()); + boolean hasOldVal = oldVal != null || (oldBytes != null && !oldBytes.isNull()); - CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, newVal, newBytes, oldVal, oldBytes); + assert hasNewVal || hasOldVal; - e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); + EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; - CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( - cctx.kernalContext().grid().jcache(cctx.name()), type, e0); + boolean initialized = false; boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { + if (!initialized) { + if (lsnr.oldValueRequired()) { + oldVal = cctx.unwrapTemporary(oldVal); + + if (oldVal == null && oldBytes != null && !oldBytes.isNull()) + oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : cctx.marshaller().<V>unmarshal(oldBytes.get + (), cctx.deploy().globalLoader()); + } + + if (newVal == null && newBytes != null && !newBytes.isNull()) + newVal = newBytes.isPlain() ? (V)newBytes.get() : cctx.marshaller().<V>unmarshal(newBytes.get(), + cctx.deploy().globalLoader()); + } + + CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, newVal, newBytes, + lsnr.oldValueRequired() ? oldVal : null, lsnr.oldValueRequired() ? oldBytes : null); + + CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().publicJCache(cctx.name()), evtType, e0); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + } } /** @@ -155,8 +177,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param key Key. * @param oldVal Old value. * @param oldBytes Old value bytes. + * @throws IgniteCheckedException In case of error. */ - public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, GridCacheValueBytes oldBytes) { + public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, GridCacheValueBytes oldBytes) + throws IgniteCheckedException { assert e != null; assert key != null; @@ -169,16 +193,30 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K return; if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { - CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, null, null, oldVal, oldBytes); - - CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( - cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0); - boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) + boolean initialized = false; + + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { + if (!initialized) { + if (lsnr.oldValueRequired()) { + oldVal = cctx.unwrapTemporary(oldVal); + + if (oldVal == null && oldBytes != null && !oldBytes.isNull()) + oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : + cctx.marshaller().<V>unmarshal(oldBytes.get(), cctx.deploy().globalLoader()); + } + } + + CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, null, null, + lsnr.oldValueRequired() ? oldVal : null, lsnr.oldValueRequired() ? oldBytes : null); + + CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().publicJCache(cctx.name()), EXPIRED, e0); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + } } } @@ -251,7 +289,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr); if (old != null) - throw new IgniteCheckedException("Listener is already registered for configuration: " + cfg); + throw new IllegalArgumentException("Listener is already registered for configuration: " + cfg); try { lsnr.execute(); @@ -443,11 +481,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K if (types == 0) throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces."); - CacheEntryUpdatedListener<K, V> locLsnr = (CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener<>( - locLsnrImpl); + CacheEntryUpdatedListener<K, V> locLsnr = (CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener( + locLsnrImpl, cctx.kernalContext().cache().publicJCache(cctx.name())); CacheEntryEventFilter<K, V> rmtFilter = (CacheEntryEventFilter<K, V>)new JCacheQueryRemoteFilter<>( - cfg.getCacheEntryEventFilterFactory().create(), types); + cfg.getCacheEntryEventFilterFactory() != null ? cfg.getCacheEntryEventFilterFactory().create() : null, + types); routineId = executeQuery0( locLsnr, @@ -481,20 +520,27 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K */ private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { /** */ - private CacheEntryListener<K, V> impl; + private final CacheEntryListener<K, V> impl; + + /** */ + private final Cache<K, V> cache; /** * @param impl Listener. */ - private JCacheQueryLocalListener(CacheEntryListener<K, V> impl) { + JCacheQueryLocalListener(CacheEntryListener<K, V> impl, Cache<K, V> cache) { assert impl != null; + assert cache != null; this.impl = impl; + this.cache = cache; } /** {@inheritDoc} */ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { + + switch (evt.getEventType()) { case CREATED: assert impl instanceof CacheEntryCreatedListener; @@ -534,11 +580,15 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param evt Event. * @return Singleton iterable. */ + @SuppressWarnings("unchecked") private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton( CacheEntryEvent<? extends K, ? extends V> evt) { + assert evt instanceof CacheContinuousQueryEvent; + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(1); - evts.add(evt); + evts.add(new CacheContinuousQueryEvent<>(cache, evt.getEventType(), + ((CacheContinuousQueryEvent<? extends K, ? extends V>)evt).entry())); return evts; }