ignite-326 review

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7bf54037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7bf54037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7bf54037

Branch: refs/heads/ignite-317
Commit: 7bf540372b6becfd187dbcbca98173fd0c245cba
Parents: 6d15f90
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Wed Feb 25 21:21:50 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Wed Feb 25 21:21:50 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     | 69 ++++++++++++++
 .../ignite/cache/IgniteEntryProcessor.java      | 28 ++++++
 .../processors/cache/IgniteCacheProxy.java      | 63 +++++++++++++
 .../cache/GridCacheAbstractFullApiSelfTest.java | 99 ++++++++++++++++++++
 4 files changed, 259 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 05c496e..195a304 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -381,12 +381,81 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
     @IgniteAsyncSupported
     @Override public <T> T invoke(K key, EntryProcessor<K, V, T> 
entryProcessor, Object... arguments);
 
+    /**
+     * Invokes an {@link IgniteEntryProcessor} against the {@link Entry} 
specified by
+     * the provided key. If an {@link Entry} does not exist for the specified 
key,
+     * an attempt is made to load it (if a loader is configured) or a surrogate
+     * {@link Entry}, consisting of the key with a null value is used instead.
+     * This method differs from the {@link EntryProcessor} overload in the processor type it accepts.
+     * <p>
+     *
+     * @param key            the key to the entry
+     * @param entryProcessor the {@link IgniteEntryProcessor} to invoke
+     * @param arguments      additional arguments to pass to the
+     *                       {@link IgniteEntryProcessor}
+     * @return the result of the processing, if any, defined by the
+     *         {@link IgniteEntryProcessor} implementation
+     * @throws NullPointerException    if key or {@link IgniteEntryProcessor} 
is null
+     * @throws IllegalStateException   if the cache is {@link #isClosed()}
+     * @throws ClassCastException    if the implementation is configured to 
perform
+     *                               runtime-type-checking, and the key or 
value
+     *                               types are incompatible with those that 
have been
+     *                               configured for the {@link Cache}
+     * @throws EntryProcessorException if an exception is thrown by the {@link
+     *                                 IgniteEntryProcessor}, a Caching 
Implementation
+     *                                 must wrap any {@link Exception} thrown
+     *                                 in an {@link EntryProcessorException}.
+     * @see IgniteEntryProcessor
+     */
+    @IgniteAsyncSupported
+    public <T> T invoke(K key, IgniteEntryProcessor<K, V, T> entryProcessor, 
Object... arguments);
+
     /** {@inheritDoc} */
     @IgniteAsyncSupported
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
         EntryProcessor<K, V, T> entryProcessor, Object... args);
 
     /**
+     * Invokes an {@link IgniteEntryProcessor} against the set of {@link 
Entry}s
+     * specified by the set of keys.
+     * <p>
+     * If an {@link Entry} does not exist for the specified key, an attempt is 
made
+     * to load it (if a loader is configured) or a surrogate {@link Entry},
+     * consisting of the key and a value of null is provided.
+     * <p>
+     * The order in which the entries for the keys are processed is undefined.
+     * Implementations may choose to process the entries in any order, 
including
+     * concurrently.  Furthermore there is no guarantee implementations will
+     * use the same {@link IgniteEntryProcessor} instance to process each 
entry, as
+     * the case may be in a non-local cache topology.
+     * <p>
+     * The result of executing the {@link IgniteEntryProcessor} is returned as 
a
+     * {@link Map} of {@link EntryProcessorResult}s, one result per key.  
Should the
+     * {@link IgniteEntryProcessor} or Caching implementation throw an 
exception, the
+     * exception is wrapped and re-thrown when a call to
+     * {@link javax.cache.processor.EntryProcessorResult#get()} is made.
+     *
+     * @param keys           the set of keys for entries to process
+     * @param entryProcessor the {@link IgniteEntryProcessor} to invoke
+     * @param args      additional arguments to pass to the
+     *                       {@link IgniteEntryProcessor}
+     * @return the map of {@link EntryProcessorResult}s of the processing per 
key,
+     * if any, defined by the {@link IgniteEntryProcessor} implementation.  No 
mappings
+     * will be returned for {@link IgniteEntryProcessor}s that return a
+     * <code>null</code> value for a key.
+     * @throws NullPointerException    if keys or {@link IgniteEntryProcessor} 
are null
+     * @throws IllegalStateException   if the cache is {@link #isClosed()}
+     * @throws ClassCastException    if the implementation is configured to 
perform
+     *                               runtime-type-checking, and the key or 
value
+     *                               types are incompatible with those that 
have been
+     *                               configured for the {@link Cache}
+     * @see IgniteEntryProcessor
+     */
+    @IgniteAsyncSupported
+    public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+        IgniteEntryProcessor<K, V, T> entryProcessor, Object... args);
+
+    /**
      * Gets snapshot metrics (statistics) for this cache.
      *
      * @return Cache metrics.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java
new file mode 100644
index 0000000..727433b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import javax.cache.processor.*;
+import java.io.*;
+
+/**
+ * This processor adds the {@link Serializable} interface to {@link EntryProcessor}.
+ */
+public interface IgniteEntryProcessor<K, V, T> extends EntryProcessor<K, V, 
T>, Serializable {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index c361a27..4101a26 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1099,6 +1099,44 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public <T> T invoke(K key, IgniteEntryProcessor<K, V, T> 
entryProcessor, Object... args)
+        throws EntryProcessorException {
+        try {
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+            try {
+                if (isAsync()) {
+                    IgniteInternalFuture<EntryProcessorResult<T>> fut = 
delegate.invokeAsync(key, entryProcessor, args);
+
+                    IgniteInternalFuture<T> fut0 = fut.chain(new 
CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() {
+                        @Override public T 
applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut)
+                            throws IgniteCheckedException {
+                            EntryProcessorResult<T> res = fut.get();
+
+                            return res != null ? res.get() : null;
+                        }
+                    });
+
+                    setFuture(fut0);
+
+                    return null;
+                }
+                else {
+                    EntryProcessorResult<T> res = delegate.invoke(key, 
entryProcessor, args);
+
+                    return res != null ? res.get() : null;
+                }
+            }
+            finally {
+                gate.leave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) {
@@ -1124,6 +1162,31 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
+        IgniteEntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        try {
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.invokeAllAsync(keys, entryProcessor, 
args));
+
+                    return null;
+                }
+                else
+                    return delegate.invokeAll(keys, entryProcessor, args);
+            }
+            finally {
+                gate.leave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index c5b4dc8..3ab4544 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -70,6 +70,14 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
         }
     };
 
+    /** Increment processor for invoke operations with IgniteEntryProcessor. */
+    public static final IgniteEntryProcessor<String, Integer, String> 
INCR_IGNITE_PROCESSOR =
+        new IgniteEntryProcessor<String, Integer, String>() {
+            @Override public String process(MutableEntry<String, Integer> e, 
Object... args) {
+                return INCR_PROCESSOR.process(e, args);
+            }
+        };
+
     /** Increment processor for invoke operations. */
     public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR 
= new EntryProcessor<String, Integer, String>() {
         @Override public String process(MutableEntry<String, Integer> e, 
Object... args) {
@@ -83,6 +91,14 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
         }
     };
 
+    /** Remove processor for invoke operations with IgniteEntryProcessor. */
+    public static final IgniteEntryProcessor<String, Integer, String> 
RMV_IGNITE_PROCESSOR =
+        new IgniteEntryProcessor<String, Integer, String>() {
+            @Override public String process(MutableEntry<String, Integer> e, 
Object... args) {
+                return RMV_PROCESSOR.process(e, args);
+            }
+        };
+
     /** Dflt grid. */
     protected Ignite dfltIgnite;
 
@@ -600,6 +616,89 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testIgniteTransformOptimisticReadCommitted() throws Exception {
+        checkIgniteTransform(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIgniteTransformOptimisticRepeatableRead() throws Exception 
{
+        checkIgniteTransform(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIgniteTransformPessimisticReadCommitted() throws Exception 
{
+        checkIgniteTransform(PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIgniteTransformPessimisticRepeatableRead() throws 
Exception {
+        checkIgniteTransform(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If failed.
+     */
+    private void checkIgniteTransform(TransactionConcurrency concurrency, 
TransactionIsolation isolation)
+        throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        cache.put("key2", 1);
+        cache.put("key3", 3);
+
+        Transaction tx = txEnabled() ? 
ignite(0).transactions().txStart(concurrency, isolation) : null;
+
+        try {
+            assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR));
+            assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR));
+            assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR));
+
+            if (tx != null)
+                tx.commit();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+
+            throw e;
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals((Integer)1, cache.get("key1"));
+        assertEquals((Integer)2, cache.get("key2"));
+        assertNull(cache.get("key3"));
+
+        for (int i = 0; i < gridCount(); i++)
+            assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", 
CachePeekMode.ONHEAP));
+
+        cache.remove("key1");
+        cache.put("key2", 1);
+        cache.put("key3", 3);
+
+        assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR));
+        assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR));
+        assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR));
+
+        assertEquals((Integer)1, cache.get("key1"));
+        assertEquals((Integer)2, cache.get("key2"));
+        assertNull(cache.get("key3"));
+
+        for (int i = 0; i < gridCount(); i++)
+            assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+    }
+
+    /**
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @throws Exception If failed.

Reply via email to