ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0da9afad Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0da9afad Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0da9afad Branch: refs/heads/ignite-54 Commit: 0da9afad51430e8a8b0eb8f96456ec1778ab3972 Parents: 9fd2f23 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 29 09:43:08 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 29 11:41:38 2014 +0300 ---------------------------------------------------------------------- .../examples/datagrid/CacheApiExample.java | 1 - .../datagrid/CachePopularNumbersExample.java | 2 +- .../ScalarCachePopularNumbersExample.scala | 21 +------- .../dataload/IgniteDataLoadCacheUpdater.java | 4 +- .../processors/cache/IgniteCacheProxy.java | 22 ++++---- .../processors/cache/CacheInvokeResult.java | 1 - .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 24 ++++----- .../processors/cache/GridCacheProxyImpl.java | 7 ++- .../GridAtomicCacheQueueImpl.java | 2 +- .../datastructures/GridCacheQueueAdapter.java | 3 +- .../dht/atomic/GridDhtAtomicCache.java | 12 +++-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 6 +-- .../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +- .../local/atomic/GridLocalAtomicCache.java | 13 +++-- .../transactions/IgniteTxLocalAdapter.java | 6 +-- .../processors/ggfs/GridGgfsDataManager.java | 7 +-- .../processors/ggfs/GridGgfsMetaManager.java | 5 +- .../cache/IgniteCacheInvokeAbstractTest.java | 56 +++++++++++++++++++- .../cache/GridCacheAbstractFullApiSelfTest.java | 16 ++++++ .../GridCacheInterceptorAbstractSelfTest.java | 12 ++--- .../processors/cache/GridCacheTestEntryEx.java | 5 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 7 ++- 23 files changed, 141 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java b/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java index f477a89..f05331f 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java @@ -11,7 +11,6 @@ package org.gridgain.examples.datagrid; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import javax.cache.processor.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java index 678a5e2..c9b8f1d 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java @@ -156,7 +156,7 @@ public class CachePopularNumbersExample { @Override public Void process(MutableEntry<Integer, Long> e, Object... args) { Long val = e.getValue(); - e.setValue(val == null ? 1 : val + 1); + e.setValue(val == null ? 1L : val + 1); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala b/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala index 0d87f19..567cdf0 100644 --- a/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala +++ b/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala @@ -89,26 +89,7 @@ object ScalarCachePopularNumbersExample extends App { // Reduce parallel operations since we running the whole grid locally under heavy load. val ldr = dataLoader$[Int, Long](CACHE_NAME, 2048) - val f = new EntryProcessor[Int, Long, Void] { - override def process(e: MutableEntry[Int, Long], arguments: AnyRef*): Void = { - if (e.exists()) - e.setValue(e.getValue + 1) - else - e.setValue(1) - - null - } - } - - // Set custom updater to increment value for each key. - ldr.updater(new IgniteDataLoadCacheUpdater[Int, Long] { - def update(cache: IgniteCache[Int, Long], entries: util.Collection[Entry[Int, Long]]) = { - import scala.collection.JavaConversions._ - - for (e <- entries) - cache.invoke(e.getKey, f) - } - }) + // TODO IGNITE-44: restore invoke. (0 until CNT) foreach (_ => ldr.addData(Random.nextInt(RANGE), 1L)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java index 2e1f92c..9c43db4 100644 --- a/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java @@ -15,12 +15,12 @@ import java.io.*; import java.util.*; /** - * Updates cache with batch of entries. Usually it is enough to configure {@link org.apache.ignite.IgniteDataLoader#isolated(boolean)} + * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#isolated(boolean)} * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best * performance custom user-defined implementation may help. * <p> * Data loader can be configured to use custom implementation of updater instead of default one using - * {@link org.apache.ignite.IgniteDataLoader#updater(IgniteDataLoadCacheUpdater)} method. + * {@link IgniteDataLoader#updater(IgniteDataLoadCacheUpdater)} method. */ public interface IgniteDataLoadCacheUpdater<K, V> extends Serializable { /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index eacae0f..afa2986 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -826,17 +826,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements return new IgniteCacheProxy<>(ctx, delegate, prj, true); } - /** - * @param e Checked exception. - * @return Cache exception. - */ - private CacheException cacheException(IgniteCheckedException e) { - if (e instanceof GridCachePartialUpdateException) - return new CachePartialUpdateException((GridCachePartialUpdateException)e); - - return new CacheException(e); - } - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() { @@ -897,6 +886,17 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } } + /** + * @param e Checked exception. + * @return Cache exception. + */ + private CacheException cacheException(IgniteCheckedException e) { + if (e instanceof GridCachePartialUpdateException) + return new CachePartialUpdateException((GridCachePartialUpdateException)e); + + return new CacheException(e); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java index 19b4141..ab0959e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.marshaller.optimized.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; import javax.cache.processor.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index 101427f..0962cd7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -20,6 +20,7 @@ import org.gridgain.grid.util.lang.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; /** @@ -460,11 +461,11 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * @param intercept If {@code true} then calls cache interceptor. * @param subjId Subject ID initiated this update. * @param taskName Task name. - * @return Tuple containing success flag and operation result. + * @return Tuple containing success flag, old value and result for invoke operation. * @throws IgniteCheckedException If update failed. * @throws GridCacheEntryRemovedException If entry is obsolete. */ - public IgniteBiTuple<Boolean, Object> innerUpdateLocal( + public GridTuple3<Boolean, V, EntryProcessorResult<Object>> innerUpdateLocal( GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 110d8b1..bb493cc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1365,7 +1365,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal( + @Override public GridTuple3<Boolean, V, EntryProcessorResult<Object>> innerUpdateLocal( GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, @@ -1382,12 +1382,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.isLocal() && cctx.atomic(); - Object opRes; + V old; boolean res = true; IgniteBiTuple<Boolean, ?> interceptorRes = null; + EntryProcessorResult<Object> invokeRes = null; + synchronized (this) { boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter); @@ -1398,9 +1400,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> unswap(true, retval); // Possibly get old value form store. - V old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; - - opRes = old; + old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; GridCacheValueBytes oldBytes = valueBytesUnlocked(); @@ -1431,7 +1431,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateTtl(ttl); } - return new IgniteBiTuple<>(false, (Object)(retval ? old : null)); + return new T3<>(false, retval ? old : null, null); } } @@ -1458,12 +1458,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = cctx.unwrapTemporary(entry.getValue()); - opRes = computed != null ? new CacheInvokeResult<>(cctx.unwrapTemporary(computed)) : null; + invokeRes = computed != null ? new CacheInvokeResult<>(cctx.unwrapTemporary(computed)) : null; } catch (Exception e) { updated = old; - opRes = new CacheInvokeResult<>(e); + invokeRes = new CacheInvokeResult<>(e); } } else @@ -1476,14 +1476,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); if (updated == null) - return new IgniteBiTuple<>(false, (Object)cctx.<V>unwrapTemporary(old)); + return new GridTuple3<>(false, cctx.<V>unwrapTemporary(old), invokeRes); } else { interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old); if (cctx.cancelRemove(interceptorRes)) - return new IgniteBiTuple<>(false, - (Object)cctx.<V>unwrapTemporary(interceptorRes.get2())); + return new GridTuple3<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2()), invokeRes); } } @@ -1593,8 +1592,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - return new IgniteBiTuple<>(res, - (Object)(cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : opRes))); + return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), invokeRes); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index 80a776a..dd24388 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -797,8 +797,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, - long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { + @Override public IgniteFuture<Boolean> putxAsync(K key, + V val, + @Nullable GridCacheEntryEx<K, V> entry, + long ttl, + @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java index 090fb94..786e863 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java @@ -223,7 +223,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { /** * @param c EntryProcessor to be applied for queue header. - * @return Value computed by the transform closure. + * @return Value computed by the entry processor. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java index e10ac90..ed00313 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java @@ -322,8 +322,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp A.ensure(batchSize >= 0, "Batch size cannot be negative: " + batchSize); try { - IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, - new ClearProcessor(id)); + IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, new ClearProcessor(id)); if (t == null) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e9bb16e..70491e0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1276,11 +1276,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CU.<K, V>empty(), null); - if (entryProcessorMap == null) - entryProcessorMap = new HashMap<>(); - - entryProcessorMap.put(entry.key(), entryProcessor); - CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old); V updated; @@ -1335,6 +1330,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { firstEntryIdx = i + 1; putMap = null; + entryProcessorMap = null; filtered = new ArrayList<>(); } @@ -1376,6 +1372,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { firstEntryIdx = i + 1; rmvKeys = null; + entryProcessorMap = null; filtered = new ArrayList<>(); } @@ -1385,6 +1382,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { putMap.put(entry.key(), ctx.<V>unwrapTemporary(updated)); } + + if (entryProcessorMap == null) + entryProcessorMap = new HashMap<>(); + + entryProcessorMap.put(entry.key(), entryProcessor); } else if (op == UPDATE) { V updated = req.value(i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 60e42bc..c34a221 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -120,11 +120,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @GridDirectVersion(2) private List<byte[]> entryProcessorsBytes; - /** Near transform closures. */ + /** Near entry processors. */ @GridDirectTransient private List<EntryProcessor<K, V, ?>> nearEntryProcessors; - /** Near transform closures bytes. */ + /** Near entry processors bytes. */ @GridDirectCollection(byte[].class) @GridDirectVersion(2) private List<byte[]> nearEntryProcessorsBytes; @@ -133,7 +133,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @GridDirectTransient private Object[] invokeArgs; - /** Filter bytes. */ + /** Entry processor arguments bytes. */ private byte[][] invokeArgsBytes; /** Subject ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 391a5b2..a210711 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -82,7 +82,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im @GridDirectTransient private Object[] invokeArgs; - /** Filter bytes. */ + /** Entry processor arguments bytes. */ private byte[][] invokeArgsBytes; /** DR versions. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index a3b4157..639ab38 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -19,6 +19,7 @@ import org.gridgain.grid.kernal.processors.cache.local.*; import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -882,7 +883,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { try { entry = entryEx(key); - IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal( + GridTuple3<Boolean, V, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( ver, val == null ? DELETE : op, val, @@ -898,9 +899,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { taskName); if (op == TRANSFORM) { - assert t.get2() == null || t.get2() instanceof EntryProcessorResult : t.get2(); - - if (t.get2() != null) { + if (t.get3() != null) { Map<K, EntryProcessorResult> computedMap; if (res == null) { @@ -911,11 +910,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { else computedMap = (Map<K, EntryProcessorResult>)res.get2(); - computedMap.put(key, (EntryProcessorResult)t.getValue()); + computedMap.put(key, t.get3()); } } else if (res == null) - res = t; + res = new T2(t.get1(), t.get2()); break; // While. } @@ -1297,7 +1296,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { assert writeVal != null || op == DELETE : "null write value found."; - IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal( + GridTuple3<Boolean, V, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( ver, op, writeVal, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index d349215..6380605 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1850,10 +1850,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> Set<K> skipped = null; - Set<K> missedForInvoke = null; - boolean rmv = lookup == null && invokeMap == null; + Set<K> missedForInvoke = null; + try { // Set transform flag for transaction. if (invokeMap != null) @@ -2285,7 +2285,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } else { if (!hasPrevVal) - v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet(); + v = cached.rawGetOrUnmarshal(false); } if (txEntry.op() == TRANSFORM) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java index 89bcbe6..1906b95 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java @@ -1118,9 +1118,7 @@ public class GridGgfsDataManager extends GridGgfsManager { GridGgfsBlockKey key = new GridGgfsBlockKey(colocatedKey.getFileId(), null, colocatedKey.evictExclude(), colocatedKey.getBlockId()); - IgniteTx tx = dataCachePrj.txStart(PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = dataCachePrj.txStart(PESSIMISTIC, REPEATABLE_READ)) { // Lock keys. Map<GridGgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key)); @@ -1147,9 +1145,6 @@ public class GridGgfsDataManager extends GridGgfsManager { tx.commit(); } - finally { - tx.close(); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java index 87e09b9..722d142 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java @@ -2796,11 +2796,8 @@ public class GridGgfsMetaManager extends GridGgfsManager { GridGgfsListingEntry entry = listing.get(fileName); - if (entry == null || !entry.fileId().equals(fileId)) { - e.setValue(fileInfo); - + if (entry == null || !entry.fileId().equals(fileId)) return null; - } entry = new GridGgfsListingEntry(entry, entry.length() + lenDelta, accessTime == -1 ? entry.accessTime() : accessTime, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java index 380eced..bda973f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java @@ -10,6 +10,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.util.typedef.internal.*; @@ -138,9 +139,23 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT checkValue(key, 63); + IgniteCache<Integer, Integer> asyncCache = cache.enableAsync(); + + assertTrue(asyncCache.isAsync()); + + assertNull(asyncCache.invoke(key, incProcessor)); + + IgniteFuture<Integer> fut = asyncCache.future(); + + assertNotNull(fut); + + assertEquals(63, (int)fut.get()); + + checkValue(key, 64); + tx = startTx(txMode); - assertNull(cache.invoke(key, new RemoveProcessor(63))); + assertNull(cache.invoke(key, new RemoveProcessor(64))); if (tx != null) tx.commit(); @@ -374,6 +389,45 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT for (Integer key : keys) checkValue(key, null); + + IgniteCache<Integer, Integer> asyncCache = cache.enableAsync(); + + assertTrue(asyncCache.isAsync()); + + assertNull(asyncCache.invokeAll(keys, new IncrementProcessor())); + + IgniteFuture<Map<Integer, EntryProcessorResult<Integer>>> fut = asyncCache.future(); + + resMap = fut.get(); + + exp = new HashMap<>(); + + for (Integer key : keys) + exp.put(key, -1); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 1); + + invokeMap = new HashMap<>(); + + for (Integer key : keys) + invokeMap.put(key, incProcessor); + + assertNull(asyncCache.invokeAll(invokeMap)); + + fut = asyncCache.future(); + + resMap = fut.get(); + + for (Integer key : keys) + exp.put(key, 1); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 2); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java index a6f6e14..54397cc 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -926,6 +926,22 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertEquals("3", res.get("key3").get()); assertEquals(3, res.size()); + + cache.remove("key1"); + cache.put("key2", 1); + cache.put("key3", 3); + + res = cache.invokeAll(F.asMap("key1", INCR_PROCESSOR, "key2", INCR_PROCESSOR, "key3", INCR_PROCESSOR)); + + assertEquals((Integer)1, cache.get("key1")); + assertEquals((Integer)2, cache.get("key2")); + assertEquals((Integer)4, cache.get("key3")); + + assertEquals("null", res.get("key1").get()); + assertEquals("1", res.get("key2").get()); + assertEquals("3", res.get("key3").get()); + + assertEquals(3, res.size()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java index 66abdc6..3f3d3a6 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java @@ -1194,8 +1194,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst case UPDATE_FILTER: { Object old = cache.getAndRemoveIf(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() { - @Override - public boolean apply(GridCacheEntry<String, Integer> entry) { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { return true; } }); @@ -1207,8 +1206,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst case TRANSFORM: { cache.invoke(key, new EntryProcessor<String, Integer, Void>() { - @Override - public Void process(MutableEntry<String, Integer> e, Object... args) { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { Integer old = e.getValue(); assertEquals(expOld, old); @@ -1242,8 +1240,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst case UPDATE_FILTER: { Object old = cache.getAndPutIf(key, val, new P1<GridCacheEntry<String, Integer>>() { - @Override - public boolean apply(GridCacheEntry<String, Integer> entry) { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { return true; } }); @@ -1255,8 +1252,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst case TRANSFORM: { cache.invoke(key, new EntryProcessor<String, Integer, Void>() { - @Override - public Void process(MutableEntry<String, Integer> e, Object... args) { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { Integer old = e.getValue(); assertEquals(expOld, old); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java index af729a1..89c1efa 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java @@ -19,6 +19,7 @@ import org.gridgain.grid.util.typedef.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; /** @@ -423,7 +424,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal(GridCacheVersion ver, + @Override public GridTuple3<Boolean, V, EntryProcessorResult<Object>> innerUpdateLocal(GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, @Nullable Object[] invokeArgs, @@ -437,7 +438,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme UUID subjId, String taskName) throws IgniteCheckedException, GridCacheEntryRemovedException { - return new IgniteBiTuple<>(false, null); + return new GridTuple3<>(false, null, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0da9afad/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index 23d98c1..6b3260f 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -652,7 +652,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } /** - * + * @param jobId Job ID. + * @param plan Map-reduce plan. */ private void printPlan(GridHadoopJobId jobId, GridHadoopMapReducePlan plan) { log.info("Plan for " + jobId); @@ -1325,9 +1326,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * @param prev Previous closure. * @param splits Mapper splits to remove. + * @param err Error. */ private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits, - Throwable err) { + Throwable err) { super(prev); this.splits = splits; @@ -1382,6 +1384,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * @param prev Previous closure. * @param rdc Reducer to remove. + * @param err Error. */ private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) { super(prev);