Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-143 bab9e351f -> d0c88b79f


IGNITE-143 - Fixing tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/83cfcc12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/83cfcc12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/83cfcc12

Branch: refs/heads/ignite-143
Commit: 83cfcc127a4074dfda48ea840d09b22840c74f7a
Parents: bab9e35
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Thu Feb 12 16:50:59 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Thu Feb 12 16:50:59 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 37 ++------
 .../continuous/CacheContinuousQueryEntry.java   | 14 ---
 .../continuous/CacheContinuousQueryManager.java | 92 +++++++++++++++-----
 3 files changed, 77 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/83cfcc12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index e1906e2..c863d77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1165,11 +1165,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     subjId, null, taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear())) {
-                EventType type = old != null || oldBytes != null ? 
EventType.UPDATED : EventType.CREATED;
-
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, type);
-            }
+            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1328,7 +1325,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 }
 
                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes, EventType.REMOVED);
+                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
             }
@@ -1637,7 +1634,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, eventType(op));
+            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -2210,8 +2207,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes,
-                    eventType(op));
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -3218,8 +3214,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 if (!skipQryNtf) {
                     if (!preload && (cctx.isLocal() || cctx.isReplicated() ||
                         cctx.affinity().primary(cctx.localNode(), key, 
topVer)))
-                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null,
-                            EventType.CREATED);
+                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }
@@ -4383,26 +4378,6 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
         return cctx.marshaller().unmarshal(res, ldr);
     }
 
-    /**
-     * @param op Operation.
-     * @return Event type.
-     */
-    private EventType eventType(GridCacheOperation op) {
-        switch (op) {
-            case CREATE:
-                return EventType.CREATED;
-
-            case UPDATE:
-                return EventType.UPDATED;
-
-            case DELETE:
-                return EventType.REMOVED;
-
-            default:
-                throw new IllegalStateException("Invalid operation: " + op);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         // Identity comparison left on purpose.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/83cfcc12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index c8d9fec..72269c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -96,20 +96,6 @@ class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externaliz
     }
 
     /**
-     * Unmarshals value from bytes if needed.
-     *
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     * @throws IgniteCheckedException In case of error.
-     */
-    void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws 
IgniteCheckedException {
-        assert marsh != null;
-
-        if (newVal == null && newValBytes != null && !newValBytes.isNull())
-            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : 
marsh.<V>unmarshal(newValBytes.get(), ldr);
-    }
-
-    /**
      * @param marsh Marshaller.
      * @throws IgniteCheckedException In case of error.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/83cfcc12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index ac113c9..93fbbe7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.security.*;
 import org.jdk8.backport.*;
 
+import javax.cache.*;
 import javax.cache.configuration.*;
 import javax.cache.event.*;
 import java.io.*;
@@ -120,7 +121,7 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, 
GridCacheValueBytes newBytes,
-        V oldVal, GridCacheValueBytes oldBytes, EventType type) throws 
IgniteCheckedException {
+        V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException {
         assert e != null;
         assert key != null;
 
@@ -134,20 +135,41 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
         if (F.isEmpty(lsnrCol))
             return;
 
-        oldVal = cctx.unwrapTemporary(oldVal);
+        boolean hasNewVal = newVal != null || (newBytes != null && 
!newBytes.isNull());
+        boolean hasOldVal = oldVal != null || (oldBytes != null && 
!oldBytes.isNull());
 
-        CacheContinuousQueryEntry<K, V> e0 = new 
CacheContinuousQueryEntry<>(key, newVal, newBytes, oldVal, oldBytes);
+        assert hasNewVal || hasOldVal;
 
-        e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader());
+        EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : 
UPDATED;
 
-        CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
-            cctx.kernalContext().grid().jcache(cctx.name()), type, e0);
+        boolean initialized = false;
 
         boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
         boolean recordIgniteEvt = !e.isInternal() && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-        for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values())
+        for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
+            if (!initialized) {
+                if (lsnr.oldValueRequired()) {
+                    oldVal = cctx.unwrapTemporary(oldVal);
+
+                    if (oldVal == null && oldBytes != null && 
!oldBytes.isNull())
+                        oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : 
cctx.marshaller().<V>unmarshal(oldBytes.get
+                            (), cctx.deploy().globalLoader());
+                }
+
+                if (newVal == null && newBytes != null && !newBytes.isNull())
+                    newVal = newBytes.isPlain() ? (V)newBytes.get() : 
cctx.marshaller().<V>unmarshal(newBytes.get(),
+                        cctx.deploy().globalLoader());
+            }
+
+            CacheContinuousQueryEntry<K, V> e0 = new 
CacheContinuousQueryEntry<>(key, newVal, newBytes,
+                lsnr.oldValueRequired() ? oldVal : null, 
lsnr.oldValueRequired() ? oldBytes : null);
+
+            CacheContinuousQueryEvent<K, V> evt = new 
CacheContinuousQueryEvent<>(
+                cctx.kernalContext().cache().publicJCache(cctx.name()), 
evtType, e0);
+
             lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+        }
     }
 
     /**
@@ -155,8 +177,10 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
      * @param key Key.
      * @param oldVal Old value.
      * @param oldBytes Old value bytes.
+     * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, 
GridCacheValueBytes oldBytes) {
+    public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, 
GridCacheValueBytes oldBytes)
+        throws IgniteCheckedException {
         assert e != null;
         assert key != null;
 
@@ -169,16 +193,30 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
             return;
 
         if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), 
key, -1)) {
-            CacheContinuousQueryEntry<K, V> e0 = new 
CacheContinuousQueryEntry<>(key, null, null, oldVal, oldBytes);
-
-            CacheContinuousQueryEvent<K, V> evt = new 
CacheContinuousQueryEvent<>(
-                cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0);
-
             boolean primary = cctx.affinity().primary(cctx.localNode(), key, 
-1);
             boolean recordIgniteEvt = 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-            for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values())
+            boolean initialized = false;
+
+            for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
+                if (!initialized) {
+                    if (lsnr.oldValueRequired()) {
+                        oldVal = cctx.unwrapTemporary(oldVal);
+
+                        if (oldVal == null && oldBytes != null && 
!oldBytes.isNull())
+                            oldVal = oldBytes.isPlain() ? (V)oldBytes.get() :
+                                cctx.marshaller().<V>unmarshal(oldBytes.get(), 
cctx.deploy().globalLoader());
+                    }
+                }
+
+                CacheContinuousQueryEntry<K, V> e0 = new 
CacheContinuousQueryEntry<>(key, null, null,
+                    lsnr.oldValueRequired() ? oldVal : null, 
lsnr.oldValueRequired() ? oldBytes : null);
+
+                CacheContinuousQueryEvent<K, V> evt = new 
CacheContinuousQueryEvent<>(
+                    cctx.kernalContext().cache().publicJCache(cctx.name()), 
EXPIRED, e0);
+
                 lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+            }
         }
     }
 
@@ -251,7 +289,7 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
         JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr);
 
         if (old != null)
-            throw new IgniteCheckedException("Listener is already registered 
for configuration: " + cfg);
+            throw new IllegalArgumentException("Listener is already registered 
for configuration: " + cfg);
 
         try {
             lsnr.execute();
@@ -443,11 +481,12 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
             if (types == 0)
                 throw new IgniteCheckedException("Listener must implement one 
of CacheEntryListener sub-interfaces.");
 
-            CacheEntryUpdatedListener<K, V> locLsnr = 
(CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener<>(
-                locLsnrImpl);
+            CacheEntryUpdatedListener<K, V> locLsnr = 
(CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener(
+                locLsnrImpl, 
cctx.kernalContext().cache().publicJCache(cctx.name()));
 
             CacheEntryEventFilter<K, V> rmtFilter = (CacheEntryEventFilter<K, 
V>)new JCacheQueryRemoteFilter<>(
-                cfg.getCacheEntryEventFilterFactory().create(), types);
+                cfg.getCacheEntryEventFilterFactory() != null ? 
cfg.getCacheEntryEventFilterFactory().create() : null,
+                types);
 
             routineId = executeQuery0(
                 locLsnr,
@@ -481,20 +520,27 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
      */
     private static class JCacheQueryLocalListener<K, V> implements 
CacheEntryUpdatedListener<K, V> {
         /** */
-        private CacheEntryListener<K, V> impl;
+        private final CacheEntryListener<K, V> impl;
+
+        /** */
+        private final Cache<K, V> cache;
 
         /**
          * @param impl Listener.
          */
-        private JCacheQueryLocalListener(CacheEntryListener<K, V> impl) {
+        JCacheQueryLocalListener(CacheEntryListener<K, V> impl, Cache<K, V> 
cache) {
             assert impl != null;
+            assert cache != null;
 
             this.impl = impl;
+            this.cache = cache;
         }
 
         /** {@inheritDoc} */
         @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, 
? extends V>> evts) {
             for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
+
+
                 switch (evt.getEventType()) {
                     case CREATED:
                         assert impl instanceof CacheEntryCreatedListener;
@@ -534,11 +580,15 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
          * @param evt Event.
          * @return Singleton iterable.
          */
+        @SuppressWarnings("unchecked")
         private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton(
             CacheEntryEvent<? extends K, ? extends V> evt) {
+            assert evt instanceof CacheContinuousQueryEvent;
+
             Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new 
ArrayList<>(1);
 
-            evts.add(evt);
+            evts.add(new CacheContinuousQueryEvent<>(cache, evt.getEventType(),
+                ((CacheContinuousQueryEvent<? extends K, ? extends 
V>)evt).entry()));
 
             return evts;
         }

Reply via email to