ignite-43

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/48dd7fbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/48dd7fbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/48dd7fbf

Branch: refs/heads/ignite-99
Commit: 48dd7fbf68771633a4db5986a0173eb82cf7ea6e
Parents: 311bbdd
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Jan 13 10:14:20 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Jan 13 11:55:54 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   2 +-
 .../processors/cache/GridCacheProcessor.java    |   1 -
 .../GridCacheDataStructuresManager.java         |   3 +-
 .../GridCacheContinuousQueryAdapter.java        |  44 +++----
 .../GridCacheContinuousQueryEntry.java          |  46 +++++--
 .../GridCacheContinuousQueryHandler.java        |  52 +++++---
 .../GridCacheContinuousQueryHandlerV2.java      |  83 ------------
 .../GridCacheContinuousQueryListener.java       |   3 +-
 .../GridCacheContinuousQueryManager.java        | 126 +++++++++++--------
 .../continuous/GridContinuousProcessor.java     |   4 +-
 .../service/GridServiceProcessor.java           |   4 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 118 +++++++++++------
 12 files changed, 250 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index aee3806..b80f729 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -3016,7 +3016,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.affinity().primary(cctx.localNode(), key, topVer) 
|| cctx.isReplicated()) {
+                    if (cctx.isReplicated() || 
cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                         cctx.continuousQueries().onEntryUpdate(this,
                             key,
                             val,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index 46755c6..6dfd00a 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -43,7 +43,6 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.configuration.*;
 import javax.management.*;
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
index 384329b..c756492 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
@@ -682,7 +682,8 @@ public final class GridCacheDataStructuresManager<K, V> 
extends GridCacheManager
             queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? 
cctx.grid().forLocal() : null,
                 true,
                 false,
-                false);
+                false,
+                true);
         }
 
         GridCacheQueueProxy queue = queuesMap.get(header.id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
index 06eb2a8..f72f913 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
@@ -213,12 +213,12 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements GridCacheContinuou
 
     /** {@inheritDoc} */
     @Override public void execute() throws IgniteCheckedException {
-        execute(null, false, false, false);
+        execute(null, false, false, false, true);
     }
 
     /** {@inheritDoc} */
     @Override public void execute(@Nullable ClusterGroup prj) throws 
IgniteCheckedException {
-        execute(prj, false, false, false);
+        execute(prj, false, false, false, true);
     }
 
     /**
@@ -228,12 +228,14 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements GridCacheContinuou
      * @param internal If {@code true} then query notified about internal 
entries updates.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
      * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
+     * @param oldVal {@code True} if old value is required.
      * @throws IgniteCheckedException If failed.
      */
     public void execute(@Nullable ClusterGroup prj,
         boolean internal,
         boolean entryLsnr,
-        boolean sync) throws IgniteCheckedException {
+        boolean sync,
+        boolean oldVal) throws IgniteCheckedException {
         if (locCb == null)
             throw new IllegalStateException("Mandatory local callback is not 
set for the query: " + this);
 
@@ -276,29 +278,19 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements GridCacheContinuou
 
             guard.block();
 
-            GridContinuousHandler hnd;
-
-            if (ctx.kernalContext().security().enabled()) {
-                hnd = new GridCacheContinuousQueryHandlerV2<>(ctx.name(),
-                    topic,
-                    locCb,
-                    rmtFilter,
-                    prjPred,
-                    internal,
-                    entryLsnr,
-                    sync,
-                    ctx.kernalContext().job().currentTaskNameHash());
-            }
-            else {
-                hnd = new GridCacheContinuousQueryHandler<>(ctx.name(),
-                    topic,
-                    locCb,
-                    rmtFilter,
-                    prjPred,
-                    internal,
-                    entryLsnr,
-                    sync);
-            }
+            int taskNameHash =
+                ctx.kernalContext().security().enabled() ? 
ctx.kernalContext().job().currentTaskNameHash() : 0;
+
+            GridContinuousHandler hnd = new 
GridCacheContinuousQueryHandler<>(ctx.name(),
+                topic,
+                locCb,
+                rmtFilter,
+                prjPred,
+                internal,
+                entryLsnr,
+                sync,
+                oldVal,
+                taskNameHash);
 
             routineId = ctx.kernalContext().continuous().startRoutine(hnd,
                 bufSize,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
index 3c5265c..4738961 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
@@ -21,6 +21,7 @@ import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -37,10 +38,13 @@ public class GridCacheContinuousQueryEntry<K, V> implements 
GridCacheEntry<K, V>
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Event type enum values. */
+    private static final EventType[] EVT_TYPE_VALS = EventType.values();
+
     /** Cache context. */
     @SuppressWarnings("TransientFieldNotInitialized")
     @GridToStringExclude
-    private final transient GridCacheContext ctx;
+    private final transient GridCacheContext<K, V> ctx;
 
     /** Cache entry. */
     @SuppressWarnings("TransientFieldNotInitialized")
@@ -78,7 +82,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements 
GridCacheEntry<K, V>
     private GridDeploymentInfo depInfo;
 
     /** */
-    private boolean expired;
+    private EventType evtType;
 
     /**
      * Required by {@link Externalizable}.
@@ -88,7 +92,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements 
GridCacheEntry<K, V>
         impl = null;
     }
 
-    /*
+    /**
      * @param ctx Cache context.
      * @param impl Cache entry.
      * @param key Key.
@@ -96,7 +100,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements 
GridCacheEntry<K, V>
      * @param newValBytes Value bytes.
      * @param oldVal Old value.
      * @param oldValBytes Old value bytes.
-     * @param expired {@code True} if created for expired entry.
+     * @param evtType Event type.
      */
     GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx,
         GridCacheEntry<K, V> impl,
@@ -105,10 +109,11 @@ public class GridCacheContinuousQueryEntry<K, V> 
implements GridCacheEntry<K, V>
         @Nullable GridCacheValueBytes newValBytes,
         @Nullable V oldVal,
         @Nullable GridCacheValueBytes oldValBytes,
-        boolean expired) {
+        EventType evtType) {
         assert ctx != null;
         assert impl != null;
         assert key != null;
+        assert evtType != null;
 
         this.ctx = ctx;
         this.impl = impl;
@@ -117,14 +122,35 @@ public class GridCacheContinuousQueryEntry<K, V> 
implements GridCacheEntry<K, V>
         this.newValBytes = newValBytes;
         this.oldVal = oldVal;
         this.oldValBytes = oldValBytes;
-        this.expired = expired;
+        this.evtType = evtType;
+    }
+
+    /**
+     * @return Cache entry.
+     */
+    GridCacheEntry<K, V> entry() {
+        return impl;
+    }
+
+    /**
+     * @return Cache context.
+     */
+    GridCacheContext<K, V> context() {
+        return ctx;
+    }
+
+    /**
+     * @return New value bytes.
+     */
+    GridCacheValueBytes newValueBytes() {
+        return newValBytes;
     }
 
     /**
      * @return {@code True} if entry expired.
      */
-    public boolean expired() {
-        return expired;
+    public EventType eventType() {
+        return evtType;
     }
 
     /**
@@ -729,7 +755,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements 
GridCacheEntry<K, V>
             out.writeObject(oldVal);
         }
 
-        out.writeBoolean(expired);
+        out.writeByte((byte)evtType.ordinal());
     }
 
     /** {@inheritDoc} */
@@ -755,7 +781,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements 
GridCacheEntry<K, V>
             oldVal = (V)in.readObject();
         }
 
-        expired = in.readBoolean();
+        evtType = EVT_TYPE_VALS[in.readByte()];
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
index 9162b89..e9b5678 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
@@ -68,6 +68,12 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
     /** Synchronous listener flag. */
     private boolean sync;
 
+    /** {@code True} if old value is required. */
+    private boolean oldVal;
+
+    /** Task name hash code. */
+    private int taskHash;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -84,6 +90,8 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
      * @param internal If {@code true} then query is notified about internal 
entries updates.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
      * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
+     * @param oldVal {@code True} if old value is required.
+     * @param taskHash Task name hash code.
      */
     GridCacheContinuousQueryHandler(@Nullable String cacheName,
         Object topic,
@@ -92,9 +100,12 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred,
         boolean internal,
         boolean entryLsnr,
-        boolean sync) {
+        boolean sync,
+        boolean oldVal,
+        int taskHash) {
         assert topic != null;
         assert cb != null;
+        assert !sync || entryLsnr;
 
         this.cacheName = cacheName;
         this.topic = topic;
@@ -104,6 +115,8 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         this.internal = internal;
         this.entryLsnr = entryLsnr;
         this.sync = sync;
+        this.oldVal = oldVal;
+        this.taskHash = taskHash;
     }
 
     /** {@inheritDoc} */
@@ -156,9 +169,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                 }
             }
 
-            @Override public void 
onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e,
-                boolean recordEvt,
-                boolean sync) {
+            @Override public void 
onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
                 boolean notify;
 
                 GridCacheFlag[] f = cacheContext(ctx).forceLocalRead();
@@ -172,6 +183,17 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                 }
 
                 if (notify) {
+                    if (!oldVal && e.getOldValue() != null) {
+                        e = new GridCacheContinuousQueryEntry<>(e.context(),
+                            e.entry(),
+                            e.getKey(),
+                            e.getValue(),
+                            e.newValueBytes(),
+                            null,
+                            null,
+                            e.eventType());
+                    }
+
                     if (loc) {
                         if (!cb.apply(nodeId,
                             
F.<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>>asList(e)))
@@ -244,21 +266,11 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
             }
 
             @Nullable private String taskName() {
-                String taskName = null;
-
-                if (ctx.security().enabled()) {
-                    assert GridCacheContinuousQueryHandler.this instanceof 
GridCacheContinuousQueryHandlerV2;
-
-                    int taskHash = 
((GridCacheContinuousQueryHandlerV2)GridCacheContinuousQueryHandler.this).taskHash();
-
-                    taskName = ctx.task().resolveTaskName(taskHash);
-                }
-
-                return taskName;
+                return ctx.security().enabled() ? 
ctx.task().resolveTaskName(taskHash) : null;
             }
         };
 
-        return manager(ctx).registerListener(routineId, lsnr, internal, 
entryLsnr, sync);
+        return manager(ctx).registerListener(routineId, lsnr, internal, 
entryLsnr);
     }
 
     /** {@inheritDoc} */
@@ -393,6 +405,10 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         out.writeBoolean(entryLsnr);
 
         out.writeBoolean(sync);
+
+        out.writeBoolean(oldVal);
+
+        out.writeInt(taskHash);
     }
 
     /** {@inheritDoc} */
@@ -420,6 +436,10 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         entryLsnr = in.readBoolean();
 
         sync = in.readBoolean();
+
+        oldVal = in.readBoolean();
+
+        taskHash = in.readInt();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
deleted file mode 100644
index b4216d5..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.query.continuous;
-
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry;
-import org.jetbrains.annotations.*;
-
-import javax.cache.event.*;
-import java.io.*;
-import java.util.*;
-
-/**
- * Continuous query handler extension.
- */
-public class GridCacheContinuousQueryHandlerV2<K, V> extends 
GridCacheContinuousQueryHandler<K, V> {
-    /** */
-    private static final long serialVersionUID = 2180994610452685320L;
-
-    /** Task hash. */
-    private int taskHash;
-
-    /**
-     * For {@link Externalizable}.
-     */
-    public GridCacheContinuousQueryHandlerV2() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param topic Topic for ordered messages.
-     * @param cb Local callback.
-     * @param filter Filter.
-     * @param prjPred Projection predicate.
-     * @param internal If {@code true} then query is notified about internal 
entries updates.
-     * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
-     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
-     * @param taskHash Task hash.
-     */
-    public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName,
-        Object topic,
-        IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, 
V>>> cb,
-        @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred,
-        boolean internal,
-        boolean entryLsnr,
-        boolean sync,
-        int taskHash) {
-        super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr, 
sync);
-
-        this.taskHash = taskHash;
-    }
-
-    /**
-     * @return Task hash.
-     */
-    public int taskHash() {
-        return taskHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        super.writeExternal(out);
-
-        out.writeInt(taskHash);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        super.readExternal(in);
-
-        taskHash = in.readInt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
index 2707428..dd6b5f9 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
@@ -23,7 +23,6 @@ interface GridCacheContinuousQueryListener<K, V> {
      *
      * @param e Entry.
      * @param recordEvt Whether to record event.
-     * @param sync {@code True} if event is synchronous.
      */
-    public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean 
recordEvt, boolean sync);
+    public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean 
recordEvt);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
index 8bcbceb..56c7020 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
@@ -145,6 +145,9 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
 
         oldVal = cctx.unwrapTemporary(oldVal);
 
+        EventType evtType = newVal == null ? REMOVED :
+            ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? 
UPDATED : CREATED));
+
         GridCacheContinuousQueryEntry<K, V> e0 = new 
GridCacheContinuousQueryEntry<>(
             cctx,
             e.wrap(false),
@@ -153,7 +156,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
             newBytes,
             oldVal,
             oldBytes,
-            false);
+            evtType);
 
         e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader());
 
@@ -194,7 +197,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
                 null,
                 oldVal,
                 oldBytes,
-                true);
+                EXPIRED);
 
             for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
                 if (!lsnr.entryListener())
@@ -254,7 +257,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
 
             qry.remoteFilter(fltr);
 
-            qry.execute(null, false, true, lsnrCfg.isSynchronous());
+            qry.execute(null, false, true, lsnrCfg.isSynchronous(), 
lsnrCfg.isOldValueRequired());
         }
         catch (IgniteCheckedException e) {
             lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to 
execute it.
@@ -281,17 +284,13 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
      * @param lsnr Listener.
      * @param internal Internal flag.
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
-     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
      * @return Whether listener was actually registered.
      */
     boolean registerListener(UUID lsnrId,
         GridCacheContinuousQueryListener<K, V> lsnr,
         boolean internal,
-        boolean entryLsnr,
-        boolean sync) {
-        assert !sync || entryLsnr;
-
-        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr, sync);
+        boolean entryLsnr) {
+        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr);
 
         boolean added;
 
@@ -349,6 +348,8 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
             entries = internal ? cctx.cache().primaryEntrySetx() :
                 cctx.cache().primaryEntrySet();
 
+        boolean evt = !internal && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
         for (GridCacheEntry<K, V> e : entries) {
             GridCacheContinuousQueryEntry<K, V> qryEntry = new 
GridCacheContinuousQueryEntry<>(cctx,
                 e,
@@ -357,9 +358,9 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
                 null,
                 null,
                 null,
-                false);
+                CREATED);
 
-            info.onIterate(qryEntry, !internal && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
+            info.onIterate(qryEntry, evt);
         }
 
         info.flushPending();
@@ -378,18 +379,13 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
         /** */
         private final boolean entryLsnr;
 
-        /** */
-        private final boolean sync;
-
         /**
          * @param lsnr Listener.
          * @param entryLsnr {@code True} if listener created for {@link 
CacheEntryListener}.
-         * @param sync {@code True} if listener is synchronous.
          */
-        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, 
boolean entryLsnr, boolean sync) {
+        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, 
boolean entryLsnr) {
             this.lsnr = lsnr;
             this.entryLsnr = entryLsnr;
-            this.sync = sync;
 
             if (!entryLsnr)
                 pending = new LinkedList<>();
@@ -411,7 +407,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
             }
 
             if (notifyLsnr)
-                lsnr.onEntryUpdate(e, recordEvt, sync);
+                lsnr.onEntryUpdate(e, recordEvt);
         }
 
         /**
@@ -419,7 +415,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
          * @param recordEvt Whether to record event.
          */
         void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean 
recordEvt) {
-            lsnr.onEntryUpdate(e, recordEvt, sync);
+            lsnr.onEntryUpdate(e, recordEvt);
         }
 
         /**
@@ -435,7 +431,7 @@ public class GridCacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapt
             }
 
             for (PendingEntry<K, V> e : pending0)
-                lsnr.onEntryUpdate(e.entry, e.recordEvt, sync);
+                lsnr.onEntryUpdate(e.entry, e.recordEvt);
         }
 
         /**
@@ -546,44 +542,47 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
         @SuppressWarnings("unchecked")
         @Override public boolean 
apply(org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K1, V1> 
entry) {
             try {
-                EventType evtType;
+                EventType evtType = 
(((GridCacheContinuousQueryEntry)entry).eventType());
 
-                if (entry.getValue() == null) {
-                    if (((GridCacheContinuousQueryEntry)entry).expired()) { // 
Expire.
+                switch (evtType) {
+                    case EXPIRED:
                         if (!expire)
                             return false;
 
-                        evtType = EXPIRED;
-                    }
-                    else { // Remove.
+                        break;
+
+                    case REMOVED:
                         if (!rmv)
                             return false;
 
-                        evtType = REMOVED;
-                    }
-                }
-                else {
-                    if (entry.getOldValue() != null) { // Update.
-                        if (!update)
-                            return false;
+                        break;
 
-                        evtType = UPDATED;
-                    }
-                    else { // Create.
+                    case CREATED:
                         if (!create)
                             return false;
 
-                        evtType = CREATED;
-                    }
+                        break;
+
+                    case UPDATED:
+                        if (!update)
+                            return false;
+
+                        break;
+
+                    default:
+                        assert false : evtType;
                 }
 
+                if (fltr == null)
+                    return true;
+
                 if (cache == null) {
                     cache = ignite.jcache(cacheName);
 
                     assert cache != null : cacheName;
                 }
 
-                return fltr == null || fltr.evaluate(new 
org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry));
+                return fltr.evaluate(new 
org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry));
             }
             catch (CacheEntryListenerException e) {
                 LT.warn(ignite.log(), e, "Cache entry event filter error: " + 
e);
@@ -690,42 +689,59 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
 
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
-        @Override public boolean apply(UUID uuid, 
Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> 
entries) {
+        @Override public boolean apply(UUID uuid,
+            
Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> 
entries) {
             for (org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry 
entry : entries) {
                 try {
-                    if (entry.getValue() == null) { // Remove.
-                        if (((GridCacheContinuousQueryEntry)entry).expired()) 
{ // Expire.
+                    EventType evtType = 
(((GridCacheContinuousQueryEntry)entry).eventType());
+
+                    switch (evtType) {
+                        case EXPIRED: {
                             assert expireLsnr != null;
 
                             org.apache.ignite.cache.CacheEntryEvent evt0 =
                                 new 
org.apache.ignite.cache.CacheEntryEvent(cache, EXPIRED, entry);
 
                             expireLsnr.onExpired(Collections.singleton(evt0));
+
+                            break;
                         }
-                        else {
+
+                        case REMOVED: {
                             assert rmvLsnr != null;
 
                             org.apache.ignite.cache.CacheEntryEvent evt0 =
                                 new 
org.apache.ignite.cache.CacheEntryEvent(cache, REMOVED, entry);
 
                             rmvLsnr.onRemoved(Collections.singleton(evt0));
+
+                            break;
                         }
-                    }
-                    else if (entry.getOldValue() != null) { // Update.
-                        assert updateLsnr != null;
 
-                        org.apache.ignite.cache.CacheEntryEvent evt0 =
-                            new org.apache.ignite.cache.CacheEntryEvent(cache, 
UPDATED, entry);
+                        case UPDATED: {
+                            assert updateLsnr != null;
 
-                        updateLsnr.onUpdated(Collections.singleton(evt0));
-                    }
-                    else { // Create.
-                        assert createLsnr != null;
+                            org.apache.ignite.cache.CacheEntryEvent evt0 =
+                                new 
org.apache.ignite.cache.CacheEntryEvent(cache, UPDATED, entry);
 
-                        org.apache.ignite.cache.CacheEntryEvent evt0 =
-                            new org.apache.ignite.cache.CacheEntryEvent(cache, 
CREATED, entry);
+                            updateLsnr.onUpdated(Collections.singleton(evt0));
+
+                            break;
+                        }
+
+                        case CREATED: {
+                            assert createLsnr != null;
+
+                            org.apache.ignite.cache.CacheEntryEvent evt0 =
+                                new 
org.apache.ignite.cache.CacheEntryEvent(cache, CREATED, entry);
+
+                            createLsnr.onCreated(Collections.singleton(evt0));
+
+                            break;
+                        }
 
-                        createLsnr.onCreated(Collections.singleton(evt0));
+                        default:
+                            assert false : evtType;
                     }
                 }
                 catch (CacheEntryListenerException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
index e7f8619..0315e97 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
@@ -898,7 +898,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             LocalRoutineInfo routine = locInfos.get(routineId);
 
             if (routine != null)
-                routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>) 
msg.data(), ctx);
+                routine.hnd.notifyCallback(nodeId, routineId, 
(Collection<?>)msg.data(), ctx);
         }
         finally {
             if (msg.futureId() != null) {
@@ -1726,7 +1726,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Future for stop routine.
+     * Synchronous message acknowledgement future.
      */
     private static class SyncMessageAckFuture extends 
GridFutureAdapter<Object> {
         /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
index 5d9edc9..8cb3e1d 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
@@ -124,13 +124,13 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
 
             cfgQry.localCallback(new DeploymentListener());
 
-            cfgQry.execute(ctx.grid().forLocal(), true, false, false);
+            cfgQry.execute(ctx.grid().forLocal(), true, false, false, true);
 
             assignQry = (GridCacheContinuousQueryAdapter<Object, 
Object>)cache.queries().createContinuousQuery();
 
             assignQry.localCallback(new AssignmentListener());
 
-            assignQry.execute(ctx.grid().forLocal(), true, false, false);
+            assignQry.execute(ctx.grid().forLocal(), true, false, false, true);
         }
         finally {
             if (ctx.deploy().enabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/48dd7fbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 04867d9..b35e1a8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -12,6 +12,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
 import org.jetbrains.annotations.*;
@@ -27,7 +28,6 @@ import java.util.concurrent.atomic.*;
 import static java.util.concurrent.TimeUnit.*;
 import static javax.cache.event.EventType.*;
 import static org.gridgain.grid.cache.GridCacheMode.*;
-import static org.gridgain.grid.cache.GridCachePreloadMode.*;
 
 /**
  *
@@ -46,7 +46,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
     private Integer lastKey = 0;
 
     /** */
-    private CacheEntryListenerConfiguration lsnrCfg;
+    private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -58,14 +58,43 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
         cfg.setEagerTtl(eagerTtl());
 
-        cfg.setPreloadMode(SYNC);
-
         return cfg;
     }
 
     /**
      * @throws Exception If failed.
      */
+    public void testNoOldValue() throws Exception {
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() 
{
+                    return new CreateUpdateRemoveExpireListener();
+                }
+            },
+            null,
+            false,
+            true
+        );
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        try {
+            for (Integer key : keys()) {
+                log.info("Check create/update/remove/expire events, no old 
value [key=" + key + ']');
+
+                cache.registerCacheEntryListener(lsnrCfg);
+
+                checkEvents(cache, lsnrCfg, key, true, true, true, true, 
false);
+            }
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSynchronousEvents() throws Exception {
         final CacheEntryCreatedListener<Integer, Integer> lsnr = new 
CreateUpdateRemoveExpireListener() {
             @Override public void onRemoved(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
@@ -208,6 +237,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
                 cache.putAll(vals);
 
             fut.get();
+
+            log.info("Update one more time.");
+
+            cache.putAll(vals);
         }
         finally {
             stopGrid(gridCount());
@@ -223,7 +256,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
      */
     private void syncEvent(Integer key, Integer val, IgniteCache<Integer, 
Integer> cache, int expEvts)
         throws Exception {
-        evts = new ArrayList<>();
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? 
extends Integer, ? extends Integer>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
@@ -296,7 +329,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         Map<Integer, Integer> vals = new HashMap<>();
 
         for (int i = 0; i < 100; i++)
-            vals.put(i + 1_000_000, i);
+            vals.put(i + 2_000_000, i);
 
         cache.putAll(vals); // Put some data in cache to make sure events are 
not generated for existing entries.
 
@@ -386,7 +419,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
             log.info("Check create/update/remove events for listener in 
configuration [key=" + key + ']');
 
-            checkEvents(cache, lsnrCfg, key, true, true, true, true);
+            checkEvents(cache, lsnrCfg, key, true, true, true, true, true);
         }
         finally {
             stopGrid(gridCount());
@@ -453,7 +486,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
         cache.registerCacheEntryListener(lsnrCfg);
 
-        checkEvents(cache, lsnrCfg, key, create, update, rmv, expire);
+        try {
+            checkEvents(cache, lsnrCfg, key, create, update, rmv, expire, 
true);
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
     }
 
     /**
@@ -461,8 +499,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
      * @param vals Values in cache.
      * @throws Exception If failed.
      */
-    private void checkFilter(IgniteCache<Integer, Integer> cache, Map<Integer, 
Integer> vals) throws Exception {
-        evts = new ArrayList<>();
+    private void checkFilter(final IgniteCache<Integer, Integer> cache, 
Map<Integer, Integer> vals) throws Exception {
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? 
extends Integer, ? extends Integer>>());
 
         final int expEvts = (vals.size() / 2) * 4; // Remove, create, update 
and expire for half of modified entries.
 
@@ -472,19 +510,23 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.putAll(vals);
 
-        Map<Integer, Integer> newVals = new HashMap<>();
+        final Map<Integer, Integer> newVals = new HashMap<>();
 
         for (Integer key : vals.keySet())
             newVals.put(key, -1);
 
-        cache.withExpiryPolicy(new ModifiedExpiryPolicy(new 
Duration(MILLISECONDS, 100))).putAll(newVals);
+        cache.withExpiryPolicy(new ModifiedExpiryPolicy(new 
Duration(MILLISECONDS, 500))).putAll(newVals);
 
-        U.sleep(200);
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Integer key : newVals.keySet()) {
+                    if (primaryCache(key, cache.getName()).get(key) != null)
+                        return false;
+                }
 
-        if (!eagerTtl()) { // Provoke expire events if eager ttl is disabled.
-            for (Integer key : newVals.keySet())
-                assertNull(primaryCache(key, cache.getName()).get(key));
-        }
+                return true;
+            }
+        }, 5000);
 
         evtsLatch.await(5000, MILLISECONDS);
 
@@ -568,6 +610,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
      * @param update {@code True} if listens for update events.
      * @param rmv {@code True} if listens for remove events.
      * @param expire {@code True} if listens for expire events.
+     * @param oldVal {@code True} if old value should be provided for event.
      * @throws Exception If failed.
      */
     private void checkEvents(
@@ -577,7 +620,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         boolean create,
         boolean update,
         boolean rmv,
-        boolean expire) throws Exception {
+        boolean expire,
+        boolean oldVal) throws Exception {
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 cache.registerCacheEntryListener(lsnrCfg);
@@ -602,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
         if (expire)
             expEvts += 2;
 
-        evts = new ArrayList<>();
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? 
extends Integer, ? extends Integer>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
@@ -659,32 +703,32 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         if (update) {
             for (int i = 0; i < UPDATES; i++)
-                checkEvent(iter, key, UPDATED, i + 1, i);
+                checkEvent(iter, key, UPDATED, i + 1, oldVal ? i : null);
         }
 
         if (rmv)
-            checkEvent(iter, key, REMOVED, null, UPDATES);
+            checkEvent(iter, key, REMOVED, null, oldVal ? UPDATES : null);
 
         if (create)
             checkEvent(iter, key, CREATED, 10, null);
 
         if (expire)
-            checkEvent(iter, key, EXPIRED, null, 10);
+            checkEvent(iter, key, EXPIRED, null, oldVal ? 10 : null);
 
         if (create)
             checkEvent(iter, key, CREATED, 1, null);
 
         if (update)
-            checkEvent(iter, key, UPDATED, 2, 1);
+            checkEvent(iter, key, UPDATED, 2, oldVal ? 1 : null);
 
         if (rmv)
-            checkEvent(iter, key, REMOVED, null, 2);
+            checkEvent(iter, key, REMOVED, null, oldVal ? 2 : null);
 
         if (create)
             checkEvent(iter, key, CREATED, 20, null);
 
         if (expire)
-            checkEvent(iter, key, EXPIRED, null, 20);
+            checkEvent(iter, key, EXPIRED, null, oldVal ? 20 : null);
 
         assertEquals(0, evts.size());
 
@@ -786,10 +830,10 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     private static void onEvent(CacheEntryEvent<? extends Integer, ? extends 
Integer> evt) {
         // System.out.println("Received event [evt=" + evt + ", thread=" + 
Thread.currentThread().getName() + ']');
 
-        assert evt != null;
-        assert evt.getSource() != null : evt;
-        assert evt.getEventType() != null : evt;
-        assert evt.getKey() != null : evt;
+        assertNotNull(evt);
+        assertNotNull(evt.getSource());
+        assertNotNull(evt.getEventType());
+        assertNotNull(evt.getKey());
 
         evts.add(evt);
 
@@ -847,20 +891,20 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         /** {@inheritDoc} */
         @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
             for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts) {
-                assert evt != null;
-                assert evt.getSource() != null : evt;
-                assert evt.getEventType() != null : evt;
-                assert evt.getKey() != null : evt;
+                assertNotNull(evt);
+                assertNotNull(evt.getSource());
+                assertNotNull(evt.getEventType());
+                assertNotNull(evt.getKey());
             }
         }
 
         /** {@inheritDoc} */
         @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
             for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts) {
-                assert evt != null;
-                assert evt.getSource() != null : evt;
-                assert evt.getEventType() != null : evt;
-                assert evt.getKey() != null : evt;
+                assertNotNull(evt);
+                assertNotNull(evt.getSource());
+                assertNotNull(evt.getEventType());
+                assertNotNull(evt.getKey());
             }
         }
     }

Reply via email to