# sprint-2 ignore exception from event listener filter

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/68a3b77b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/68a3b77b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/68a3b77b

Branch: refs/heads/ignite-224
Commit: 68a3b77b7492cdf7a098ddd66acdda78ebb75c00
Parents: a4101a5
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Feb 17 11:09:50 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Feb 17 11:09:50 2015 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  16 ++-
 .../IgniteCacheEntryListenerAbstractTest.java   | 106 +++++++++++++++++++
 2 files changed, 120 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/68a3b77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index fc9811e..8480211 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.resources.*;
 import org.jdk8.backport.*;
 
 import javax.cache.*;
@@ -651,7 +652,7 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
                     }
                 }
                 catch (Exception e) {
-                    U.error(log, "CacheEntryCreatedListener failed: " + e);
+                    U.error(log, "CacheEntryListener failed: " + e);
                 }
             }
         }
@@ -683,6 +684,10 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
         /** */
         private byte types;
 
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
         /**
          * For {@link Externalizable}.
          */
@@ -703,7 +708,14 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
 
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<? extends K, ? 
extends V> evt) {
-            return (types & flag(evt.getEventType())) != 0 && (impl == null || 
impl.evaluate(evt));
+            try {
+                return (types & flag(evt.getEventType())) != 0 && (impl == 
null || impl.evaluate(evt));
+            }
+            catch (Exception e) {
+                U.error(log, "CacheEntryEventFilter failed: " + e);
+
+                return true;
+            }
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/68a3b77b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 0c2cb0c..66892ea 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -87,6 +87,69 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
     /**
      * @throws Exception If failed.
      */
+    public void testExceptionIgnored() throws Exception {
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() 
{
+                    return new ExceptionListener();
+                }
+            },
+            null,
+            false,
+            false
+        );
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        try {
+            for (Integer key : keys()) {
+                log.info("Check listener exceptions are ignored [key=" + key + 
']');
+
+                cache.put(key, key);
+
+                cache.remove(key);
+            }
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+
+        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() 
{
+                    return new CreateUpdateRemoveExpireListener();
+                }
+            },
+            new Factory<CacheEntryEventFilter<? super Integer, ? super 
Integer>>() {
+                @Override public CacheEntryEventFilter<? super Integer, ? 
super Integer> create() {
+                    return new ExceptionFilter();
+                }
+            },
+            false,
+            false
+        );
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        try {
+            for (Integer key : keys()) {
+                log.info("Check filter exceptions are ignored [key=" + key + 
']');
+
+                cache.put(key, key);
+
+                cache.remove(key);
+            }
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testNoOldValue() throws Exception {
         CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
             new Factory<CacheEntryListener<Integer, Integer>>() {
@@ -960,6 +1023,49 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
+    static class ExceptionFilter implements CacheEntryEventFilter<Integer, 
Integer> {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? 
extends Integer> evt) {
+            throw new RuntimeException("Test filter error.");
+        }
+    }
+
+    /**
+     *
+     */
+    static class ExceptionListener extends CreateUpdateListener
+        implements CacheEntryRemovedListener<Integer, Integer>, 
CacheEntryExpiredListener<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+            error();
+        }
+
+        /**
+         * Throws exception.
+         */
+        private void error() {
+            throw new RuntimeException("Test listener error.");
+        }
+    }
+
+    /**
+     *
+     */
     protected static class ToStringProcessor implements 
EntryProcessor<Integer, Integer, String> {
         /** {@inheritDoc} */
         @Override public String process(MutableEntry<Integer, Integer> e, 
Object... arguments)

Reply via email to