# sprint-2 ignore exception from event listener filter
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/68a3b77b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/68a3b77b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/68a3b77b Branch: refs/heads/ignite-223 Commit: 68a3b77b7492cdf7a098ddd66acdda78ebb75c00 Parents: a4101a5 Author: sboikov <sboi...@gridgain.com> Authored: Tue Feb 17 11:09:50 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Feb 17 11:09:50 2015 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryManager.java | 16 ++- .../IgniteCacheEntryListenerAbstractTest.java | 106 +++++++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/68a3b77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index fc9811e..8480211 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.security.*; +import org.apache.ignite.resources.*; import org.jdk8.backport.*; import javax.cache.*; @@ -651,7 +652,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K } } catch (Exception e) { - U.error(log, "CacheEntryCreatedListener failed: " + e); + U.error(log, "CacheEntryListener failed: " + e); } } } @@ -683,6 +684,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K /** */ private byte types; + /** */ + @LoggerResource + private IgniteLogger log; + /** * For {@link Externalizable}. */ @@ -703,7 +708,14 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) { - return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); + try { + return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); + } + catch (Exception e) { + U.error(log, "CacheEntryEventFilter failed: " + e); + + return true; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/68a3b77b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 0c2cb0c..66892ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -87,6 +87,69 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * @throws Exception If failed. */ + public void testExceptionIgnored() throws Exception { + CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new ExceptionListener(); + } + }, + null, + false, + false + ); + + IgniteCache<Integer, Integer> cache = jcache(); + + cache.registerCacheEntryListener(lsnrCfg); + + try { + for (Integer key : keys()) { + log.info("Check listener exceptions are ignored [key=" + key + ']'); + + cache.put(key, key); + + cache.remove(key); + } + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } + + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new CreateUpdateRemoveExpireListener(); + } + }, + new Factory<CacheEntryEventFilter<? super Integer, ? super Integer>>() { + @Override public CacheEntryEventFilter<? super Integer, ? super Integer> create() { + return new ExceptionFilter(); + } + }, + false, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + + try { + for (Integer key : keys()) { + log.info("Check filter exceptions are ignored [key=" + key + ']'); + + cache.put(key, key); + + cache.remove(key); + } + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } + } + + /** + * @throws Exception If failed. + */ public void testNoOldValue() throws Exception { CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new Factory<CacheEntryListener<Integer, Integer>>() { @@ -960,6 +1023,49 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ + static class ExceptionFilter implements CacheEntryEventFilter<Integer, Integer> { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + throw new RuntimeException("Test filter error."); + } + } + + /** + * + */ + static class ExceptionListener extends CreateUpdateListener + implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> { + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + error(); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + error(); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + error(); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + error(); + } + + /** + * Throws exception. + */ + private void error() { + throw new RuntimeException("Test listener error."); + } + } + + /** + * + */ protected static class ToStringProcessor implements EntryProcessor<Integer, Integer, String> { /** {@inheritDoc} */ @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)