ignite-43
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/48dd7fbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/48dd7fbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/48dd7fbf Branch: refs/heads/ignite-99 Commit: 48dd7fbf68771633a4db5986a0173eb82cf7ea6e Parents: 311bbdd Author: sboikov <sboi...@gridgain.com> Authored: Tue Jan 13 10:14:20 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jan 13 11:55:54 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 2 +- .../processors/cache/GridCacheProcessor.java | 1 - .../GridCacheDataStructuresManager.java | 3 +- .../GridCacheContinuousQueryAdapter.java | 44 +++---- .../GridCacheContinuousQueryEntry.java | 46 +++++-- .../GridCacheContinuousQueryHandler.java | 52 +++++--- .../GridCacheContinuousQueryHandlerV2.java | 83 ------------ .../GridCacheContinuousQueryListener.java | 3 +- .../GridCacheContinuousQueryManager.java | 126 +++++++++++-------- .../continuous/GridContinuousProcessor.java | 4 +- .../service/GridServiceProcessor.java | 4 +- .../IgniteCacheEntryListenerAbstractTest.java | 118 +++++++++++------ 12 files changed, 250 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index aee3806..b80f729 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -3016,7 +3016,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> drReplicate(drType, val, valBytes, ver); if (!skipQryNtf) { - if (cctx.affinity().primary(cctx.localNode(), key, topVer) || cctx.isReplicated()) { + if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) { cctx.continuousQueries().onEntryUpdate(this, key, val, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 46755c6..6dfd00a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -43,7 +43,6 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; -import javax.cache.configuration.*; import javax.management.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java index 384329b..c756492 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java @@ -682,7 +682,8 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, true, false, - false); + false, + true); } GridCacheQueueProxy queue = queuesMap.get(header.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java index 06eb2a8..f72f913 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java @@ -213,12 +213,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou /** {@inheritDoc} */ @Override public void execute() throws IgniteCheckedException { - execute(null, false, false, false); + execute(null, false, false, false, true); } /** {@inheritDoc} */ @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException { - execute(prj, false, false, false); + execute(prj, false, false, false, true); } /** @@ -228,12 +228,14 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou * @param internal If {@code true} then query notified about internal entries updates. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. + * @param oldVal {@code True} if old value is required. * @throws IgniteCheckedException If failed. */ public void execute(@Nullable ClusterGroup prj, boolean internal, boolean entryLsnr, - boolean sync) throws IgniteCheckedException { + boolean sync, + boolean oldVal) throws IgniteCheckedException { if (locCb == null) throw new IllegalStateException("Mandatory local callback is not set for the query: " + this); @@ -276,29 +278,19 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou guard.block(); - GridContinuousHandler hnd; - - if (ctx.kernalContext().security().enabled()) { - hnd = new GridCacheContinuousQueryHandlerV2<>(ctx.name(), - topic, - locCb, - rmtFilter, - prjPred, - internal, - entryLsnr, - sync, - ctx.kernalContext().job().currentTaskNameHash()); - } - else { - hnd = new GridCacheContinuousQueryHandler<>(ctx.name(), - topic, - locCb, - rmtFilter, - prjPred, - internal, - entryLsnr, - sync); - } + int taskNameHash = + ctx.kernalContext().security().enabled() ? ctx.kernalContext().job().currentTaskNameHash() : 0; + + GridContinuousHandler hnd = new GridCacheContinuousQueryHandler<>(ctx.name(), + topic, + locCb, + rmtFilter, + prjPred, + internal, + entryLsnr, + sync, + oldVal, + taskNameHash); routineId = ctx.kernalContext().continuous().startRoutine(hnd, bufSize, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java index 3c5265c..4738961 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java @@ -21,6 +21,7 @@ import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -37,10 +38,13 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> /** */ private static final long serialVersionUID = 0L; + /** Event type enum values. */ + private static final EventType[] EVT_TYPE_VALS = EventType.values(); + /** Cache context. */ @SuppressWarnings("TransientFieldNotInitialized") @GridToStringExclude - private final transient GridCacheContext ctx; + private final transient GridCacheContext<K, V> ctx; /** Cache entry. */ @SuppressWarnings("TransientFieldNotInitialized") @@ -78,7 +82,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> private GridDeploymentInfo depInfo; /** */ - private boolean expired; + private EventType evtType; /** * Required by {@link Externalizable}. @@ -88,7 +92,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> impl = null; } - /* + /** * @param ctx Cache context. * @param impl Cache entry. * @param key Key. @@ -96,7 +100,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> * @param newValBytes Value bytes. * @param oldVal Old value. * @param oldValBytes Old value bytes. - * @param expired {@code True} if created for expired entry. + * @param evtType Event type. */ GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx, GridCacheEntry<K, V> impl, @@ -105,10 +109,11 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal, @Nullable GridCacheValueBytes oldValBytes, - boolean expired) { + EventType evtType) { assert ctx != null; assert impl != null; assert key != null; + assert evtType != null; this.ctx = ctx; this.impl = impl; @@ -117,14 +122,35 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> this.newValBytes = newValBytes; this.oldVal = oldVal; this.oldValBytes = oldValBytes; - this.expired = expired; + this.evtType = evtType; + } + + /** + * @return Cache entry. + */ + GridCacheEntry<K, V> entry() { + return impl; + } + + /** + * @return Cache context. + */ + GridCacheContext<K, V> context() { + return ctx; + } + + /** + * @return New value bytes. + */ + GridCacheValueBytes newValueBytes() { + return newValBytes; } /** * @return {@code True} if entry expired. */ - public boolean expired() { - return expired; + public EventType eventType() { + return evtType; } /** @@ -729,7 +755,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> out.writeObject(oldVal); } - out.writeBoolean(expired); + out.writeByte((byte)evtType.ordinal()); } /** {@inheritDoc} */ @@ -755,7 +781,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> oldVal = (V)in.readObject(); } - expired = in.readBoolean(); + evtType = EVT_TYPE_VALS[in.readByte()]; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java index 9162b89..e9b5678 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java @@ -68,6 +68,12 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Synchronous listener flag. */ private boolean sync; + /** {@code True} if old value is required. */ + private boolean oldVal; + + /** Task name hash code. */ + private int taskHash; + /** * Required by {@link Externalizable}. */ @@ -84,6 +90,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param internal If {@code true} then query is notified about internal entries updates. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. + * @param oldVal {@code True} if old value is required. + * @param taskHash Task name hash code. */ GridCacheContinuousQueryHandler(@Nullable String cacheName, Object topic, @@ -92,9 +100,12 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, boolean entryLsnr, - boolean sync) { + boolean sync, + boolean oldVal, + int taskHash) { assert topic != null; assert cb != null; + assert !sync || entryLsnr; this.cacheName = cacheName; this.topic = topic; @@ -104,6 +115,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.internal = internal; this.entryLsnr = entryLsnr; this.sync = sync; + this.oldVal = oldVal; + this.taskHash = taskHash; } /** {@inheritDoc} */ @@ -156,9 +169,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, - boolean recordEvt, - boolean sync) { + @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { boolean notify; GridCacheFlag[] f = cacheContext(ctx).forceLocalRead(); @@ -172,6 +183,17 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } if (notify) { + if (!oldVal && e.getOldValue() != null) { + e = new GridCacheContinuousQueryEntry<>(e.context(), + e.entry(), + e.getKey(), + e.getValue(), + e.newValueBytes(), + null, + null, + e.eventType()); + } + if (loc) { if (!cb.apply(nodeId, F.<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>>asList(e))) @@ -244,21 +266,11 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } @Nullable private String taskName() { - String taskName = null; - - if (ctx.security().enabled()) { - assert GridCacheContinuousQueryHandler.this instanceof GridCacheContinuousQueryHandlerV2; - - int taskHash = ((GridCacheContinuousQueryHandlerV2)GridCacheContinuousQueryHandler.this).taskHash(); - - taskName = ctx.task().resolveTaskName(taskHash); - } - - return taskName; + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } }; - return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr, sync); + return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); } /** {@inheritDoc} */ @@ -393,6 +405,10 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { out.writeBoolean(entryLsnr); out.writeBoolean(sync); + + out.writeBoolean(oldVal); + + out.writeInt(taskHash); } /** {@inheritDoc} */ @@ -420,6 +436,10 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { entryLsnr = in.readBoolean(); sync = in.readBoolean(); + + oldVal = in.readBoolean(); + + taskHash = in.readInt(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java deleted file mode 100644 index b4216d5..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java +++ /dev/null @@ -1,83 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache.query.continuous; - -import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry; -import org.jetbrains.annotations.*; - -import javax.cache.event.*; -import java.io.*; -import java.util.*; - -/** - * Continuous query handler extension. - */ -public class GridCacheContinuousQueryHandlerV2<K, V> extends GridCacheContinuousQueryHandler<K, V> { - /** */ - private static final long serialVersionUID = 2180994610452685320L; - - /** Task hash. */ - private int taskHash; - - /** - * For {@link Externalizable}. - */ - public GridCacheContinuousQueryHandlerV2() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param topic Topic for ordered messages. - * @param cb Local callback. - * @param filter Filter. - * @param prjPred Projection predicate. - * @param internal If {@code true} then query is notified about internal entries updates. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. - * @param taskHash Task hash. - */ - public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName, - Object topic, - IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb, - @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter, - @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, - boolean internal, - boolean entryLsnr, - boolean sync, - int taskHash) { - super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr, sync); - - this.taskHash = taskHash; - } - - /** - * @return Task hash. - */ - public int taskHash() { - return taskHash; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeInt(taskHash); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - taskHash = in.readInt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java index 2707428..dd6b5f9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java @@ -23,7 +23,6 @@ interface GridCacheContinuousQueryListener<K, V> { * * @param e Entry. * @param recordEvt Whether to record event. - * @param sync {@code True} if event is synchronous. */ - public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt, boolean sync); + public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java index 8bcbceb..56c7020 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java @@ -145,6 +145,9 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt oldVal = cctx.unwrapTemporary(oldVal); + EventType evtType = newVal == null ? REMOVED : + ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED)); + GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( cctx, e.wrap(false), @@ -153,7 +156,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt newBytes, oldVal, oldBytes, - false); + evtType); e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); @@ -194,7 +197,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt null, oldVal, oldBytes, - true); + EXPIRED); for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { if (!lsnr.entryListener()) @@ -254,7 +257,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt qry.remoteFilter(fltr); - qry.execute(null, false, true, lsnrCfg.isSynchronous()); + qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired()); } catch (IgniteCheckedException e) { lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. @@ -281,17 +284,13 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param lsnr Listener. * @param internal Internal flag. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. * @return Whether listener was actually registered. */ boolean registerListener(UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, boolean internal, - boolean entryLsnr, - boolean sync) { - assert !sync || entryLsnr; - - ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr, sync); + boolean entryLsnr) { + ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr); boolean added; @@ -349,6 +348,8 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt entries = internal ? cctx.cache().primaryEntrySetx() : cctx.cache().primaryEntrySet(); + boolean evt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + for (GridCacheEntry<K, V> e : entries) { GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx, e, @@ -357,9 +358,9 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt null, null, null, - false); + CREATED); - info.onIterate(qryEntry, !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); + info.onIterate(qryEntry, evt); } info.flushPending(); @@ -378,18 +379,13 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt /** */ private final boolean entryLsnr; - /** */ - private final boolean sync; - /** * @param lsnr Listener. * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}. - * @param sync {@code True} if listener is synchronous. */ - private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr, boolean sync) { + private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) { this.lsnr = lsnr; this.entryLsnr = entryLsnr; - this.sync = sync; if (!entryLsnr) pending = new LinkedList<>(); @@ -411,7 +407,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt } if (notifyLsnr) - lsnr.onEntryUpdate(e, recordEvt, sync); + lsnr.onEntryUpdate(e, recordEvt); } /** @@ -419,7 +415,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param recordEvt Whether to record event. */ void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - lsnr.onEntryUpdate(e, recordEvt, sync); + lsnr.onEntryUpdate(e, recordEvt); } /** @@ -435,7 +431,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt } for (PendingEntry<K, V> e : pending0) - lsnr.onEntryUpdate(e.entry, e.recordEvt, sync); + lsnr.onEntryUpdate(e.entry, e.recordEvt); } /** @@ -546,44 +542,47 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt @SuppressWarnings("unchecked") @Override public boolean apply(org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K1, V1> entry) { try { - EventType evtType; + EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); - if (entry.getValue() == null) { - if (((GridCacheContinuousQueryEntry)entry).expired()) { // Expire. + switch (evtType) { + case EXPIRED: if (!expire) return false; - evtType = EXPIRED; - } - else { // Remove. + break; + + case REMOVED: if (!rmv) return false; - evtType = REMOVED; - } - } - else { - if (entry.getOldValue() != null) { // Update. - if (!update) - return false; + break; - evtType = UPDATED; - } - else { // Create. + case CREATED: if (!create) return false; - evtType = CREATED; - } + break; + + case UPDATED: + if (!update) + return false; + + break; + + default: + assert false : evtType; } + if (fltr == null) + return true; + if (cache == null) { cache = ignite.jcache(cacheName); assert cache != null : cacheName; } - return fltr == null || fltr.evaluate(new org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry)); + return fltr.evaluate(new org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry)); } catch (CacheEntryListenerException e) { LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); @@ -690,42 +689,59 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public boolean apply(UUID uuid, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> entries) { + @Override public boolean apply(UUID uuid, + Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> entries) { for (org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry entry : entries) { try { - if (entry.getValue() == null) { // Remove. - if (((GridCacheContinuousQueryEntry)entry).expired()) { // Expire. + EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); + + switch (evtType) { + case EXPIRED: { assert expireLsnr != null; org.apache.ignite.cache.CacheEntryEvent evt0 = new org.apache.ignite.cache.CacheEntryEvent(cache, EXPIRED, entry); expireLsnr.onExpired(Collections.singleton(evt0)); + + break; } - else { + + case REMOVED: { assert rmvLsnr != null; org.apache.ignite.cache.CacheEntryEvent evt0 = new org.apache.ignite.cache.CacheEntryEvent(cache, REMOVED, entry); rmvLsnr.onRemoved(Collections.singleton(evt0)); + + break; } - } - else if (entry.getOldValue() != null) { // Update. - assert updateLsnr != null; - org.apache.ignite.cache.CacheEntryEvent evt0 = - new org.apache.ignite.cache.CacheEntryEvent(cache, UPDATED, entry); + case UPDATED: { + assert updateLsnr != null; - updateLsnr.onUpdated(Collections.singleton(evt0)); - } - else { // Create. - assert createLsnr != null; + org.apache.ignite.cache.CacheEntryEvent evt0 = + new org.apache.ignite.cache.CacheEntryEvent(cache, UPDATED, entry); - org.apache.ignite.cache.CacheEntryEvent evt0 = - new org.apache.ignite.cache.CacheEntryEvent(cache, CREATED, entry); + updateLsnr.onUpdated(Collections.singleton(evt0)); + + break; + } + + case CREATED: { + assert createLsnr != null; + + org.apache.ignite.cache.CacheEntryEvent evt0 = + new org.apache.ignite.cache.CacheEntryEvent(cache, CREATED, entry); + + createLsnr.onCreated(Collections.singleton(evt0)); + + break; + } - createLsnr.onCreated(Collections.singleton(evt0)); + default: + assert false : evtType; } } catch (CacheEntryListenerException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java index e7f8619..0315e97 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java @@ -898,7 +898,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { LocalRoutineInfo routine = locInfos.get(routineId); if (routine != null) - routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>) msg.data(), ctx); + routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(), ctx); } finally { if (msg.futureId() != null) { @@ -1726,7 +1726,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * Future for stop routine. + * Synchronous message acknowledgement future. */ private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java index 5d9edc9..8cb3e1d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java @@ -124,13 +124,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { cfgQry.localCallback(new DeploymentListener()); - cfgQry.execute(ctx.grid().forLocal(), true, false, false); + cfgQry.execute(ctx.grid().forLocal(), true, false, false, true); assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); assignQry.localCallback(new AssignmentListener()); - assignQry.execute(ctx.grid().forLocal(), true, false, false); + assignQry.execute(ctx.grid().forLocal(), true, false, false, true); } finally { if (ctx.deploy().enabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 04867d9..b35e1a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -12,6 +12,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; import org.jetbrains.annotations.*; @@ -27,7 +28,6 @@ import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; import static javax.cache.event.EventType.*; import static org.gridgain.grid.cache.GridCacheMode.*; -import static org.gridgain.grid.cache.GridCachePreloadMode.*; /** * @@ -46,7 +46,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb private Integer lastKey = 0; /** */ - private CacheEntryListenerConfiguration lsnrCfg; + private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -58,14 +58,43 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cfg.setEagerTtl(eagerTtl()); - cfg.setPreloadMode(SYNC); - return cfg; } /** * @throws Exception If failed. */ + public void testNoOldValue() throws Exception { + CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new CreateUpdateRemoveExpireListener(); + } + }, + null, + false, + true + ); + + IgniteCache<Integer, Integer> cache = jcache(); + + try { + for (Integer key : keys()) { + log.info("Check create/update/remove/expire events, no old value [key=" + key + ']'); + + cache.registerCacheEntryListener(lsnrCfg); + + checkEvents(cache, lsnrCfg, key, true, true, true, true, false); + } + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } + } + + /** + * @throws Exception If failed. + */ public void testSynchronousEvents() throws Exception { final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() { @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { @@ -208,6 +237,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.putAll(vals); fut.get(); + + log.info("Update one more time."); + + cache.putAll(vals); } finally { stopGrid(gridCount()); @@ -223,7 +256,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb */ private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts) throws Exception { - evts = new ArrayList<>(); + evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>()); evtsLatch = new CountDownLatch(expEvts); @@ -296,7 +329,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb Map<Integer, Integer> vals = new HashMap<>(); for (int i = 0; i < 100; i++) - vals.put(i + 1_000_000, i); + vals.put(i + 2_000_000, i); cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries. @@ -386,7 +419,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb log.info("Check create/update/remove events for listener in configuration [key=" + key + ']'); - checkEvents(cache, lsnrCfg, key, true, true, true, true); + checkEvents(cache, lsnrCfg, key, true, true, true, true, true); } finally { stopGrid(gridCount()); @@ -453,7 +486,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.registerCacheEntryListener(lsnrCfg); - checkEvents(cache, lsnrCfg, key, create, update, rmv, expire); + try { + checkEvents(cache, lsnrCfg, key, create, update, rmv, expire, true); + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } } /** @@ -461,8 +499,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param vals Values in cache. * @throws Exception If failed. */ - private void checkFilter(IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception { - evts = new ArrayList<>(); + private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception { + evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>()); final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries. @@ -472,19 +510,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.putAll(vals); - Map<Integer, Integer> newVals = new HashMap<>(); + final Map<Integer, Integer> newVals = new HashMap<>(); for (Integer key : vals.keySet()) newVals.put(key, -1); - cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 100))).putAll(newVals); + cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals); - U.sleep(200); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Integer key : newVals.keySet()) { + if (primaryCache(key, cache.getName()).get(key) != null) + return false; + } - if (!eagerTtl()) { // Provoke expire events if eager ttl is disabled. - for (Integer key : newVals.keySet()) - assertNull(primaryCache(key, cache.getName()).get(key)); - } + return true; + } + }, 5000); evtsLatch.await(5000, MILLISECONDS); @@ -568,6 +610,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param update {@code True} if listens for update events. * @param rmv {@code True} if listens for remove events. * @param expire {@code True} if listens for expire events. + * @param oldVal {@code True} if old value should be provided for event. * @throws Exception If failed. */ private void checkEvents( @@ -577,7 +620,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb boolean create, boolean update, boolean rmv, - boolean expire) throws Exception { + boolean expire, + boolean oldVal) throws Exception { GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { cache.registerCacheEntryListener(lsnrCfg); @@ -602,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (expire) expEvts += 2; - evts = new ArrayList<>(); + evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>()); evtsLatch = new CountDownLatch(expEvts); @@ -659,32 +703,32 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (update) { for (int i = 0; i < UPDATES; i++) - checkEvent(iter, key, UPDATED, i + 1, i); + checkEvent(iter, key, UPDATED, i + 1, oldVal ? i : null); } if (rmv) - checkEvent(iter, key, REMOVED, null, UPDATES); + checkEvent(iter, key, REMOVED, null, oldVal ? UPDATES : null); if (create) checkEvent(iter, key, CREATED, 10, null); if (expire) - checkEvent(iter, key, EXPIRED, null, 10); + checkEvent(iter, key, EXPIRED, null, oldVal ? 10 : null); if (create) checkEvent(iter, key, CREATED, 1, null); if (update) - checkEvent(iter, key, UPDATED, 2, 1); + checkEvent(iter, key, UPDATED, 2, oldVal ? 1 : null); if (rmv) - checkEvent(iter, key, REMOVED, null, 2); + checkEvent(iter, key, REMOVED, null, oldVal ? 2 : null); if (create) checkEvent(iter, key, CREATED, 20, null); if (expire) - checkEvent(iter, key, EXPIRED, null, 20); + checkEvent(iter, key, EXPIRED, null, oldVal ? 20 : null); assertEquals(0, evts.size()); @@ -786,10 +830,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']'); - assert evt != null; - assert evt.getSource() != null : evt; - assert evt.getEventType() != null : evt; - assert evt.getKey() != null : evt; + assertNotNull(evt); + assertNotNull(evt.getSource()); + assertNotNull(evt.getEventType()); + assertNotNull(evt.getKey()); evts.add(evt); @@ -847,20 +891,20 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** {@inheritDoc} */ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { - assert evt != null; - assert evt.getSource() != null : evt; - assert evt.getEventType() != null : evt; - assert evt.getKey() != null : evt; + assertNotNull(evt); + assertNotNull(evt.getSource()); + assertNotNull(evt.getEventType()); + assertNotNull(evt.getKey()); } } /** {@inheritDoc} */ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { - assert evt != null; - assert evt.getSource() != null : evt; - assert evt.getEventType() != null : evt; - assert evt.getKey() != null : evt; + assertNotNull(evt); + assertNotNull(evt.getSource()); + assertNotNull(evt.getEventType()); + assertNotNull(evt.getKey()); } } }