ignite-43

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/13740b70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/13740b70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/13740b70

Branch: refs/heads/ignite-1
Commit: 13740b7095469e54b1748dd4914283a0b2a2304b
Parents: f9f0231
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Jan 12 11:45:06 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Jan 12 17:21:01 2015 +0300

----------------------------------------------------------------------
 .../grid/kernal/GridEventConsumeHandler.java    |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |  21 +-
 .../GridCacheDataStructuresManager.java         |   5 +-
 .../GridCacheContinuousQueryAdapter.java        |  14 +-
 .../GridCacheContinuousQueryHandler.java        |  23 +-
 .../GridCacheContinuousQueryHandlerV2.java      |   4 +-
 .../GridCacheContinuousQueryListener.java       |   3 +-
 .../GridCacheContinuousQueryManager.java        | 110 +++--
 .../continuous/GridContinuousMessage.java       |  52 +-
 .../continuous/GridContinuousMessageType.java   |   5 +-
 .../continuous/GridContinuousProcessor.java     | 181 ++++++-
 .../service/GridServiceProcessor.java           |   4 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 482 ++++++++++++++++---
 ...eCacheEntryListenerEagerTtlDisabledTest.java |  22 +
 .../junits/common/GridCommonAbstractTest.java   |  13 +
 .../bamboo/GridDataGridTestSuite.java           |   1 +
 16 files changed, 772 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
index a48f6e1..778b353 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
@@ -142,7 +142,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
                                     }
                                 }
 
-                                ctx.continuous().addNotification(nodeId, 
routineId, wrapper, null);
+                                ctx.continuous().addNotification(nodeId, 
routineId, wrapper, null, false);
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(ctx.log(getClass()), "Failed to send 
event notification to node: " + nodeId, e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index b19e44f..aee3806 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -3342,12 +3342,23 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                             obsolete = true; // Success, will return "true".
                     }
 
-                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED))
-                        cctx.events().addEvent(partition(), key, 
cctx.localNodeId(), null, EVT_CACHE_OBJECT_EXPIRED,
-                            null, false, expiredVal, expiredVal != null || 
hasOldBytes, null, null, null);
-                }
-
+                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
+                        cctx.events().addEvent(partition(),
+                            key,
+                            cctx.localNodeId(),
+                            null,
+                            EVT_CACHE_OBJECT_EXPIRED,
+                            null,
+                            false,
+                            expiredVal,
+                            expiredVal != null || hasOldBytes,
+                            null,
+                            null,
+                            null);
+                    }
 
+                    cctx.continuousQueries().onEntryExpired(this, key, 
expiredVal, null);
+                }
             }
         }
         catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
index db072d6..384329b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
@@ -679,7 +679,10 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager
                }
             });
 
-            queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? 
cctx.grid().forLocal() : null, true, false);
+            queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? 
cctx.grid().forLocal() : null,
+                true,
+                false,
+                false);
         }
 
         GridCacheQueueProxy queue = queuesMap.get(header.id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
index c00c961..06eb2a8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
@@ -213,12 +213,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
 
     /** {@inheritDoc} */
     @Override public void execute() throws IgniteCheckedException {
-        execute(null, false, false);
+        execute(null, false, false, false);
     }
 
     /** {@inheritDoc} */
     @Override public void execute(@Nullable ClusterGroup prj) throws 
IgniteCheckedException {
-        execute(prj, false, false);
+        execute(prj, false, false, false);
     }
 
     /**
@@ -227,9 +227,13 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements GridCacheContinuou
      * @param prj Grid projection.
      * @param internal If {@code true} then query notified about internal 
entries updates.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
+     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
      * @throws IgniteCheckedException If failed.
      */
-    public void execute(@Nullable ClusterGroup prj, boolean internal, boolean 
entryLsnr) throws IgniteCheckedException {
+    public void execute(@Nullable ClusterGroup prj,
+        boolean internal,
+        boolean entryLsnr,
+        boolean sync) throws IgniteCheckedException {
         if (locCb == null)
             throw new IllegalStateException("Mandatory local callback is not 
set for the query: " + this);
 
@@ -282,6 +286,7 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements GridCacheContinuou
                     prjPred,
                     internal,
                     entryLsnr,
+                    sync,
                     ctx.kernalContext().job().currentTaskNameHash());
             }
             else {
@@ -291,7 +296,8 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements GridCacheContinuou
                     rmtFilter,
                     prjPred,
                     internal,
-                    entryLsnr);
+                    entryLsnr,
+                    sync);
             }
 
             routineId = ctx.kernalContext().continuous().startRoutine(hnd,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
index e61b2a2..9162b89 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
@@ -65,6 +65,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** Entry listener flag. */
     private boolean entryLsnr;
 
+    /** Synchronous listener flag. */
+    private boolean sync;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -80,6 +83,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
      * @param prjPred Projection predicate.
      * @param internal If {@code true} then query is notified about internal 
entries updates.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
+     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
      */
     GridCacheContinuousQueryHandler(@Nullable String cacheName,
         Object topic,
@@ -87,7 +91,8 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         @Nullable 
IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, 
V>> filter,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred,
         boolean internal,
-        boolean entryLsnr) {
+        boolean entryLsnr,
+        boolean sync) {
         assert topic != null;
         assert cb != null;
 
@@ -98,6 +103,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         this.prjPred = prjPred;
         this.internal = internal;
         this.entryLsnr = entryLsnr;
+        this.sync = sync;
     }
 
     /** {@inheritDoc} */
@@ -150,7 +156,9 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                 }
             }
 
-            @Override public void 
onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
+            @Override public void 
onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e,
+                boolean recordEvt,
+                boolean sync) {
                 boolean notify;
 
                 GridCacheFlag[] f = cacheContext(ctx).forceLocalRead();
@@ -185,7 +193,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                                 depMgr.prepare(e);
                             }
 
-                            ctx.continuous().addNotification(nodeId, 
routineId, e, topic);
+                            ctx.continuous().addNotification(nodeId, 
routineId, e, topic, sync);
                         }
                         catch (IgniteCheckedException ex) {
                             U.error(ctx.log(getClass()), "Failed to send event 
notification to node: " + nodeId, ex);
@@ -250,12 +258,13 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
             }
         };
 
-        return manager(ctx).registerListener(routineId, lsnr, internal, 
entryLsnr);
+        return manager(ctx).registerListener(routineId, lsnr, internal, 
entryLsnr, sync);
     }
 
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, 
GridKernalContext ctx) {
-        manager(ctx).iterate(internal, routineId, entryLsnr);
+        if (!entryLsnr)
+            manager(ctx).iterate(internal, routineId);
     }
 
     /** {@inheritDoc} */
@@ -382,6 +391,8 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         out.writeBoolean(internal);
 
         out.writeBoolean(entryLsnr);
+
+        out.writeBoolean(sync);
     }
 
     /** {@inheritDoc} */
@@ -407,6 +418,8 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         internal = in.readBoolean();
 
         entryLsnr = in.readBoolean();
+
+        sync = in.readBoolean();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
index 63209ec..b4216d5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
@@ -43,6 +43,7 @@ public class GridCacheContinuousQueryHandlerV2<K, V> extends GridCacheContinuous
      * @param prjPred Projection predicate.
      * @param internal If {@code true} then query is notified about internal 
entries updates.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
+     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
      * @param taskHash Task hash.
      */
     public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName,
@@ -52,8 +53,9 @@ public class GridCacheContinuousQueryHandlerV2<K, V> extends 
GridCacheContinuous
         @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred,
         boolean internal,
         boolean entryLsnr,
+        boolean sync,
         int taskHash) {
-        super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr);
+        super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr, 
sync);
 
         this.taskHash = taskHash;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
index dd6b5f9..2707428 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
@@ -23,6 +23,7 @@ interface GridCacheContinuousQueryListener<K, V> {
      *
      * @param e Entry.
      * @param recordEvt Whether to record event.
+     * @param sync {@code True} if event is synchronous.
      */
-    public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean 
recordEvt);
+    public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean 
recordEvt, boolean sync);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
index 8c1b70e..8bcbceb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
@@ -185,21 +185,23 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
         if (F.isEmpty(lsnrCol))
             return;
 
-        GridCacheContinuousQueryEntry<K, V> e0 = new 
GridCacheContinuousQueryEntry<>(
-            cctx,
-            e.wrap(false),
-            key,
-            null,
-            null,
-            oldVal,
-            oldBytes,
-            true);
-
-        for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
-            if (!lsnr.entryListener())
-                continue;
-
-            lsnr.onEntryUpdate(e0, false);
+        if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), 
key, -1)) {
+            GridCacheContinuousQueryEntry<K, V> e0 = new 
GridCacheContinuousQueryEntry<>(
+                cctx,
+                e.wrap(false),
+                key,
+                null,
+                null,
+                oldVal,
+                oldBytes,
+                true);
+
+            for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
+                if (!lsnr.entryListener())
+                    continue;
+
+                lsnr.onEntryUpdate(e0, false);
+            }
         }
     }
 
@@ -252,7 +254,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
 
             qry.remoteFilter(fltr);
 
-            qry.execute(null, false, true);
+            qry.execute(null, false, true, lsnrCfg.isSynchronous());
         }
         catch (IgniteCheckedException e) {
             lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to 
execute it.
@@ -279,12 +281,17 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
      * @param lsnr Listener.
      * @param internal Internal flag.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
+     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
      * @return Whether listener was actually registered.
      */
-    boolean registerListener(UUID lsnrId, GridCacheContinuousQueryListener<K, 
V> lsnr,
+    boolean registerListener(UUID lsnrId,
+        GridCacheContinuousQueryListener<K, V> lsnr,
         boolean internal,
-        boolean entryLsnr) {
-        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr);
+        boolean entryLsnr,
+        boolean sync) {
+        assert !sync || entryLsnr;
+
+        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr, sync);
 
         boolean added;
 
@@ -326,36 +333,33 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
      * Iterates through existing data.
      *
      * @param internal Internal flag.
-     * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
      * @param id Listener ID.
      */
-    void iterate(boolean internal, UUID id, boolean entryLsnr) {
+    void iterate(boolean internal, UUID id) {
         ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id);
 
         assert info != null;
 
-        if (!entryLsnr) {
-            Set<GridCacheEntry<K, V>> entries;
-
-            if (cctx.isReplicated())
-                entries = internal ? cctx.cache().entrySetx() :
-                    cctx.cache().entrySet();
-            else
-                entries = internal ? cctx.cache().primaryEntrySetx() :
-                    cctx.cache().primaryEntrySet();
-
-            for (GridCacheEntry<K, V> e : entries) {
-                GridCacheContinuousQueryEntry<K, V> qryEntry = new 
GridCacheContinuousQueryEntry<>(cctx,
-                    e,
-                    e.getKey(),
-                    e.getValue(),
-                    null,
-                    null,
-                    null,
-                    false);
-
-                info.onIterate(qryEntry, !internal && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
-            }
+        Set<GridCacheEntry<K, V>> entries;
+
+        if (cctx.isReplicated())
+            entries = internal ? cctx.cache().entrySetx() :
+                cctx.cache().entrySet();
+        else
+            entries = internal ? cctx.cache().primaryEntrySetx() :
+                cctx.cache().primaryEntrySet();
+
+        for (GridCacheEntry<K, V> e : entries) {
+            GridCacheContinuousQueryEntry<K, V> qryEntry = new 
GridCacheContinuousQueryEntry<>(cctx,
+                e,
+                e.getKey(),
+                e.getValue(),
+                null,
+                null,
+                null,
+                false);
+
+            info.onIterate(qryEntry, !internal && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
         }
 
         info.flushPending();
@@ -369,18 +373,26 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
         private final GridCacheContinuousQueryListener<K, V> lsnr;
 
         /** Pending entries. */
-        private Collection<PendingEntry<K, V>> pending = new LinkedList<>();
+        private Collection<PendingEntry<K, V>> pending;
+
+        /** */
+        private final boolean entryLsnr;
 
         /** */
-        private boolean entryLsnr;
+        private final boolean sync;
 
         /**
          * @param lsnr Listener.
          * @param entryLsnr {@code True} if listener created for {@link 
CacheEntryListener}.
+         * @param sync {@code True} if listener is synchronous.
          */
-        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, 
boolean entryLsnr) {
+        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, 
boolean entryLsnr, boolean sync) {
             this.lsnr = lsnr;
             this.entryLsnr = entryLsnr;
+            this.sync = sync;
+
+            if (!entryLsnr)
+                pending = new LinkedList<>();
         }
 
         /**
@@ -399,7 +411,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
             }
 
             if (notifyLsnr)
-                lsnr.onEntryUpdate(e, recordEvt);
+                lsnr.onEntryUpdate(e, recordEvt, sync);
         }
 
         /**
@@ -407,7 +419,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
          * @param recordEvt Whether to record event.
          */
         void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean 
recordEvt) {
-            lsnr.onEntryUpdate(e, recordEvt);
+            lsnr.onEntryUpdate(e, recordEvt, sync);
         }
 
         /**
@@ -423,7 +435,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
             }
 
             for (PendingEntry<K, V> e : pending0)
-                lsnr.onEntryUpdate(e.entry, e.recordEvt);
+                lsnr.onEntryUpdate(e.entry, e.recordEvt, sync);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java
index 09b7171..27a6429 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.continuous;
 
+import org.apache.ignite.lang.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.util.direct.*;
 import org.gridgain.grid.util.typedef.internal.*;
@@ -19,6 +20,8 @@ import java.io.*;
 import java.nio.*;
 import java.util.*;
 
+import static 
org.gridgain.grid.kernal.processors.continuous.GridContinuousMessageType.*;
+
 /**
  * Continuous processor message.
  */
@@ -40,6 +43,9 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
     /** Serialized message data. */
     private byte[] dataBytes;
 
+    /** Future ID for synchronous event notifications. */
+    private IgniteUuid futId;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -50,14 +56,19 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
     /**
      * @param type Message type.
      * @param routineId Consume ID.
+     * @param futId Future ID.
      * @param data Optional message data.
      */
-    GridContinuousMessage(GridContinuousMessageType type, UUID routineId, 
@Nullable Object data) {
+    GridContinuousMessage(GridContinuousMessageType type,
+        @Nullable UUID routineId,
+        @Nullable IgniteUuid futId,
+        @Nullable Object data) {
         assert type != null;
-        assert routineId != null;
+        assert routineId != null || type == MSG_EVT_ACK;
 
         this.type = type;
         this.routineId = routineId;
+        this.futId = futId;
         this.data = data;
     }
 
@@ -78,6 +89,7 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
     /**
      * @return Message data.
      */
+    @SuppressWarnings("unchecked")
     public <T> T data() {
         return (T)data;
     }
@@ -103,14 +115,21 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
         this.dataBytes = dataBytes;
     }
 
+    /**
+     * @return Future ID for synchronous event notification.
+     */
+    @Nullable public IgniteUuid futureId() {
+        return futId;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
     @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridContinuousMessage clone = new GridContinuousMessage();
+        GridContinuousMessage _clone = new GridContinuousMessage();
 
-        clone0(clone);
+        clone0(_clone);
 
-        return clone;
+        return _clone;
     }
 
     /** {@inheritDoc} */
@@ -119,6 +138,7 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
 
         clone.type = type;
         clone.routineId = routineId;
+        clone.futId = futId;
         clone.data = data;
         clone.dataBytes = dataBytes;
     }
@@ -143,12 +163,18 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
                 commState.idx++;
 
             case 1:
-                if (!commState.putUuid(routineId))
+                if (!commState.putGridUuid(futId))
                     return false;
 
                 commState.idx++;
 
             case 2:
+                if (!commState.putUuid(routineId))
+                    return false;
+
+                commState.idx++;
+
+            case 3:
                 if (!commState.putEnum(type))
                     return false;
 
@@ -176,6 +202,16 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
                 commState.idx++;
 
             case 1:
+                IgniteUuid futId0 = commState.getGridUuid();
+
+                if (futId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                futId = futId0;
+
+                commState.idx++;
+
+            case 2:
                 UUID routineId0 = commState.getUuid();
 
                 if (routineId0 == UUID_NOT_READ)
@@ -185,13 +221,13 @@ public class GridContinuousMessage extends 
GridTcpCommunicationMessageAdapter {
 
                 commState.idx++;
 
-            case 2:
+            case 3:
                 if (buf.remaining() < 1)
                     return false;
 
                 byte type0 = commState.getByte();
 
-                type = GridContinuousMessageType.fromOrdinal(type0);
+                type = fromOrdinal(type0);
 
                 commState.idx++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java
index 4ebc33a..b07a6e8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java
@@ -28,7 +28,10 @@ enum GridContinuousMessageType {
     MSG_STOP_ACK,
 
     /** Remote event notification. */
-    MSG_EVT_NOTIFICATION;
+    MSG_EVT_NOTIFICATION,
+
+    /** Event notification acknowledgement for synchronous events. */
+    MSG_EVT_ACK;
 
     /** Enumerated values. */
     private static final GridContinuousMessageType[] VALS = values();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
index 14d89f3..e7f8619 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
@@ -69,6 +69,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Pending start requests. */
     private final Map<UUID, Collection<GridContinuousMessage>> pending = new 
HashMap<>();
 
+    /** */
+    private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts 
= new ConcurrentHashMap8<>();
+
     /** Stopped IDs. */
     private final Collection<UUID> stopped = new HashSet<>();
 
@@ -206,6 +209,21 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                                 unregisterRemote(routineId);
                         }
 
+                        for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : 
syncMsgFuts.entrySet()) {
+                            SyncMessageAckFuture fut = e.getValue();
+
+                            if (fut.nodeId().equals(nodeId)) {
+                                SyncMessageAckFuture fut0 = 
syncMsgFuts.remove(e.getKey());
+
+                                if (fut0 != null) {
+                                    ClusterTopologyException err = new 
ClusterTopologyException(
+                                        "Node left grid while sending message 
to: " + nodeId);
+
+                                    fut0.onDone(err);
+                                }
+                            }
+                        }
+
                         break;
 
                     default:
@@ -255,6 +273,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                         break;
 
+                    case MSG_EVT_ACK:
+                        processMessageAck(msg);
+
+                        break;
+
                     default:
                         assert false : "Unexpected message received: " + 
msg.type();
                 }
@@ -343,8 +366,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @return Future.
      */
     @SuppressWarnings("TooBroadScope")
-    public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd, int 
bufSize, long interval,
-        boolean autoUnsubscribe, @Nullable IgnitePredicate<ClusterNode> 
prjPred) {
+    public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe,
+        @Nullable IgnitePredicate<ClusterNode> prjPred) {
         assert hnd != null;
         assert bufSize > 0;
         assert interval >= 0;
@@ -432,7 +458,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             // these nodes.
             for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : 
pending.entrySet()) {
                 if (nodeIds.add(e.getKey()))
-                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, 
routineId, reqData));
+                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, 
routineId, null, reqData));
             }
 
             // Register routine locally.
@@ -484,7 +510,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
             // Send start requests.
             try {
-                GridContinuousMessage req = new 
GridContinuousMessage(MSG_START_REQ, routineId, reqData);
+                GridContinuousMessage req = new 
GridContinuousMessage(MSG_START_REQ, routineId, null, reqData);
 
                 sendWithRetries(nodes, req, null);
             }
@@ -590,13 +616,15 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 // Register acknowledge timeout (timeout object will be 
removed when
                 // future is completed).
                 fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, 
routineId,
-                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null)));
+                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null, 
null)));
 
                 // Send stop requests.
                 try {
                     for (ClusterNode node : nodes) {
                         try {
-                            sendWithRetries(node.id(), new 
GridContinuousMessage(MSG_STOP_REQ, routineId, null), null);
+                            sendWithRetries(node.id(),
+                                new GridContinuousMessage(MSG_STOP_REQ, 
routineId, null, null),
+                                null);
                         }
                         catch (ClusterTopologyException ignored) {
                             U.warn(log, "Failed to send stop request (node 
left topology): " + node.id());
@@ -624,11 +652,15 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param nodeId ID of the node that started routine.
      * @param routineId Routine ID.
      * @param obj Notification object.
-     * @param orderedTopic Topic for ordered notifications.
-     *      If {@code null}, non-ordered message will be sent.
+     * @param orderedTopic Topic for ordered notifications. If {@code null}, 
non-ordered message will be sent.
+     * @param sync If {@code true} then waits for event acknowledgment.
      * @throws IgniteCheckedException In case of error.
      */
-    public void addNotification(UUID nodeId, UUID routineId, @Nullable Object 
obj, @Nullable Object orderedTopic)
+    public void addNotification(UUID nodeId,
+        UUID routineId,
+        @Nullable Object obj,
+        @Nullable Object orderedTopic,
+        boolean sync)
         throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
@@ -638,29 +670,55 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         RemoteRoutineInfo info = rmtInfos.get(routineId);
 
         if (info != null) {
-            Collection<Object> toSnd = info.add(obj);
+            assert info.interval == 0 || !sync;
+
+            if (sync) {
+                SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, 
nodeId);
+
+                IgniteUuid futId = IgniteUuid.randomUuid();
+
+                syncMsgFuts.put(futId, fut);
+
+                try {
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), 
orderedTopic);
+                }
+                catch (IgniteCheckedException e) {
+                    syncMsgFuts.remove(futId);
 
-            if (toSnd != null)
-                sendNotification(nodeId, routineId, toSnd, orderedTopic);
+                    throw e;
+                }
+
+                fut.get(ackTimeout);
+            }
+            else {
+                Collection<Object> toSnd = info.add(obj);
+
+                if (toSnd != null)
+                    sendNotification(nodeId, routineId, null, toSnd, 
orderedTopic);
+            }
         }
     }
 
     /**
      * @param nodeId Node ID.
      * @param routineId Routine ID.
+     * @param futId Future ID.
      * @param toSnd Notification object to send.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendNotification(UUID nodeId, UUID routineId, 
Collection<Object> toSnd,
+    private void sendNotification(UUID nodeId,
+        UUID routineId,
+        @Nullable IgniteUuid futId,
+        Collection<Object> toSnd,
         @Nullable Object orderedTopic) throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
         assert toSnd != null;
         assert !toSnd.isEmpty();
 
-        sendWithRetries(nodeId, new 
GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, toSnd), orderedTopic);
+        sendWithRetries(nodeId, new 
GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), 
orderedTopic);
     }
 
     /**
@@ -722,7 +780,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         }
 
         try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, 
routineId, err), null);
+            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, 
routineId, null, err), null);
         }
         catch (ClusterTopologyException ignored) {
             if (log.isDebugEnabled())
@@ -783,7 +841,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         unregisterRemote(routineId);
 
         try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, 
routineId, null), null);
+            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, 
routineId, null, null), null);
         }
         catch (ClusterTopologyException ignored) {
             if (log.isDebugEnabled())
@@ -815,19 +873,45 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param msg Message.
+     */
+    private void processMessageAck(GridContinuousMessage msg) {
+        assert msg.futureId() != null;
+
+        SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId());
+
+        if (fut != null)
+            fut.onDone();
+    }
+
+    /**
      * @param nodeId Sender ID.
-     * @param ntf Notification.
+     * @param msg Message.
      */
-    private void processNotification(UUID nodeId, GridContinuousMessage ntf) {
+    private void processNotification(UUID nodeId, GridContinuousMessage msg) {
         assert nodeId != null;
-        assert ntf != null;
+        assert msg != null;
 
-        UUID routineId = ntf.routineId();
+        UUID routineId = msg.routineId();
 
-        LocalRoutineInfo routine = locInfos.get(routineId);
+        try {
+            LocalRoutineInfo routine = locInfos.get(routineId);
 
-        if (routine != null)
-            routine.hnd.notifyCallback(nodeId, routineId, 
(Collection<?>)ntf.data(), ctx);
+            if (routine != null)
+                routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>) 
msg.data(), ctx);
+        }
+        finally {
+            if (msg.futureId() != null) {
+                try {
+                    sendWithRetries(nodeId,
+                        new GridContinuousMessage(MSG_EVT_ACK, null, 
msg.futureId(), null),
+                        null);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send event acknowledgment to node: " 
+ nodeId, e);
+                }
+            }
+        }
     }
 
     /**
@@ -871,8 +955,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @return Whether listener was actually registered.
      * @throws IgniteCheckedException In case of error.
      */
-    private boolean registerHandler(final UUID nodeId, final UUID routineId, 
final GridContinuousHandler hnd,
-        int bufSize, final long interval, boolean autoUnsubscribe, boolean 
loc) throws IgniteCheckedException {
+    private boolean registerHandler(final UUID nodeId,
+        final UUID routineId,
+        final GridContinuousHandler hnd,
+        int bufSize,
+        final long interval,
+        boolean autoUnsubscribe,
+        boolean loc) throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
         assert hnd != null;
@@ -915,7 +1004,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                             if (toSnd != null) {
                                 try {
-                                    sendNotification(nodeId, routineId, toSnd, 
hnd.orderedTopic());
+                                    sendNotification(nodeId, routineId, null, 
toSnd, hnd.orderedTopic());
                                 }
                                 catch (ClusterTopologyException ignored) {
                                     if (log.isDebugEnabled())
@@ -1637,6 +1726,46 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Future used to wait for acknowledgement of a synchronous event notification.
+     */
+    private static class SyncMessageAckFuture extends 
GridFutureAdapter<Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private UUID nodeId;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public SyncMessageAckFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         * @param nodeId Master node ID.
+         */
+        SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
+            super(ctx);
+
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * @return Master node ID.
+         */
+        UUID nodeId() {
+            return nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SyncMessageAckFuture.class, this);
+        }
+    }
+
+    /**
      * Timeout object for stop process.
      */
     private class StopTimeoutObject extends GridTimeoutObjectAdapter {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
index d8dfc97..5d9edc9 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
@@ -124,13 +124,13 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
 
             cfgQry.localCallback(new DeploymentListener());
 
-            cfgQry.execute(ctx.grid().forLocal(), true, false);
+            cfgQry.execute(ctx.grid().forLocal(), true, false, false);
 
             assignQry = (GridCacheContinuousQueryAdapter<Object, 
Object>)cache.queries().createContinuousQuery();
 
             assignQry.localCallback(new AssignmentListener());
 
-            assignQry.execute(ctx.grid().forLocal(), true, false);
+            assignQry.execute(ctx.grid().forLocal(), true, false, false);
         }
         finally {
             if (ctx.deploy().enabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 6e3221a..04867d9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -10,6 +10,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
@@ -17,12 +18,16 @@ import org.jetbrains.annotations.*;
 
 import javax.cache.configuration.*;
 import javax.cache.event.*;
+import javax.cache.expiry.*;
 
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
+import static java.util.concurrent.TimeUnit.*;
 import static javax.cache.event.EventType.*;
 import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.gridgain.grid.cache.GridCachePreloadMode.*;
 
 /**
  *
@@ -35,6 +40,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
     private static volatile CountDownLatch evtsLatch;
 
     /** */
+    private static volatile CountDownLatch syncEvtLatch;
+
+    /** */
     private Integer lastKey = 0;
 
     /** */
@@ -48,12 +56,212 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         if (lsnrCfg != null)
             cfg.addCacheEntryListenerConfiguration(lsnrCfg);
 
+        cfg.setEagerTtl(eagerTtl());
+
+        cfg.setPreloadMode(SYNC);
+
         return cfg;
     }
 
     /**
      * @throws Exception If failed.
      */
+    public void testSynchronousEvents() throws Exception {
+        final CacheEntryCreatedListener<Integer, Integer> lsnr = new 
CreateUpdateRemoveExpireListener() {
+            @Override public void onRemoved(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                super.onRemoved(evts);
+
+                awaitLatch();
+            }
+
+            @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                super.onCreated(evts);
+
+                awaitLatch();
+            }
+
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                super.onUpdated(evts);
+
+                awaitLatch();
+            }
+
+            private void awaitLatch() {
+                try {
+                    assertTrue(syncEvtLatch.await(5000, MILLISECONDS));
+                }
+                catch (InterruptedException e) {
+                    fail("Unexpected exception: " + e);
+                }
+            }
+        };
+
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() 
{
+                    return lsnr;
+                }
+            },
+            null,
+            true,
+            true
+        );
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        try {
+            for (Integer key : keys()) {
+                log.info("Check synchronous create event [key=" + key + ']');
+
+                syncEvent(key, 1, cache, 1);
+
+                checkEvent(evts.iterator(), key, CREATED, 1, null);
+
+                log.info("Check synchronous update event [key=" + key + ']');
+
+                syncEvent(key, 2, cache, 1);
+
+                checkEvent(evts.iterator(), key, UPDATED, 2, 1);
+
+                log.info("Check synchronous remove event [key=" + key + ']');
+
+                syncEvent(key, null, cache, 1);
+
+                checkEvent(evts.iterator(), key, REMOVED, null, 2);
+
+                log.info("Check synchronous expire event [key=" + key + ']');
+
+                syncEvent(key,
+                    3,
+                    cache.withExpiryPolicy(new ModifiedExpiryPolicy(new 
Duration(MILLISECONDS, 1000))),
+                    eagerTtl() ? 2 : 1);
+
+                checkEvent(evts.iterator(), key, CREATED, 3, null);
+
+                if (!eagerTtl()) {
+                    U.sleep(1100);
+
+                    assertNull(primaryCache(key, cache.getName()).get(key));
+
+                    evtsLatch.await(5000, MILLISECONDS);
+
+                    assertEquals(1, evts.size());
+                }
+
+                checkEvent(evts.iterator(), key, EXPIRED, null, 3);
+
+                assertEquals(0, evts.size());
+            }
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSynchronousEventsListenerNodeFailed() throws Exception {
+        if (cacheMode() != PARTITIONED)
+            return;
+
+        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() 
{
+                    return new NoOpCreateUpdateListener();
+                }
+            },
+            null,
+            true,
+            true
+        );
+
+        final Ignite grid = startGrid(gridCount());
+
+        try {
+            awaitPartitionMapExchange();
+
+            IgniteCache<Integer, Integer> cache = jcache(0);
+
+            Map<Integer, Integer> vals = new HashMap<>();
+
+            for (Integer key : nearKeys(grid.cache(null), 100, 1_000_000))
+                vals.put(key, 1);
+
+            final AtomicBoolean done = new AtomicBoolean();
+
+            IgniteFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    U.sleep(500);
+
+                    stopGrid(grid.name());
+
+                    done.set(true);
+
+                    return null;
+                }
+            });
+
+            while (!done.get())
+                cache.putAll(vals);
+
+            fut.get();
+        }
+        finally {
+            stopGrid(gridCount());
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param cache Cache.
+     * @param expEvts Expected events number.
+     * @throws Exception If failed.
+     */
+    private void syncEvent(Integer key, Integer val, IgniteCache<Integer, 
Integer> cache, int expEvts)
+        throws Exception {
+        evts = new ArrayList<>();
+
+        evtsLatch = new CountDownLatch(expEvts);
+
+        syncEvtLatch = new CountDownLatch(1);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                assertFalse(done.get());
+
+                U.sleep(500);
+
+                assertFalse(done.get());
+
+                syncEvtLatch.countDown();
+
+                return null;
+            }
+        });
+
+        if (val != null)
+            cache.put(key, val);
+        else
+            cache.remove(key);
+
+        done.set(true);
+
+        fut.get();
+
+        evtsLatch.await(5000, MILLISECONDS);
+
+        assertEquals(expEvts, evts.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testEvents() throws Exception {
         CacheEntryCreatedListener<Integer, Integer> createLsnr = new 
CacheEntryCreatedListener<Integer, Integer>() {
             @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
@@ -76,6 +284,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
             }
         };
 
+        CacheEntryExpiredListener<Integer, Integer> expireLsnr = new 
CacheEntryExpiredListener<Integer, Integer>() {
+            @Override public void onExpired(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt 
: evts)
+                    onEvent(evt);
+            }
+        };
+
         IgniteCache<Integer, Integer> cache = jcache();
 
         Map<Integer, Integer> vals = new HashMap<>();
@@ -88,29 +303,33 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         for (Integer key : keys()) {
             log.info("Check create event [key=" + key + ']');
 
-            checkEvents(cache, createLsnr, key, true, false, false);
+            checkEvents(cache, createLsnr, key, true, false, false, false);
 
             log.info("Check update event [key=" + key + ']');
 
-            checkEvents(cache, updateLsnr, key, false, true, false);
+            checkEvents(cache, updateLsnr, key, false, true, false, false);
 
             log.info("Check remove event [key=" + key + ']');
 
-            checkEvents(cache, rmvLsnr, key, false, false, true);
+            checkEvents(cache, rmvLsnr, key, false, false, true, false);
+
+            log.info("Check expire event [key=" + key + ']');
+
+            checkEvents(cache, expireLsnr, key, false, false, false, true);
 
             log.info("Check create/update events [key=" + key + ']');
 
-            checkEvents(cache, new CreateUpdateListener(), key, true, true, 
false);
+            checkEvents(cache, new CreateUpdateListener(), key, true, true, 
false, false);
 
-            log.info("Check create/update/remove events [key=" + key + ']');
+            log.info("Check create/update/remove/expire events [key=" + key + 
']');
 
-            checkEvents(cache, new CreateUpdateRemoveListener(), key, true, 
true, true);
+            checkEvents(cache, new CreateUpdateRemoveExpireListener(), key, 
true, true, true, true);
         }
 
         CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
             new Factory<CacheEntryListener<Integer, Integer>>() {
                 @Override public CacheEntryListener<Integer, Integer> create() 
{
-                    return new CreateUpdateRemoveListener();
+                    return new CreateUpdateRemoveExpireListener();
                 }
             },
             new TestFilterFactory(),
@@ -122,13 +341,21 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         log.info("Check filter.");
 
-        checkFilter(cache, vals);
-
-        cache.deregisterCacheEntryListener(lsnrCfg);
+        try {
+            checkFilter(cache, vals);
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
 
         cache.putAll(vals);
 
-        checkListenerOnStart(vals);
+        try {
+            checkListenerOnStart(vals);
+        }
+        finally {
+            cache.removeAll(vals.keySet());
+        }
     }
 
     /**
@@ -140,7 +367,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new Factory<CacheEntryListener<Integer, Integer>>() {
                 @Override public CacheEntryListener<Integer, Integer> create() 
{
-                    return new CreateUpdateRemoveListener();
+                    return new CreateUpdateRemoveExpireListener();
                 }
             },
             null,
@@ -150,20 +377,25 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         Ignite grid = startGrid(gridCount());
 
-        IgniteCache<Integer, Integer> cache = grid.jcache(null);
+        try {
+            awaitPartitionMapExchange();
 
-        Integer key = Integer.MAX_VALUE;
+            IgniteCache<Integer, Integer> cache = grid.jcache(null);
 
-        log.info("Check create/update/remove events for listener in 
configuration [key=" + key + ']');
+            Integer key = Integer.MAX_VALUE;
 
-        checkEvents(cache, lsnrCfg, key, true, true, true);
+            log.info("Check create/update/remove events for listener in 
configuration [key=" + key + ']');
 
-        stopGrid(gridCount());
+            checkEvents(cache, lsnrCfg, key, true, true, true, true);
+        }
+        finally {
+            stopGrid(gridCount());
+        }
 
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new Factory<CacheEntryListener<Integer, Integer>>() {
                 @Override public CacheEntryListener<Integer, Integer> create() 
{
-                    return new CreateUpdateRemoveListener();
+                    return new CreateUpdateRemoveExpireListener();
                 }
             },
             new TestFilterFactory(),
@@ -173,13 +405,21 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         grid = startGrid(gridCount());
 
-        cache = grid.jcache(null);
+        try {
+            awaitPartitionMapExchange();
+
+            IgniteCache<Integer, Integer> cache = grid.jcache(null);
 
-        log.info("Check filter for listener in configuration.");
+            log.info("Check filter for listener in configuration.");
 
-        checkFilter(cache, vals);
+            if (cacheMode() == LOCAL)
+                cache.putAll(vals);
 
-        stopGrid(gridCount());
+            checkFilter(cache, vals);
+        }
+        finally {
+            stopGrid(gridCount());
+        }
     }
 
     /**
@@ -189,6 +429,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
      * @param create {@code True} if listens for create events.
      * @param update {@code True} if listens for update events.
      * @param rmv {@code True} if listens for remove events.
+     * @param expire {@code True} if listens for expire events.
      * @throws Exception If failed.
      */
     private void checkEvents(
@@ -197,7 +438,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         Integer key,
         boolean create,
         boolean update,
-        boolean rmv) throws Exception {
+        boolean rmv,
+        boolean expire) throws Exception {
         CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
             new Factory<CacheEntryListener<Integer, Integer>>() {
                 @Override public CacheEntryListener<Integer, Integer> create() 
{
@@ -211,7 +453,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
         cache.registerCacheEntryListener(lsnrCfg);
 
-        checkEvents(cache, lsnrCfg, key, create, update, rmv);
+        checkEvents(cache, lsnrCfg, key, create, update, rmv, expire);
     }
 
     /**
@@ -222,7 +464,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
     private void checkFilter(IgniteCache<Integer, Integer> cache, Map<Integer, 
Integer> vals) throws Exception {
         evts = new ArrayList<>();
 
-        final int expEvts = (vals.size() / 2) * 3; // Remove, create and 
update for half of modified entries.
+        final int expEvts = (vals.size() / 2) * 4; // Remove, create, update 
and expire for half of modified entries.
 
         evtsLatch = new CountDownLatch(expEvts);
 
@@ -235,55 +477,87 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         for (Integer key : vals.keySet())
             newVals.put(key, -1);
 
-        cache.putAll(newVals);
+        cache.withExpiryPolicy(new ModifiedExpiryPolicy(new 
Duration(MILLISECONDS, 100))).putAll(newVals);
+
+        U.sleep(200);
 
-        evtsLatch.await(5000, TimeUnit.MILLISECONDS);
+        if (!eagerTtl()) { // Provoke expire events if eager ttl is disabled.
+            for (Integer key : newVals.keySet())
+                assertNull(primaryCache(key, cache.getName()).get(key));
+        }
+
+        evtsLatch.await(5000, MILLISECONDS);
 
         assertEquals(expEvts, evts.size());
 
-        Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = 
evts.iterator();
+        Set<Integer> rmvd = new HashSet<>();
+        Set<Integer> created = new HashSet<>();
+        Set<Integer> updated = new HashSet<>();
+        Set<Integer> expired = new HashSet<>();
 
-        for (Integer key : vals.keySet()) {
-            if (key % 2 == 0) {
-                CacheEntryEvent<? extends Integer, ? extends Integer> evt = 
iter.next();
+        for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) 
{
+            assertTrue(evt.getKey() % 2 == 0);
 
-                assertTrue(evt.getKey() % 2 == 0);
-                assertTrue(vals.keySet().contains(evt.getKey()));
-                assertEquals(REMOVED, evt.getEventType());
-                assertNull(evt.getValue());
-                assertEquals(vals.get(evt.getKey()), evt.getOldValue());
+            assertTrue(vals.keySet().contains(evt.getKey()));
 
-                iter.remove();
-            }
-        }
+            switch (evt.getEventType()) {
+                case REMOVED:
+                    assertNull(evt.getValue());
 
-        for (Integer key : vals.keySet()) {
-            if (key % 2 == 0) {
-                CacheEntryEvent<? extends Integer, ? extends Integer> evt = 
iter.next();
+                    assertEquals(vals.get(evt.getKey()), evt.getOldValue());
 
-                assertTrue(evt.getKey() % 2 == 0);
-                assertTrue(vals.keySet().contains(evt.getKey()));
-                assertEquals(CREATED, evt.getEventType());
-                assertEquals(vals.get(evt.getKey()), evt.getValue());
-                assertNull(evt.getOldValue());
+                    assertTrue(rmvd.add(evt.getKey()));
 
-                iter.remove();
-            }
-        }
+                    break;
+
+                case CREATED:
+                    assertEquals(vals.get(evt.getKey()), evt.getValue());
+
+                    assertNull(evt.getOldValue());
+
+                    assertTrue(rmvd.contains(evt.getKey()));
+
+                    assertTrue(created.add(evt.getKey()));
+
+                    break;
+
+                case UPDATED:
+                    assertEquals(-1, (int)evt.getValue());
+
+                    assertEquals(vals.get(evt.getKey()), evt.getOldValue());
+
+                    assertTrue(rmvd.contains(evt.getKey()));
 
-        for (Integer key : vals.keySet()) {
-            if (key % 2 == 0) {
-                CacheEntryEvent<? extends Integer, ? extends Integer> evt = 
iter.next();
+                    assertTrue(created.contains(evt.getKey()));
 
-                assertTrue(evt.getKey() % 2 == 0);
-                assertTrue(vals.keySet().contains(evt.getKey()));
-                assertEquals(UPDATED, evt.getEventType());
-                assertEquals(-1, (int) evt.getValue());
-                assertEquals(vals.get(evt.getKey()), evt.getOldValue());
+                    assertTrue(updated.add(evt.getKey()));
 
-                iter.remove();
+                    break;
+
+                case EXPIRED:
+                    assertNull(evt.getValue());
+
+                    assertEquals(-1, (int)evt.getOldValue());
+
+                    assertTrue(rmvd.contains(evt.getKey()));
+
+                    assertTrue(created.contains(evt.getKey()));
+
+                    assertTrue(updated.contains(evt.getKey()));
+
+                    assertTrue(expired.add(evt.getKey()));
+
+                    break;
+
+                default:
+                    fail("Unexpected type: " + evt.getEventType());
             }
         }
+
+        assertEquals(vals.size() / 2, rmvd.size());
+        assertEquals(vals.size() / 2, created.size());
+        assertEquals(vals.size() / 2, updated.size());
+        assertEquals(vals.size() / 2, expired.size());
     }
 
     /**
@@ -293,6 +567,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
      * @param create {@code True} if listens for create events.
      * @param update {@code True} if listens for update events.
      * @param rmv {@code True} if listens for remove events.
+     * @param expire {@code True} if listens for expire events.
      * @throws Exception If failed.
      */
     private void checkEvents(
@@ -301,7 +576,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         Integer key,
         boolean create,
         boolean update,
-        boolean rmv) throws Exception {
+        boolean rmv,
+        boolean expire) throws Exception {
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 cache.registerCacheEntryListener(lsnrCfg);
@@ -315,7 +591,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         int expEvts = 0;
 
         if (create)
-            expEvts += 2;
+            expEvts += 4;
 
         if (update)
             expEvts += (UPDATES + 1);
@@ -323,6 +599,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         if (rmv)
             expEvts += 2;
 
+        if (expire)
+            expEvts += 2;
+
         evts = new ArrayList<>();
 
         evtsLatch = new CountDownLatch(expEvts);
@@ -338,6 +617,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
         assertTrue(cache.remove(key));
 
+        IgniteCache<Integer, Integer> expirePlcCache =
+            cache.withExpiryPolicy(new CreatedExpiryPolicy(new 
Duration(MILLISECONDS, 100)));
+
+        expirePlcCache.put(key, 10);
+
+        U.sleep(200);
+
+        if (!eagerTtl())
+            assertNull(primaryCache(key, cache.getName()).get(key)); // 
Provoke expire event if eager ttl is disabled.
+
         IgniteCache<Integer, Integer> cache1 = cache;
 
         if (gridCount() > 1)
@@ -349,7 +638,17 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
         assertTrue(cache1.remove(key));
 
-        evtsLatch.await(5000, TimeUnit.MILLISECONDS);
+        IgniteCache<Integer, Integer> expirePlcCache1 =
+            cache1.withExpiryPolicy(new CreatedExpiryPolicy(new 
Duration(MILLISECONDS, 100)));
+
+        expirePlcCache1.put(key, 20);
+
+        U.sleep(200);
+
+        if (!eagerTtl())
+            assertNull(primaryCache(key, cache.getName()).get(key)); // 
Provoke expire event if eager ttl is disabled.
+
+        evtsLatch.await(5000, MILLISECONDS);
 
         assertEquals(expEvts, evts.size());
 
@@ -367,6 +666,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
             checkEvent(iter, key, REMOVED, null, UPDATES);
 
         if (create)
+            checkEvent(iter, key, CREATED, 10, null);
+
+        if (expire)
+            checkEvent(iter, key, EXPIRED, null, 10);
+
+        if (create)
             checkEvent(iter, key, CREATED, 1, null);
 
         if (update)
@@ -375,9 +680,15 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         if (rmv)
             checkEvent(iter, key, REMOVED, null, 2);
 
+        if (create)
+            checkEvent(iter, key, CREATED, 20, null);
+
+        if (expire)
+            checkEvent(iter, key, EXPIRED, null, 20);
+
         assertEquals(0, evts.size());
 
-        log.info("Remove listener. ");
+        log.info("Remove listener.");
 
         cache.deregisterCacheEntryListener(lsnrCfg);
 
@@ -453,6 +764,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         return keys;
     }
 
+    /**
+     * @return Value for configuration property {@link 
GridCacheConfiguration#isEagerTtl()}; {@code true} here, subclasses may override.
+     */
+    protected boolean eagerTtl() {
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
@@ -466,7 +784,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
      * @param evt Event.
      */
     private static void onEvent(CacheEntryEvent<? extends Integer, ? extends 
Integer> evt) {
-        //System.out.println("Received event [evt=" + evt + ", thread=" + 
Thread.currentThread().getName() + ']');
+        // System.out.println("Received event [evt=" + evt + ", thread=" + 
Thread.currentThread().getName() + ']');
 
         assert evt != null;
         assert evt.getSource() != null : evt;
@@ -524,12 +842,44 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    static class CreateUpdateRemoveListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer> {
+    static class NoOpCreateUpdateListener implements 
CacheEntryCreatedListener<Integer, Integer>,
+        CacheEntryUpdatedListener<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts) {
+                assert evt != null;
+                assert evt.getSource() != null : evt;
+                assert evt.getEventType() != null : evt;
+                assert evt.getKey() != null : evt;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts) {
+                assert evt != null;
+                assert evt.getSource() != null : evt;
+                assert evt.getEventType() != null : evt;
+                assert evt.getKey() != null : evt;
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
+        implements CacheEntryRemovedListener<Integer, Integer>, 
CacheEntryExpiredListener<Integer, Integer> {
         /** {@inheritDoc} */
         @Override public void onRemoved(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
             for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts)
                 onEvent(evt);
         }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts)
+                onEvent(evt);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
new file mode 100644
index 0000000..6b7110d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
@@ -0,0 +1,22 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+/**
+ * Tests expire events when {@link GridCacheConfiguration#isEagerTtl()} is 
disabled.
+ */
+public class IgniteCacheEntryListenerEagerTtlDisabledTest extends 
IgniteCacheEntryListenerTxTest {
+    /** {@inheritDoc} */
+    @Override protected boolean eagerTtl() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
index f40d941..25f954a 100644
--- 
a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
@@ -480,6 +480,19 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
     }
 
     /**
+     * @param key Key.
+     * @param cacheName Cache name.
+     * @return Cache with the given name taken from the grid instance running on the node that affinity maps the key to.
+     */
+    protected <K, V> IgniteCache<K, V> primaryCache(Object key, @Nullable 
String cacheName) {
+        ClusterNode node = 
grid(0).cache(cacheName).affinity().mapKeyToNode(key);
+
+        assertNotNull(node);
+
+        return 
grid((String)node.attribute(GridNodeAttributes.ATTR_GRID_NAME)).jcache(cacheName);
+    }
+
+    /**
      * @param cache Cache.
      * @return Collection of keys for which given cache is primary.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/13740b70/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
 
b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index efe49fb..6c01b2f 100644
--- 
a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ 
b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -44,6 +44,7 @@ public class GridDataGridTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheEntryListenerTxTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerTxReplicatedTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerTxLocalTest.class);
+        suite.addTestSuite(IgniteCacheEntryListenerEagerTtlDisabledTest.class);
 
         suite.addTest(IgniteCacheExpiryPolicyTestSuite.suite());
 

Reply via email to