ignite-43
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/13740b70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/13740b70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/13740b70 Branch: refs/heads/ignite-53 Commit: 13740b7095469e54b1748dd4914283a0b2a2304b Parents: f9f0231 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jan 12 11:45:06 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jan 12 17:21:01 2015 +0300 ---------------------------------------------------------------------- .../grid/kernal/GridEventConsumeHandler.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 21 +- .../GridCacheDataStructuresManager.java | 5 +- .../GridCacheContinuousQueryAdapter.java | 14 +- .../GridCacheContinuousQueryHandler.java | 23 +- .../GridCacheContinuousQueryHandlerV2.java | 4 +- .../GridCacheContinuousQueryListener.java | 3 +- .../GridCacheContinuousQueryManager.java | 110 +++-- .../continuous/GridContinuousMessage.java | 52 +- .../continuous/GridContinuousMessageType.java | 5 +- .../continuous/GridContinuousProcessor.java | 181 ++++++- .../service/GridServiceProcessor.java | 4 +- .../IgniteCacheEntryListenerAbstractTest.java | 482 ++++++++++++++++--- ...eCacheEntryListenerEagerTtlDisabledTest.java | 22 + .../junits/common/GridCommonAbstractTest.java | 13 + .../bamboo/GridDataGridTestSuite.java | 1 + 16 files changed, 772 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java index a48f6e1..778b353 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java @@ -142,7 +142,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { } } - ctx.continuous().addNotification(nodeId, routineId, wrapper, null); + ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false); } catch (IgniteCheckedException e) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index b19e44f..aee3806 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -3342,12 +3342,23 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> obsolete = true; // Success, will return "true". } - if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) - cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, EVT_CACHE_OBJECT_EXPIRED, - null, false, expiredVal, expiredVal != null || hasOldBytes, null, null, null); - } - + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { + cctx.events().addEvent(partition(), + key, + cctx.localNodeId(), + null, + EVT_CACHE_OBJECT_EXPIRED, + null, + false, + expiredVal, + expiredVal != null || hasOldBytes, + null, + null, + null); + } + cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null); + } } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java index db072d6..384329b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java @@ -679,7 +679,10 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager } }); - queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, true, false); + queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, + true, + false, + false); } GridCacheQueueProxy queue = queuesMap.get(header.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java index c00c961..06eb2a8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java @@ -213,12 +213,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou /** {@inheritDoc} */ @Override public void execute() throws IgniteCheckedException { - execute(null, false, false); + execute(null, false, false, false); } /** {@inheritDoc} */ @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException { - execute(prj, false, false); + execute(prj, false, false, false); } /** @@ -227,9 +227,13 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou * @param prj Grid projection. * @param internal If {@code true} then query notified about internal entries updates. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. + * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. * @throws IgniteCheckedException If failed. */ - public void execute(@Nullable ClusterGroup prj, boolean internal, boolean entryLsnr) throws IgniteCheckedException { + public void execute(@Nullable ClusterGroup prj, + boolean internal, + boolean entryLsnr, + boolean sync) throws IgniteCheckedException { if (locCb == null) throw new IllegalStateException("Mandatory local callback is not set for the query: " + this); @@ -282,6 +286,7 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou prjPred, internal, entryLsnr, + sync, ctx.kernalContext().job().currentTaskNameHash()); } else { @@ -291,7 +296,8 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou rmtFilter, prjPred, internal, - entryLsnr); + entryLsnr, + sync); } routineId = ctx.kernalContext().continuous().startRoutine(hnd, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java index e61b2a2..9162b89 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java @@ -65,6 +65,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Entry listener flag. */ private boolean entryLsnr; + /** Synchronous listener flag. */ + private boolean sync; + /** * Required by {@link Externalizable}. */ @@ -80,6 +83,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param prjPred Projection predicate. * @param internal If {@code true} then query is notified about internal entries updates. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. + * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. */ GridCacheContinuousQueryHandler(@Nullable String cacheName, Object topic, @@ -87,7 +91,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { @Nullable IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> filter, @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, - boolean entryLsnr) { + boolean entryLsnr, + boolean sync) { assert topic != null; assert cb != null; @@ -98,6 +103,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.prjPred = prjPred; this.internal = internal; this.entryLsnr = entryLsnr; + this.sync = sync; } /** {@inheritDoc} */ @@ -150,7 +156,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { + @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, + boolean recordEvt, + boolean sync) { boolean notify; GridCacheFlag[] f = cacheContext(ctx).forceLocalRead(); @@ -185,7 +193,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { depMgr.prepare(e); } - ctx.continuous().addNotification(nodeId, routineId, e, topic); + ctx.continuous().addNotification(nodeId, routineId, e, topic, sync); } catch (IgniteCheckedException ex) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); @@ -250,12 +258,13 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } }; - return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); + return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr, sync); } /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - manager(ctx).iterate(internal, routineId, entryLsnr); + if (!entryLsnr) + manager(ctx).iterate(internal, routineId); } /** {@inheritDoc} */ @@ -382,6 +391,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { out.writeBoolean(internal); out.writeBoolean(entryLsnr); + + out.writeBoolean(sync); } /** {@inheritDoc} */ @@ -407,6 +418,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { internal = in.readBoolean(); entryLsnr = in.readBoolean(); + + sync = in.readBoolean(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java index 63209ec..b4216d5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java @@ -43,6 +43,7 @@ public class GridCacheContinuousQueryHandlerV2<K, V> extends GridCacheContinuous * @param prjPred Projection predicate. * @param internal If {@code true} then query is notified about internal entries updates. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. + * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. * @param taskHash Task hash. */ public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName, @@ -52,8 +53,9 @@ public class GridCacheContinuousQueryHandlerV2<K, V> extends GridCacheContinuous @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, boolean entryLsnr, + boolean sync, int taskHash) { - super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr); + super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr, sync); this.taskHash = taskHash; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java index dd6b5f9..2707428 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java @@ -23,6 +23,7 @@ interface GridCacheContinuousQueryListener<K, V> { * * @param e Entry. * @param recordEvt Whether to record event. + * @param sync {@code True} if event is synchronous. */ - public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt); + public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt, boolean sync); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java index 8c1b70e..8bcbceb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java @@ -185,21 +185,23 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt if (F.isEmpty(lsnrCol)) return; - GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( - cctx, - e.wrap(false), - key, - null, - null, - oldVal, - oldBytes, - true); - - for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { - if (!lsnr.entryListener()) - continue; - - lsnr.onEntryUpdate(e0, false); + if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { + GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( + cctx, + e.wrap(false), + key, + null, + null, + oldVal, + oldBytes, + true); + + for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { + if (!lsnr.entryListener()) + continue; + + lsnr.onEntryUpdate(e0, false); + } } } @@ -252,7 +254,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt qry.remoteFilter(fltr); - qry.execute(null, false, true); + qry.execute(null, false, true, lsnrCfg.isSynchronous()); } catch (IgniteCheckedException e) { lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. @@ -279,12 +281,17 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param lsnr Listener. * @param internal Internal flag. * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. + * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. * @return Whether listener was actually registered. */ - boolean registerListener(UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, + boolean registerListener(UUID lsnrId, + GridCacheContinuousQueryListener<K, V> lsnr, boolean internal, - boolean entryLsnr) { - ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr); + boolean entryLsnr, + boolean sync) { + assert !sync || entryLsnr; + + ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr, sync); boolean added; @@ -326,36 +333,33 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * Iterates through existing data. * * @param internal Internal flag. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @param id Listener ID. */ - void iterate(boolean internal, UUID id, boolean entryLsnr) { + void iterate(boolean internal, UUID id) { ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id); assert info != null; - if (!entryLsnr) { - Set<GridCacheEntry<K, V>> entries; - - if (cctx.isReplicated()) - entries = internal ? cctx.cache().entrySetx() : - cctx.cache().entrySet(); - else - entries = internal ? cctx.cache().primaryEntrySetx() : - cctx.cache().primaryEntrySet(); - - for (GridCacheEntry<K, V> e : entries) { - GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx, - e, - e.getKey(), - e.getValue(), - null, - null, - null, - false); - - info.onIterate(qryEntry, !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); - } + Set<GridCacheEntry<K, V>> entries; + + if (cctx.isReplicated()) + entries = internal ? cctx.cache().entrySetx() : + cctx.cache().entrySet(); + else + entries = internal ? cctx.cache().primaryEntrySetx() : + cctx.cache().primaryEntrySet(); + + for (GridCacheEntry<K, V> e : entries) { + GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx, + e, + e.getKey(), + e.getValue(), + null, + null, + null, + false); + + info.onIterate(qryEntry, !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); } info.flushPending(); @@ -369,18 +373,26 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt private final GridCacheContinuousQueryListener<K, V> lsnr; /** Pending entries. */ - private Collection<PendingEntry<K, V>> pending = new LinkedList<>(); + private Collection<PendingEntry<K, V>> pending; + + /** */ + private final boolean entryLsnr; /** */ - private boolean entryLsnr; + private final boolean sync; /** * @param lsnr Listener. * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}. + * @param sync {@code True} if listener is synchronous. */ - private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) { + private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr, boolean sync) { this.lsnr = lsnr; this.entryLsnr = entryLsnr; + this.sync = sync; + + if (!entryLsnr) + pending = new LinkedList<>(); } /** @@ -399,7 +411,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt } if (notifyLsnr) - lsnr.onEntryUpdate(e, recordEvt); + lsnr.onEntryUpdate(e, recordEvt, sync); } /** @@ -407,7 +419,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param recordEvt Whether to record event. */ void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - lsnr.onEntryUpdate(e, recordEvt); + lsnr.onEntryUpdate(e, recordEvt, sync); } /** @@ -423,7 +435,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt } for (PendingEntry<K, V> e : pending0) - lsnr.onEntryUpdate(e.entry, e.recordEvt); + lsnr.onEntryUpdate(e.entry, e.recordEvt, sync); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java index 09b7171..27a6429 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.continuous; +import org.apache.ignite.lang.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.typedef.internal.*; @@ -19,6 +20,8 @@ import java.io.*; import java.nio.*; import java.util.*; +import static org.gridgain.grid.kernal.processors.continuous.GridContinuousMessageType.*; + /** * Continuous processor message. */ @@ -40,6 +43,9 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { /** Serialized message data. */ private byte[] dataBytes; + /** Future ID for synchronous event notifications. */ + private IgniteUuid futId; + /** * Required by {@link Externalizable}. */ @@ -50,14 +56,19 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { /** * @param type Message type. * @param routineId Consume ID. + * @param futId Future ID. * @param data Optional message data. */ - GridContinuousMessage(GridContinuousMessageType type, UUID routineId, @Nullable Object data) { + GridContinuousMessage(GridContinuousMessageType type, + @Nullable UUID routineId, + @Nullable IgniteUuid futId, + @Nullable Object data) { assert type != null; - assert routineId != null; + assert routineId != null || type == MSG_EVT_ACK; this.type = type; this.routineId = routineId; + this.futId = futId; this.data = data; } @@ -78,6 +89,7 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { /** * @return Message data. */ + @SuppressWarnings("unchecked") public <T> T data() { return (T)data; } @@ -103,14 +115,21 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { this.dataBytes = dataBytes; } + /** + * @return Future ID for synchronous event notification. + */ + @Nullable public IgniteUuid futureId() { + return futId; + } + /** {@inheritDoc} */ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) @Override public GridTcpCommunicationMessageAdapter clone() { - GridContinuousMessage clone = new GridContinuousMessage(); + GridContinuousMessage _clone = new GridContinuousMessage(); - clone0(clone); + clone0(_clone); - return clone; + return _clone; } /** {@inheritDoc} */ @@ -119,6 +138,7 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { clone.type = type; clone.routineId = routineId; + clone.futId = futId; clone.data = data; clone.dataBytes = dataBytes; } @@ -143,12 +163,18 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { commState.idx++; case 1: - if (!commState.putUuid(routineId)) + if (!commState.putGridUuid(futId)) return false; commState.idx++; case 2: + if (!commState.putUuid(routineId)) + return false; + + commState.idx++; + + case 3: if (!commState.putEnum(type)) return false; @@ -176,6 +202,16 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { commState.idx++; case 1: + IgniteUuid futId0 = commState.getGridUuid(); + + if (futId0 == GRID_UUID_NOT_READ) + return false; + + futId = futId0; + + commState.idx++; + + case 2: UUID routineId0 = commState.getUuid(); if (routineId0 == UUID_NOT_READ) @@ -185,13 +221,13 @@ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { commState.idx++; - case 2: + case 3: if (buf.remaining() < 1) return false; byte type0 = commState.getByte(); - type = GridContinuousMessageType.fromOrdinal(type0); + type = fromOrdinal(type0); commState.idx++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java index 4ebc33a..b07a6e8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java @@ -28,7 +28,10 @@ enum GridContinuousMessageType { MSG_STOP_ACK, /** Remote event notification. */ - MSG_EVT_NOTIFICATION; + MSG_EVT_NOTIFICATION, + + /** Event notification acknowledgement for synchronous events. */ + MSG_EVT_ACK; /** Enumerated values. */ private static final GridContinuousMessageType[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java index 14d89f3..e7f8619 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java @@ -69,6 +69,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Pending start requests. */ private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>(); + /** */ + private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>(); + /** Stopped IDs. */ private final Collection<UUID> stopped = new HashSet<>(); @@ -206,6 +209,21 @@ public class GridContinuousProcessor extends GridProcessorAdapter { unregisterRemote(routineId); } + for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { + SyncMessageAckFuture fut = e.getValue(); + + if (fut.nodeId().equals(nodeId)) { + SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); + + if (fut0 != null) { + ClusterTopologyException err = new ClusterTopologyException( + "Node left grid while sending message to: " + nodeId); + + fut0.onDone(err); + } + } + } + break; default: @@ -255,6 +273,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { break; + case MSG_EVT_ACK: + processMessageAck(msg); + + break; + default: assert false : "Unexpected message received: " + msg.type(); } @@ -343,8 +366,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @return Future. */ @SuppressWarnings("TooBroadScope") - public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd, int bufSize, long interval, - boolean autoUnsubscribe, @Nullable IgnitePredicate<ClusterNode> prjPred) { + public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe, + @Nullable IgnitePredicate<ClusterNode> prjPred) { assert hnd != null; assert bufSize > 0; assert interval >= 0; @@ -432,7 +458,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // these nodes. for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) { if (nodeIds.add(e.getKey())) - e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, reqData)); + e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData)); } // Register routine locally. @@ -484,7 +510,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Send start requests. try { - GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, reqData); + GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData); sendWithRetries(nodes, req, null); } @@ -590,13 +616,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Register acknowledge timeout (timeout object will be removed when // future is completed). fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId, - new GridContinuousMessage(MSG_STOP_REQ, routineId, null))); + new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null))); // Send stop requests. try { for (ClusterNode node : nodes) { try { - sendWithRetries(node.id(), new GridContinuousMessage(MSG_STOP_REQ, routineId, null), null); + sendWithRetries(node.id(), + new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null), + null); } catch (ClusterTopologyException ignored) { U.warn(log, "Failed to send stop request (node left topology): " + node.id()); @@ -624,11 +652,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param nodeId ID of the node that started routine. * @param routineId Routine ID. * @param obj Notification object. - * @param orderedTopic Topic for ordered notifications. - * If {@code null}, non-ordered message will be sent. + * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. + * @param sync If {@code true} then waits for event acknowledgment. * @throws IgniteCheckedException In case of error. */ - public void addNotification(UUID nodeId, UUID routineId, @Nullable Object obj, @Nullable Object orderedTopic) + public void addNotification(UUID nodeId, + UUID routineId, + @Nullable Object obj, + @Nullable Object orderedTopic, + boolean sync) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; @@ -638,29 +670,55 @@ public class GridContinuousProcessor extends GridProcessorAdapter { RemoteRoutineInfo info = rmtInfos.get(routineId); if (info != null) { - Collection<Object> toSnd = info.add(obj); + assert info.interval == 0 || !sync; + + if (sync) { + SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId); + + IgniteUuid futId = IgniteUuid.randomUuid(); + + syncMsgFuts.put(futId, fut); + + try { + sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic); + } + catch (IgniteCheckedException e) { + syncMsgFuts.remove(futId); - if (toSnd != null) - sendNotification(nodeId, routineId, toSnd, orderedTopic); + throw e; + } + + fut.get(ackTimeout); + } + else { + Collection<Object> toSnd = info.add(obj); + + if (toSnd != null) + sendNotification(nodeId, routineId, null, toSnd, orderedTopic); + } } } /** * @param nodeId Node ID. * @param routineId Routine ID. + * @param futId Future ID. * @param toSnd Notification object to send. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. * @throws IgniteCheckedException In case of error. */ - private void sendNotification(UUID nodeId, UUID routineId, Collection<Object> toSnd, + private void sendNotification(UUID nodeId, + UUID routineId, + @Nullable IgniteUuid futId, + Collection<Object> toSnd, @Nullable Object orderedTopic) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; assert toSnd != null; assert !toSnd.isEmpty(); - sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, toSnd), orderedTopic); + sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), orderedTopic); } /** @@ -722,7 +780,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, err), null); + sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err), null); } catch (ClusterTopologyException ignored) { if (log.isDebugEnabled()) @@ -783,7 +841,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { unregisterRemote(routineId); try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null), null); + sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null), null); } catch (ClusterTopologyException ignored) { if (log.isDebugEnabled()) @@ -815,19 +873,45 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param msg Message. + */ + private void processMessageAck(GridContinuousMessage msg) { + assert msg.futureId() != null; + + SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId()); + + if (fut != null) + fut.onDone(); + } + + /** * @param nodeId Sender ID. - * @param ntf Notification. + * @param msg Message. */ - private void processNotification(UUID nodeId, GridContinuousMessage ntf) { + private void processNotification(UUID nodeId, GridContinuousMessage msg) { assert nodeId != null; - assert ntf != null; + assert msg != null; - UUID routineId = ntf.routineId(); + UUID routineId = msg.routineId(); - LocalRoutineInfo routine = locInfos.get(routineId); + try { + LocalRoutineInfo routine = locInfos.get(routineId); - if (routine != null) - routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)ntf.data(), ctx); + if (routine != null) + routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>) msg.data(), ctx); + } + finally { + if (msg.futureId() != null) { + try { + sendWithRetries(nodeId, + new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null), + null); + } + catch (IgniteCheckedException e) { + log.error("Failed to send event acknowledgment to node: " + nodeId, e); + } + } + } } /** @@ -871,8 +955,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @return Whether listener was actually registered. * @throws IgniteCheckedException In case of error. */ - private boolean registerHandler(final UUID nodeId, final UUID routineId, final GridContinuousHandler hnd, - int bufSize, final long interval, boolean autoUnsubscribe, boolean loc) throws IgniteCheckedException { + private boolean registerHandler(final UUID nodeId, + final UUID routineId, + final GridContinuousHandler hnd, + int bufSize, + final long interval, + boolean autoUnsubscribe, + boolean loc) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; assert hnd != null; @@ -915,7 +1004,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (toSnd != null) { try { - sendNotification(nodeId, routineId, toSnd, hnd.orderedTopic()); + sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic()); } catch (ClusterTopologyException ignored) { if (log.isDebugEnabled()) @@ -1637,6 +1726,46 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * Future for stop routine. + */ + private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** + * Required by {@link Externalizable}. + */ + public SyncMessageAckFuture() { + // No-op. + } + + /** + * @param ctx Kernal context. + * @param nodeId Master node ID. + */ + SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) { + super(ctx); + + this.nodeId = nodeId; + } + + /** + * @return Master node ID. + */ + UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SyncMessageAckFuture.class, this); + } + } + + /** * Timeout object for stop process. */ private class StopTimeoutObject extends GridTimeoutObjectAdapter { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java index d8dfc97..5d9edc9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java @@ -124,13 +124,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { cfgQry.localCallback(new DeploymentListener()); - cfgQry.execute(ctx.grid().forLocal(), true, false); + cfgQry.execute(ctx.grid().forLocal(), true, false, false); assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); assignQry.localCallback(new AssignmentListener()); - assignQry.execute(ctx.grid().forLocal(), true, false); + assignQry.execute(ctx.grid().forLocal(), true, false, false); } finally { if (ctx.deploy().enabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 6e3221a..04867d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -10,6 +10,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; @@ -17,12 +18,16 @@ import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.event.*; +import javax.cache.expiry.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import static java.util.concurrent.TimeUnit.*; import static javax.cache.event.EventType.*; import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.gridgain.grid.cache.GridCachePreloadMode.*; /** * @@ -35,6 +40,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb private static volatile CountDownLatch evtsLatch; /** */ + private static volatile CountDownLatch syncEvtLatch; + + /** */ private Integer lastKey = 0; /** */ @@ -48,12 +56,212 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (lsnrCfg != null) cfg.addCacheEntryListenerConfiguration(lsnrCfg); + cfg.setEagerTtl(eagerTtl()); + + cfg.setPreloadMode(SYNC); + return cfg; } /** * @throws Exception If failed. */ + public void testSynchronousEvents() throws Exception { + final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() { + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + super.onRemoved(evts); + + awaitLatch(); + } + + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + super.onCreated(evts); + + awaitLatch(); + } + + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + super.onUpdated(evts); + + awaitLatch(); + } + + private void awaitLatch() { + try { + assertTrue(syncEvtLatch.await(5000, MILLISECONDS)); + } + catch (InterruptedException e) { + fail("Unexpected exception: " + e); + } + } + }; + + CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return lsnr; + } + }, + null, + true, + true + ); + + IgniteCache<Integer, Integer> cache = jcache(); + + cache.registerCacheEntryListener(lsnrCfg); + + try { + for (Integer key : keys()) { + log.info("Check synchronous create event [key=" + key + ']'); + + syncEvent(key, 1, cache, 1); + + checkEvent(evts.iterator(), key, CREATED, 1, null); + + log.info("Check synchronous update event [key=" + key + ']'); + + syncEvent(key, 2, cache, 1); + + checkEvent(evts.iterator(), key, UPDATED, 2, 1); + + log.info("Check synchronous remove event [key=" + key + ']'); + + syncEvent(key, null, cache, 1); + + checkEvent(evts.iterator(), key, REMOVED, null, 2); + + log.info("Check synchronous expire event [key=" + key + ']'); + + syncEvent(key, + 3, + cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 1000))), + eagerTtl() ? 2 : 1); + + checkEvent(evts.iterator(), key, CREATED, 3, null); + + if (!eagerTtl()) { + U.sleep(1100); + + assertNull(primaryCache(key, cache.getName()).get(key)); + + evtsLatch.await(5000, MILLISECONDS); + + assertEquals(1, evts.size()); + } + + checkEvent(evts.iterator(), key, EXPIRED, null, 3); + + assertEquals(0, evts.size()); + } + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } + } + + /** + * @throws Exception If failed. + */ + public void testSynchronousEventsListenerNodeFailed() throws Exception { + if (cacheMode() != PARTITIONED) + return; + + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new NoOpCreateUpdateListener(); + } + }, + null, + true, + true + ); + + final Ignite grid = startGrid(gridCount()); + + try { + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache = jcache(0); + + Map<Integer, Integer> vals = new HashMap<>(); + + for (Integer key : nearKeys(grid.cache(null), 100, 1_000_000)) + vals.put(key, 1); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + U.sleep(500); + + stopGrid(grid.name()); + + done.set(true); + + return null; + } + }); + + while (!done.get()) + cache.putAll(vals); + + fut.get(); + } + finally { + stopGrid(gridCount()); + } + } + + /** + * @param key Key. + * @param val Value. + * @param cache Cache. + * @param expEvts Expected events number. + * @throws Exception If failed. + */ + private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts) + throws Exception { + evts = new ArrayList<>(); + + evtsLatch = new CountDownLatch(expEvts); + + syncEvtLatch = new CountDownLatch(1); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + assertFalse(done.get()); + + U.sleep(500); + + assertFalse(done.get()); + + syncEvtLatch.countDown(); + + return null; + } + }); + + if (val != null) + cache.put(key, val); + else + cache.remove(key); + + done.set(true); + + fut.get(); + + evtsLatch.await(5000, MILLISECONDS); + + assertEquals(expEvts, evts.size()); + } + + /** + * @throws Exception If failed. + */ public void testEvents() throws Exception { CacheEntryCreatedListener<Integer, Integer> createLsnr = new CacheEntryCreatedListener<Integer, Integer>() { @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { @@ -76,6 +284,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } }; + CacheEntryExpiredListener<Integer, Integer> expireLsnr = new CacheEntryExpiredListener<Integer, Integer>() { + @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } + }; + IgniteCache<Integer, Integer> cache = jcache(); Map<Integer, Integer> vals = new HashMap<>(); @@ -88,29 +303,33 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb for (Integer key : keys()) { log.info("Check create event [key=" + key + ']'); - checkEvents(cache, createLsnr, key, true, false, false); + checkEvents(cache, createLsnr, key, true, false, false, false); log.info("Check update event [key=" + key + ']'); - checkEvents(cache, updateLsnr, key, false, true, false); + checkEvents(cache, updateLsnr, key, false, true, false, false); log.info("Check remove event [key=" + key + ']'); - checkEvents(cache, rmvLsnr, key, false, false, true); + checkEvents(cache, rmvLsnr, key, false, false, true, false); + + log.info("Check expire event [key=" + key + ']'); + + checkEvents(cache, expireLsnr, key, false, false, false, true); log.info("Check create/update events [key=" + key + ']'); - checkEvents(cache, new CreateUpdateListener(), key, true, true, false); + checkEvents(cache, new CreateUpdateListener(), key, true, true, false, false); - log.info("Check create/update/remove events [key=" + key + ']'); + log.info("Check create/update/remove/expire events [key=" + key + ']'); - checkEvents(cache, new CreateUpdateRemoveListener(), key, true, true, true); + checkEvents(cache, new CreateUpdateRemoveExpireListener(), key, true, true, true, true); } CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new Factory<CacheEntryListener<Integer, Integer>>() { @Override public CacheEntryListener<Integer, Integer> create() { - return new CreateUpdateRemoveListener(); + return new CreateUpdateRemoveExpireListener(); } }, new TestFilterFactory(), @@ -122,13 +341,21 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb log.info("Check filter."); - checkFilter(cache, vals); - - cache.deregisterCacheEntryListener(lsnrCfg); + try { + checkFilter(cache, vals); + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } cache.putAll(vals); - checkListenerOnStart(vals); + try { + checkListenerOnStart(vals); + } + finally { + cache.removeAll(vals.keySet()); + } } /** @@ -140,7 +367,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new Factory<CacheEntryListener<Integer, Integer>>() { @Override public CacheEntryListener<Integer, Integer> create() { - return new CreateUpdateRemoveListener(); + return new CreateUpdateRemoveExpireListener(); } }, null, @@ -150,20 +377,25 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb Ignite grid = startGrid(gridCount()); - IgniteCache<Integer, Integer> cache = grid.jcache(null); + try { + awaitPartitionMapExchange(); - Integer key = Integer.MAX_VALUE; + IgniteCache<Integer, Integer> cache = grid.jcache(null); - log.info("Check create/update/remove events for listener in configuration [key=" + key + ']'); + Integer key = Integer.MAX_VALUE; - checkEvents(cache, lsnrCfg, key, true, true, true); + log.info("Check create/update/remove events for listener in configuration [key=" + key + ']'); - stopGrid(gridCount()); + checkEvents(cache, lsnrCfg, key, true, true, true, true); + } + finally { + stopGrid(gridCount()); + } lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new Factory<CacheEntryListener<Integer, Integer>>() { @Override public CacheEntryListener<Integer, Integer> create() { - return new CreateUpdateRemoveListener(); + return new CreateUpdateRemoveExpireListener(); } }, new TestFilterFactory(), @@ -173,13 +405,21 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb grid = startGrid(gridCount()); - cache = grid.jcache(null); + try { + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache = grid.jcache(null); - log.info("Check filter for listener in configuration."); + log.info("Check filter for listener in configuration."); - checkFilter(cache, vals); + if (cacheMode() == LOCAL) + cache.putAll(vals); - stopGrid(gridCount()); + checkFilter(cache, vals); + } + finally { + stopGrid(gridCount()); + } } /** @@ -189,6 +429,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param create {@code True} if listens for create events. * @param update {@code True} if listens for update events. * @param rmv {@code True} if listens for remove events. + * @param expire {@code True} if listens for expire events. * @throws Exception If failed. */ private void checkEvents( @@ -197,7 +438,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb Integer key, boolean create, boolean update, - boolean rmv) throws Exception { + boolean rmv, + boolean expire) throws Exception { CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new Factory<CacheEntryListener<Integer, Integer>>() { @Override public CacheEntryListener<Integer, Integer> create() { @@ -211,7 +453,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.registerCacheEntryListener(lsnrCfg); - checkEvents(cache, lsnrCfg, key, create, update, rmv); + checkEvents(cache, lsnrCfg, key, create, update, rmv, expire); } /** @@ -222,7 +464,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb private void checkFilter(IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception { evts = new ArrayList<>(); - final int expEvts = (vals.size() / 2) * 3; // Remove, create and update for half of modified entries. + final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries. evtsLatch = new CountDownLatch(expEvts); @@ -235,55 +477,87 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb for (Integer key : vals.keySet()) newVals.put(key, -1); - cache.putAll(newVals); + cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 100))).putAll(newVals); + + U.sleep(200); - evtsLatch.await(5000, TimeUnit.MILLISECONDS); + if (!eagerTtl()) { // Provoke expire events if eager ttl is disabled. + for (Integer key : newVals.keySet()) + assertNull(primaryCache(key, cache.getName()).get(key)); + } + + evtsLatch.await(5000, MILLISECONDS); assertEquals(expEvts, evts.size()); - Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator(); + Set<Integer> rmvd = new HashSet<>(); + Set<Integer> created = new HashSet<>(); + Set<Integer> updated = new HashSet<>(); + Set<Integer> expired = new HashSet<>(); - for (Integer key : vals.keySet()) { - if (key % 2 == 0) { - CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + assertTrue(evt.getKey() % 2 == 0); - assertTrue(evt.getKey() % 2 == 0); - assertTrue(vals.keySet().contains(evt.getKey())); - assertEquals(REMOVED, evt.getEventType()); - assertNull(evt.getValue()); - assertEquals(vals.get(evt.getKey()), evt.getOldValue()); + assertTrue(vals.keySet().contains(evt.getKey())); - iter.remove(); - } - } + switch (evt.getEventType()) { + case REMOVED: + assertNull(evt.getValue()); - for (Integer key : vals.keySet()) { - if (key % 2 == 0) { - CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + assertEquals(vals.get(evt.getKey()), evt.getOldValue()); - assertTrue(evt.getKey() % 2 == 0); - assertTrue(vals.keySet().contains(evt.getKey())); - assertEquals(CREATED, evt.getEventType()); - assertEquals(vals.get(evt.getKey()), evt.getValue()); - assertNull(evt.getOldValue()); + assertTrue(rmvd.add(evt.getKey())); - iter.remove(); - } - } + break; + + case CREATED: + assertEquals(vals.get(evt.getKey()), evt.getValue()); + + assertNull(evt.getOldValue()); + + assertTrue(rmvd.contains(evt.getKey())); + + assertTrue(created.add(evt.getKey())); + + break; + + case UPDATED: + assertEquals(-1, (int)evt.getValue()); + + assertEquals(vals.get(evt.getKey()), evt.getOldValue()); + + assertTrue(rmvd.contains(evt.getKey())); - for (Integer key : vals.keySet()) { - if (key % 2 == 0) { - CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + assertTrue(created.contains(evt.getKey())); - assertTrue(evt.getKey() % 2 == 0); - assertTrue(vals.keySet().contains(evt.getKey())); - assertEquals(UPDATED, evt.getEventType()); - assertEquals(-1, (int) evt.getValue()); - assertEquals(vals.get(evt.getKey()), evt.getOldValue()); + assertTrue(updated.add(evt.getKey())); - iter.remove(); + break; + + case EXPIRED: + assertNull(evt.getValue()); + + assertEquals(-1, (int)evt.getOldValue()); + + assertTrue(rmvd.contains(evt.getKey())); + + assertTrue(created.contains(evt.getKey())); + + assertTrue(updated.contains(evt.getKey())); + + assertTrue(expired.add(evt.getKey())); + + break; + + default: + fail("Unexpected type: " + evt.getEventType()); } } + + assertEquals(vals.size() / 2, rmvd.size()); + assertEquals(vals.size() / 2, created.size()); + assertEquals(vals.size() / 2, updated.size()); + assertEquals(vals.size() / 2, expired.size()); } /** @@ -293,6 +567,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param create {@code True} if listens for create events. * @param update {@code True} if listens for update events. * @param rmv {@code True} if listens for remove events. + * @param expire {@code True} if listens for expire events. * @throws Exception If failed. */ private void checkEvents( @@ -301,7 +576,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb Integer key, boolean create, boolean update, - boolean rmv) throws Exception { + boolean rmv, + boolean expire) throws Exception { GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { cache.registerCacheEntryListener(lsnrCfg); @@ -315,7 +591,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb int expEvts = 0; if (create) - expEvts += 2; + expEvts += 4; if (update) expEvts += (UPDATES + 1); @@ -323,6 +599,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (rmv) expEvts += 2; + if (expire) + expEvts += 2; + evts = new ArrayList<>(); evtsLatch = new CountDownLatch(expEvts); @@ -338,6 +617,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb assertTrue(cache.remove(key)); + IgniteCache<Integer, Integer> expirePlcCache = + cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100))); + + expirePlcCache.put(key, 10); + + U.sleep(200); + + if (!eagerTtl()) + assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled. + IgniteCache<Integer, Integer> cache1 = cache; if (gridCount() > 1) @@ -349,7 +638,17 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb assertTrue(cache1.remove(key)); - evtsLatch.await(5000, TimeUnit.MILLISECONDS); + IgniteCache<Integer, Integer> expirePlcCache1 = + cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100))); + + expirePlcCache1.put(key, 20); + + U.sleep(200); + + if (!eagerTtl()) + assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled. + + evtsLatch.await(5000, MILLISECONDS); assertEquals(expEvts, evts.size()); @@ -367,6 +666,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb checkEvent(iter, key, REMOVED, null, UPDATES); if (create) + checkEvent(iter, key, CREATED, 10, null); + + if (expire) + checkEvent(iter, key, EXPIRED, null, 10); + + if (create) checkEvent(iter, key, CREATED, 1, null); if (update) @@ -375,9 +680,15 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (rmv) checkEvent(iter, key, REMOVED, null, 2); + if (create) + checkEvent(iter, key, CREATED, 20, null); + + if (expire) + checkEvent(iter, key, EXPIRED, null, 20); + assertEquals(0, evts.size()); - log.info("Remove listener. "); + log.info("Remove listener."); cache.deregisterCacheEntryListener(lsnrCfg); @@ -453,6 +764,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb return keys; } + /** + * @return Value for configuration property {@link GridCacheConfiguration#isEagerTtl()}. + */ + protected boolean eagerTtl() { + return true; + } + /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); @@ -466,7 +784,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param evt Event. */ private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { - //System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']'); + // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']'); assert evt != null; assert evt.getSource() != null : evt; @@ -524,12 +842,44 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - static class CreateUpdateRemoveListener extends CreateUpdateListener - implements CacheEntryRemovedListener<Integer, Integer> { + static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>, + CacheEntryUpdatedListener<Integer, Integer> { + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + assert evt != null; + assert evt.getSource() != null : evt; + assert evt.getEventType() != null : evt; + assert evt.getKey() != null : evt; + } + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + assert evt != null; + assert evt.getSource() != null : evt; + assert evt.getEventType() != null : evt; + assert evt.getKey() != null : evt; + } + } + } + + /** + * + */ + static class CreateUpdateRemoveExpireListener extends CreateUpdateListener + implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> { /** {@inheritDoc} */ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) onEvent(evt); } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + onEvent(evt); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java new file mode 100644 index 0000000..6b7110d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java @@ -0,0 +1,22 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +/** + * Tests expire events when {@link GridCacheConfiguration#isEagerTtl()} is disabled. + */ +public class IgniteCacheEntryListenerEagerTtlDisabledTest extends IgniteCacheEntryListenerTxTest { + /** {@inheritDoc} */ + @Override protected boolean eagerTtl() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java index f40d941..25f954a 100644 --- a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java @@ -480,6 +480,19 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** + * @param key Key. + * @param cacheName Cache name. + * @return Cache. + */ + protected <K, V> IgniteCache<K, V> primaryCache(Object key, @Nullable String cacheName) { + ClusterNode node = grid(0).cache(cacheName).affinity().mapKeyToNode(key); + + assertNotNull(node); + + return grid((String)node.attribute(GridNodeAttributes.ATTR_GRID_NAME)).jcache(cacheName); + } + + /** * @param cache Cache. * @return Collection of keys for which given cache is primary. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java index efe49fb..6c01b2f 100644 --- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java +++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java @@ -44,6 +44,7 @@ public class GridDataGridTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheEntryListenerTxTest.class); suite.addTestSuite(IgniteCacheEntryListenerTxReplicatedTest.class); suite.addTestSuite(IgniteCacheEntryListenerTxLocalTest.class); + suite.addTestSuite(IgniteCacheEntryListenerEagerTtlDisabledTest.class); suite.addTest(IgniteCacheExpiryPolicyTestSuite.suite());