# ignite-43 skip update if EntryProcessor does not modify entry
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fc0b5e8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fc0b5e8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fc0b5e8b Branch: refs/heads/ignite-43 Commit: fc0b5e8b34a927eb9cf59efa5db16069c55980c9 Parents: 8f8f6ef Author: sboikov <sboi...@gridgain.com> Authored: Wed Jan 14 12:50:28 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jan 14 12:50:28 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 25 ++++++++- .../processors/cache/CacheInvokeEntry.java | 14 +++++ .../processors/cache/GridCacheMapEntry.java | 15 ++++++ .../GridCacheContinuousQueryManager.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 22 +++++++- .../IgniteCacheEntryListenerAbstractTest.java | 57 +++++++++++++++++++- 6 files changed, 130 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index ac015e8..31264ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -850,7 +850,30 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { // TODO IGNITE-1. 
- throw new UnsupportedOperationException(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return F.iterator(delegate, new C1<GridCacheEntry<K, V>, Entry<K, V>>() { + @Override public Entry<K, V> apply(final GridCacheEntry<K, V> e) { + return new Entry<K, V>() { + @Override public K getKey() { + return e.getKey(); + } + + @Override public V getValue() { + return e.getValue(); + } + + @Override public <T> T unwrap(Class<T> clazz) { + throw new IllegalArgumentException(); + } + }; + } + }, false); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java index c9ca244..ab7dfc4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java @@ -26,6 +26,9 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { @GridToStringInclude private V val; + /** */ + private boolean modified; + /** * @param key Key. * @param val Value. 
@@ -43,6 +46,8 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { /** {@inheritDoc} */ @Override public void remove() { val = null; + + modified = true; } /** {@inheritDoc} */ @@ -51,6 +56,8 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { throw new NullPointerException(); this.val = val; + + modified = true; } /** {@inheritDoc} */ @@ -68,6 +75,13 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { throw new IllegalArgumentException(); } + /** + * @return {@code True} if {@link #setValue} or {@link #remove} was called. + */ + public boolean modified() { + return modified; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheInvokeEntry.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index b80f729..8c90c62 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1479,6 +1479,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> invokeRes = new CacheInvokeResult<>(e); } + + if (!entry.modified()) + return new GridTuple3<>(false, null, invokeRes); } else updated = (V)writeObj; @@ -1832,6 +1835,18 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> valBytes = oldBytes.getIfMarshaled(); } + + if (!entry.modified()) { + return new GridCacheUpdateAtomicResult<>(false, + retval ? 
old : null, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + } } else updated = (V)writeObj; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java index 56c7020..e180adc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java @@ -584,10 +584,10 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt return fltr.evaluate(new org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry)); } - catch (CacheEntryListenerException e) { + catch (Exception e) { LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); - return false; + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java index bfd9359..f6ec594 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java @@ 
-23,6 +23,7 @@ import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.tostring.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -1199,9 +1200,26 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter CU.<K, V>empty(), null); - val = txEntry.applyEntryProcessors(val); + boolean modified = false; - GridCacheOperation op = val == null ? DELETE : UPDATE; + for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val); + + try { + EntryProcessor processor = t.get1(); + + processor.process(invokeEntry, t.get2()); + + val = invokeEntry.getValue(); + } + catch (Exception ignore) { + // No-op. + } + + modified |= invokeEntry.modified(); + } + + GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index f63ab9d..718e847 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -22,6 +22,7 @@ import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.event.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; import 
java.util.concurrent.*; @@ -667,8 +668,15 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.put(key, 0); - for (int i = 0; i < UPDATES; i++) - cache.put(key, i + 1); + for (int i = 0; i < UPDATES; i++) { + if (i % 2 == 0) + cache.put(key, i + 1); + else + cache.invoke(key, new SetValueProcessor(i + 1)); + } + + // Invoke processor does not update value, should not trigger event. + assertEquals(String.valueOf(UPDATES), cache.invoke(key, new ToStringProcessor())); assertFalse(cache.putIfAbsent(key, -1)); @@ -941,4 +949,49 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb onEvent(evt); } } + + /** + * + */ + protected static class ToStringProcessor implements EntryProcessor<Integer, Integer, String> { + /** {@inheritDoc} */ + @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments) + throws EntryProcessorException { + return String.valueOf(e.getValue()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ToStringProcessor.class, this); + } + } + + /** + * + */ + protected static class SetValueProcessor implements EntryProcessor<Integer, Integer, String> { + /** */ + private Integer val; + + /** + * @param val Value to set. + */ + public SetValueProcessor(Integer val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments) + throws EntryProcessorException { + e.setValue(val); + + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SetValueProcessor.class, this); + } + } + }