ignite-326 review
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7bf54037 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7bf54037 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7bf54037 Branch: refs/heads/ignite-sql-tests Commit: 7bf540372b6becfd187dbcbca98173fd0c245cba Parents: 6d15f90 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Wed Feb 25 21:21:50 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Wed Feb 25 21:21:50 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 69 ++++++++++++++ .../ignite/cache/IgniteEntryProcessor.java | 28 ++++++ .../processors/cache/IgniteCacheProxy.java | 63 +++++++++++++ .../cache/GridCacheAbstractFullApiSelfTest.java | 99 ++++++++++++++++++++ 4 files changed, 259 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 05c496e..195a304 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -381,12 +381,81 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @IgniteAsyncSupported @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments); + /** + * Invokes an {@link IgniteEntryProcessor} against the {@link Entry} specified by + * the provided key. 
If an {@link Entry} does not exist for the specified key, + * an attempt is made to load it (if a loader is configured) or a surrogate + * {@link Entry}, consisting of the key with a null value is used instead. + * This method differs from {@code invoke(K, EntryProcessor, Object...)} in that it accepts a serializable {@link IgniteEntryProcessor}. + * <p> + * + * @param key the key to the entry + * @param entryProcessor the {@link IgniteEntryProcessor} to invoke + * @param arguments additional arguments to pass to the + * {@link IgniteEntryProcessor} + * @return the result of the processing, if any, defined by the + * {@link IgniteEntryProcessor} implementation + * @throws NullPointerException if key or {@link IgniteEntryProcessor} is null + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws ClassCastException if the implementation is configured to perform + * runtime-type-checking, and the key or value + * types are incompatible with those that have been + * configured for the {@link Cache} + * @throws EntryProcessorException if an exception is thrown by the {@link + * IgniteEntryProcessor}, a Caching Implementation + * must wrap any {@link Exception} thrown + * in an {@link EntryProcessorException}. + * @see IgniteEntryProcessor + */ + @IgniteAsyncSupported + public <T> T invoke(K key, IgniteEntryProcessor<K, V, T> entryProcessor, Object... arguments); + /** {@inheritDoc} */ @IgniteAsyncSupported @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args); /** + * Invokes an {@link IgniteEntryProcessor} against the set of {@link Entry}s + * specified by the set of keys. + * <p> + * If an {@link Entry} does not exist for the specified key, an attempt is made + * to load it (if a loader is configured) or a surrogate {@link Entry}, + * consisting of the key and a value of null is provided. + * <p> + * The order that the entries for the keys are processed is undefined. 
+ * Implementations may choose to process the entries in any order, including + * concurrently. Furthermore there is no guarantee implementations will + * use the same {@link IgniteEntryProcessor} instance to process each entry, as + * the case may be in a non-local cache topology. + * <p> + * The result of executing the {@link IgniteEntryProcessor} is returned as a + * {@link Map} of {@link EntryProcessorResult}s, one result per key. Should the + * {@link IgniteEntryProcessor} or Caching implementation throw an exception, the + * exception is wrapped and re-thrown when a call to + * {@link javax.cache.processor.EntryProcessorResult#get()} is made. + * + * @param keys the set of keys for entries to process + * @param entryProcessor the {@link IgniteEntryProcessor} to invoke + * @param args additional arguments to pass to the + * {@link IgniteEntryProcessor} + * @return the map of {@link EntryProcessorResult}s of the processing per key, + * if any, defined by the {@link IgniteEntryProcessor} implementation. No mappings + * will be returned for {@link IgniteEntryProcessor}s that return a + * <code>null</code> value for a key. + * @throws NullPointerException if keys or {@link IgniteEntryProcessor} are null + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws ClassCastException if the implementation is configured to perform + * runtime-type-checking, and the key or value + * types are incompatible with those that have been + * configured for the {@link Cache} + * @see IgniteEntryProcessor + */ + @IgniteAsyncSupported + public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + IgniteEntryProcessor<K, V, T> entryProcessor, Object... args); + + /** * Gets snapshot metrics (statistics) for this cache. * * @return Cache metrics. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java b/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java new file mode 100644 index 0000000..727433b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/IgniteEntryProcessor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache; + +import javax.cache.processor.*; +import java.io.*; + +/** + * This processor adds {@link Serializable} interface to {@link EntryProcessor} object. + */ +public interface IgniteEntryProcessor<K, V, T> extends EntryProcessor<K, V, T>, Serializable { + // No-op. 
+} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index c361a27..4101a26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1099,6 +1099,44 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public <T> T invoke(K key, IgniteEntryProcessor<K, V, T> entryProcessor, Object... args) + throws EntryProcessorException { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + if (isAsync()) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) + throws IgniteCheckedException { + EntryProcessorResult<T> res = fut.get(); + + return res != null ? res.get() : null; + } + }); + + setFuture(fut0); + + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... 
args) { @@ -1124,6 +1162,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + IgniteEntryProcessor<K, V, T> entryProcessor, + Object... args) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bf54037/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index c5b4dc8..3ab4544 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -70,6 +70,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } }; + /** Increment processor for invoke operations with IgniteEntryProcessor. */ + public static final IgniteEntryProcessor<String, Integer, String> INCR_IGNITE_PROCESSOR = + new IgniteEntryProcessor<String, Integer, String>() { + @Override public String process(MutableEntry<String, Integer> e, Object... 
args) { + return INCR_PROCESSOR.process(e, args); + } + }; + /** Increment processor for invoke operations. */ public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new EntryProcessor<String, Integer, String>() { @Override public String process(MutableEntry<String, Integer> e, Object... args) { @@ -83,6 +91,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } }; + /** Remove processor for invoke operations with IgniteEntryProcessor. */ + public static final IgniteEntryProcessor<String, Integer, String> RMV_IGNITE_PROCESSOR = + new IgniteEntryProcessor<String, Integer, String>() { + @Override public String process(MutableEntry<String, Integer> e, Object... args) { + return RMV_PROCESSOR.process(e, args); + } + }; + /** Dflt grid. */ protected Ignite dfltIgnite; @@ -600,6 +616,89 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } /** + * @throws Exception If failed. + */ + public void testIgniteTransformOptimisticReadCommitted() throws Exception { + checkIgniteTransform(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteTransformOptimisticRepeatableRead() throws Exception { + checkIgniteTransform(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteTransformPessimisticReadCommitted() throws Exception { + checkIgniteTransform(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteTransformPessimisticRepeatableRead() throws Exception { + checkIgniteTransform(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If failed. 
+ */ + private void checkIgniteTransform(TransactionConcurrency concurrency, TransactionIsolation isolation) + throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key2", 1); + cache.put("key3", 3); + + Transaction tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, isolation) : null; + + try { + assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR)); + assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR)); + assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR)); + + if (tx != null) + tx.commit(); + } + catch (Exception e) { + e.printStackTrace(); + + throw e; + } + finally { + if (tx != null) + tx.close(); + } + + assertEquals((Integer)1, cache.get("key1")); + assertEquals((Integer)2, cache.get("key2")); + assertNull(cache.get("key3")); + + for (int i = 0; i < gridCount(); i++) + assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", CachePeekMode.ONHEAP)); + + cache.remove("key1"); + cache.put("key2", 1); + cache.put("key3", 3); + + assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR)); + assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR)); + assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR)); + + assertEquals((Integer)1, cache.get("key1")); + assertEquals((Integer)2, cache.get("key2")); + assertNull(cache.get("key3")); + + for (int i = 0; i < gridCount(); i++) + assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP)); + } + + /** * @param concurrency Concurrency. * @param isolation Isolation. * @throws Exception If failed.