# ignite-43 skip update if EntryProcessor does not modify entry

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fc0b5e8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fc0b5e8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fc0b5e8b

Branch: refs/heads/ignite-43
Commit: fc0b5e8b34a927eb9cf59efa5db16069c55980c9
Parents: 8f8f6ef
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jan 14 12:50:28 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jan 14 12:50:28 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 25 ++++++++-
 .../processors/cache/CacheInvokeEntry.java      | 14 +++++
 .../processors/cache/GridCacheMapEntry.java     | 15 ++++++
 .../GridCacheContinuousQueryManager.java        |  4 +-
 .../cache/transactions/IgniteTxAdapter.java     | 22 +++++++-
 .../IgniteCacheEntryListenerAbstractTest.java   | 57 +++++++++++++++++++-
 6 files changed, 130 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index ac015e8..31264ba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -850,7 +850,30 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> iterator() {
         // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return F.iterator(delegate, new C1<GridCacheEntry<K, V>, Entry<K, 
V>>() {
+                @Override public Entry<K, V> apply(final GridCacheEntry<K, V> 
e) {
+                    return new Entry<K, V>() {
+                        @Override public K getKey() {
+                            return e.getKey();
+                        }
+
+                        @Override public V getValue() {
+                            return e.getValue();
+                        }
+
+                        @Override public <T> T unwrap(Class<T> clazz) {
+                            throw new IllegalArgumentException();
+                        }
+                    };
+                }
+            }, false);
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
index c9ca244..ab7dfc4 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
@@ -26,6 +26,9 @@ public class CacheInvokeEntry<K, V> implements 
MutableEntry<K, V> {
     @GridToStringInclude
     private V val;
 
+    /** */
+    private boolean modified;
+
     /**
      * @param key Key.
      * @param val Value.
@@ -43,6 +46,8 @@ public class CacheInvokeEntry<K, V> implements 
MutableEntry<K, V> {
     /** {@inheritDoc} */
     @Override public void remove() {
         val = null;
+
+        modified = true;
     }
 
     /** {@inheritDoc} */
@@ -51,6 +56,8 @@ public class CacheInvokeEntry<K, V> implements 
MutableEntry<K, V> {
             throw new NullPointerException();
 
         this.val = val;
+
+        modified = true;
     }
 
     /** {@inheritDoc} */
@@ -68,6 +75,13 @@ public class CacheInvokeEntry<K, V> implements 
MutableEntry<K, V> {
         throw new IllegalArgumentException();
     }
 
+    /**
+     * @return {@code True} if {@link #setValue} or {@link #remove was called}.
+     */
+    public boolean modified() {
+        return modified;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheInvokeEntry.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index b80f729..8c90c62 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1479,6 +1479,9 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
                     invokeRes = new CacheInvokeResult<>(e);
                 }
+
+                if (!entry.modified())
+                    return new GridTuple3<>(false, null, invokeRes);
             }
             else
                 updated = (V)writeObj;
@@ -1832,6 +1835,18 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
                     valBytes = oldBytes.getIfMarshaled();
                 }
+
+                if (!entry.modified()) {
+                    return new GridCacheUpdateAtomicResult<>(false,
+                        retval ? old : null,
+                        null,
+                        invokeRes,
+                        -1L,
+                        -1L,
+                        null,
+                        null,
+                        false);
+                }
             }
             else
                 updated = (V)writeObj;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
index 56c7020..e180adc 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
@@ -584,10 +584,10 @@ public class GridCacheContinuousQueryManager<K, V> 
extends GridCacheManagerAdapt
 
                 return fltr.evaluate(new 
org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry));
             }
-            catch (CacheEntryListenerException e) {
+            catch (Exception e) {
                 LT.warn(ignite.log(), e, "Cache entry event filter error: " + 
e);
 
-                return false;
+                return true;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
index bfd9359..f6ec594 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -23,6 +23,7 @@ import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.tostring.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
@@ -1199,9 +1200,26 @@ public abstract class IgniteTxAdapter<K, V> extends 
GridMetadataAwareAdapter
                         CU.<K, V>empty(),
                         null);
 
-                val = txEntry.applyEntryProcessors(val);
+                boolean modified = false;
 
-                GridCacheOperation op = val == null ? DELETE : UPDATE;
+                for (T2<EntryProcessor<K, V, ?>, Object[]> t : 
txEntry.entryProcessors()) {
+                    CacheInvokeEntry<K, V> invokeEntry = new 
CacheInvokeEntry<>(txEntry.key(), val);
+
+                    try {
+                        EntryProcessor processor = t.get1();
+
+                        processor.process(invokeEntry, t.get2());
+
+                        val = invokeEntry.getValue();
+                    }
+                    catch (Exception ignore) {
+                        // No-op.
+                    }
+
+                    modified |= invokeEntry.modified();
+                }
+
+                GridCacheOperation op = modified ? (val == null ? DELETE : 
UPDATE) : NOOP;
 
                 return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fc0b5e8b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index f63ab9d..718e847 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -22,6 +22,7 @@ import org.jetbrains.annotations.*;
 import javax.cache.configuration.*;
 import javax.cache.event.*;
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -667,8 +668,15 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
 
         cache.put(key, 0);
 
-        for (int i = 0; i < UPDATES; i++)
-            cache.put(key, i + 1);
+        for (int i = 0; i < UPDATES; i++) {
+            if (i % 2 == 0)
+                cache.put(key, i + 1);
+            else
+                cache.invoke(key, new SetValueProcessor(i + 1));
+        }
+
+        // Invoke processor does not update value, should not trigger event.
+        assertEquals(String.valueOf(UPDATES), cache.invoke(key, new 
ToStringProcessor()));
 
         assertFalse(cache.putIfAbsent(key, -1));
 
@@ -941,4 +949,49 @@ public abstract class IgniteCacheEntryListenerAbstractTest 
extends IgniteCacheAb
                 onEvent(evt);
         }
     }
+
+    /**
+     *
+     */
+    protected static class ToStringProcessor implements 
EntryProcessor<Integer, Integer, String> {
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<Integer, Integer> e, 
Object... arguments)
+            throws EntryProcessorException {
+            return String.valueOf(e.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ToStringProcessor.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class SetValueProcessor implements 
EntryProcessor<Integer, Integer, String> {
+        /** */
+        private Integer val;
+
+        /**
+         * @param val Value to set.
+         */
+        public SetValueProcessor(Integer val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<Integer, Integer> e, 
Object... arguments)
+            throws EntryProcessorException {
+            e.setValue(val);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SetValueProcessor.class, this);
+        }
+    }
+
 }

Reply via email to