http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java index b3277ed..e10ac90 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java @@ -11,8 +11,6 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; -import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -21,6 +19,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -50,7 +49,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp protected final GridCacheContext<?, ?> cctx; /** Cache. */ - protected final GridCacheAdapter cache; + protected final IgniteCache cache; /** Queue name. */ protected final String queueName; @@ -91,7 +90,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp cap = hdr.capacity(); collocated = hdr.collocated(); queueKey = new GridCacheQueueHeaderKey(queueName); - cache = cctx.cache(); + cache = cctx.kernalContext().cache().jcache(cctx.name()); log = cctx.logger(getClass()); @@ -130,34 +129,24 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public int size() { - try { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - return hdr.size(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + return hdr.size(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public T peek() throws IgniteException { - try { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - if (hdr.empty()) - return null; + if (hdr.empty()) + return null; - return (T)cache.get(itemKey(hdr.head())); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + return (T)cache.get(itemKey(hdr.head())); } /** {@inheritDoc} */ @@ -333,8 +322,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp A.ensure(batchSize >= 0, "Batch size cannot be negative: " + batchSize); try { - IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.transformAndCompute(queueKey, - new ClearClosure(id)); + IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, + new ClearProcessor(id)); if (t == null) return; @@ -385,9 +374,16 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - static void removeKeys(GridCacheProjection cache, IgniteUuid id, String name, boolean collocated, long startIdx, - long endIdx, int batchSize) throws IgniteCheckedException { - Collection<GridCacheQueueItemKey> keys = new ArrayList<>(batchSize > 0 ? batchSize : 10); + static void removeKeys(IgniteCache cache, + IgniteUuid id, + String name, + boolean collocated, + long startIdx, + long endIdx, + int batchSize) + throws IgniteCheckedException + { + Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); for (long idx = startIdx; idx < endIdx; idx++) { keys.add(itemKey(id, name, collocated, idx)); @@ -576,24 +572,19 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp if (next == null) throw new NoSuchElementException(); - try { - cur = next; - curIdx = idx; + cur = next; + curIdx = idx; - idx++; + idx++; - if (rmvIdxs != null) { - while (F.contains(rmvIdxs, idx) && idx < endIdx) - idx++; - } + if (rmvIdxs != null) { + while (F.contains(rmvIdxs, idx) && idx < endIdx) + idx++; + } - next = idx < endIdx ? (T)cache.get(itemKey(idx)) : null; + next = idx < endIdx ? (T)cache.get(itemKey(idx)) : null; - return cur; - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + return cur; } /** {@inheritDoc} */ @@ -646,9 +637,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** */ - protected static class ClearClosure implements IgniteClosure<GridCacheQueueHeader, - IgniteBiTuple<GridCacheQueueHeader, IgniteBiTuple<Long, Long>>>, - Externalizable { + protected static class ClearProcessor implements + EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, IgniteBiTuple<Long, Long>>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -658,30 +648,39 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * Required by {@link Externalizable}. */ - public ClearClosure() { + public ClearProcessor() { // No-op. } /** * @param id Queue unique ID. */ - public ClearClosure(IgniteUuid id) { + public ClearProcessor(IgniteUuid id) { this.id = id; } /** {@inheritDoc} */ - @Override public IgniteBiTuple<GridCacheQueueHeader, IgniteBiTuple<Long, Long>> apply(GridCacheQueueHeader hdr) { + @Override public IgniteBiTuple<Long, Long> process( + MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { + GridCacheQueueHeader hdr = e.getValue(); + boolean rmvd = queueRemoved(hdr, id); if (rmvd) - return new IgniteBiTuple<>(hdr, new IgniteBiTuple<>(QUEUE_REMOVED_IDX, QUEUE_REMOVED_IDX)); + return new IgniteBiTuple<>(QUEUE_REMOVED_IDX, QUEUE_REMOVED_IDX); else if (hdr.empty()) - return new IgniteBiTuple<>(hdr, null); + return null; + + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + hdr.tail(), + hdr.tail(), + null); - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - hdr.tail(), hdr.tail(), null); + e.setValue(newHdr); - return new IgniteBiTuple<>(newHdr, new IgniteBiTuple<>(hdr.head(), hdr.tail())); + return new IgniteBiTuple<>(hdr.head(), hdr.tail()); } /** {@inheritDoc} */ @@ -697,8 +696,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** */ - protected static class PollClosure implements - IgniteClosure<GridCacheQueueHeader, IgniteBiTuple<GridCacheQueueHeader, Long>>, Externalizable { + protected static class PollProcessor implements + EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -708,31 +707,40 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * Required by {@link Externalizable}. */ - public PollClosure() { + public PollProcessor() { // No-op. } /** * @param id Queue unique ID. */ - public PollClosure(IgniteUuid id) { + public PollProcessor(IgniteUuid id) { this.id = id; } /** {@inheritDoc} */ - @Override public IgniteBiTuple<GridCacheQueueHeader, Long> apply(GridCacheQueueHeader hdr) { + @Override public Long process( + MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { + GridCacheQueueHeader hdr = e.getValue(); + boolean rmvd = queueRemoved(hdr, id); if (rmvd || hdr.empty()) - return new IgniteBiTuple<>(hdr, rmvd ? QUEUE_REMOVED_IDX : null); + return rmvd ? QUEUE_REMOVED_IDX : null; Set<Long> rmvdIdxs = hdr.removedIndexes(); if (rmvdIdxs == null) { - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - hdr.head() + 1, hdr.tail(), rmvdIdxs); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + hdr.head() + 1, + hdr.tail(), + rmvdIdxs); + + e.setValue(newHdr); - return new IgniteBiTuple<>(newHdr, hdr.head()); + return hdr.head(); } long next = hdr.head() + 1; @@ -741,19 +749,31 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp do { if (!rmvdIdxs.remove(next)) { - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - next + 1, hdr.tail(), rmvdIdxs.isEmpty() ? null : rmvdIdxs); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + next + 1, + hdr.tail(), + rmvdIdxs.isEmpty() ? null : rmvdIdxs); - return new IgniteBiTuple<>(newHdr, next); + e.setValue(newHdr); + + return next; } next++; } while (next != hdr.tail()); - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - next, hdr.tail(), rmvdIdxs.isEmpty() ? null : rmvdIdxs); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + next, + hdr.tail(), + rmvdIdxs.isEmpty() ? null : rmvdIdxs); + + e.setValue(newHdr); - return new IgniteBiTuple<>(newHdr, null); + return null; } /** {@inheritDoc} */ @@ -769,8 +789,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** */ - protected static class AddClosure implements - IgniteClosure<GridCacheQueueHeader, IgniteBiTuple<GridCacheQueueHeader, Long>>, Externalizable { + protected static class AddProcessor implements + EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -783,7 +803,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * Required by {@link Externalizable}. */ - public AddClosure() { + public AddProcessor() { // No-op. } @@ -791,22 +811,30 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @param id Queue unique ID. * @param size Number of elements to add. */ - public AddClosure(IgniteUuid id, int size) { + public AddProcessor(IgniteUuid id, int size) { this.id = id; this.size = size; } /** {@inheritDoc} */ - @Override public IgniteBiTuple<GridCacheQueueHeader, Long> apply(GridCacheQueueHeader hdr) { + @Override public Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { + GridCacheQueueHeader hdr = e.getValue(); + boolean rmvd = queueRemoved(hdr, id); if (rmvd || !spaceAvailable(hdr, size)) - return new IgniteBiTuple<>(hdr, rmvd ? QUEUE_REMOVED_IDX : null); + return rmvd ? QUEUE_REMOVED_IDX : null; - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - hdr.head(), hdr.tail() + size, hdr.removedIndexes()); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + hdr.head(), + hdr.tail() + size, + hdr.removedIndexes()); - return new IgniteBiTuple<>(newHdr, hdr.tail()); + e.setValue(newHdr); + + return hdr.tail(); } /** @@ -833,8 +861,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** */ - protected static class RemoveClosure implements - IgniteClosure<GridCacheQueueHeader, IgniteBiTuple<GridCacheQueueHeader, Long>>, Externalizable { + protected static class RemoveProcessor implements + EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -847,7 +875,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * Required by {@link Externalizable}. */ - public RemoveClosure() { + public RemoveProcessor() { // No-op. } @@ -855,17 +883,19 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @param id Queue UUID. * @param idx Index of item to be removed. */ - public RemoveClosure(IgniteUuid id, Long idx) { + public RemoveProcessor(IgniteUuid id, Long idx) { this.id = id; this.idx = idx; } /** {@inheritDoc} */ - @Override public IgniteBiTuple<GridCacheQueueHeader, Long> apply(GridCacheQueueHeader hdr) { + @Override public Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { + GridCacheQueueHeader hdr = e.getValue(); + boolean rmvd = queueRemoved(hdr, id); if (rmvd || hdr.empty() || idx < hdr.head()) - return new IgniteBiTuple<>(hdr, rmvd ? QUEUE_REMOVED_IDX : null); + return rmvd ? QUEUE_REMOVED_IDX : null; if (idx == hdr.head()) { Set<Long> rmvIdxs = hdr.removedIndexes(); @@ -873,10 +903,16 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp long head = hdr.head() + 1; if (!F.contains(rmvIdxs, head)) { - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - head, hdr.tail(), hdr.removedIndexes()); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + head, + hdr.tail(), + hdr.removedIndexes()); - return new IgniteBiTuple<>(newHdr, idx); + e.setValue(newHdr); + + return idx; } rmvIdxs = new HashSet<>(rmvIdxs); @@ -884,10 +920,16 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp while (rmvIdxs.remove(head)) head++; - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - head, hdr.tail(), rmvIdxs.isEmpty() ? null : rmvIdxs); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + head, + hdr.tail(), + rmvIdxs.isEmpty() ? null : rmvIdxs); + + e.setValue(newHdr); - return new IgniteBiTuple<>(newHdr, null); + return null; } Set<Long> rmvIdxs = hdr.removedIndexes(); @@ -907,10 +949,16 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp idx = null; } - GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), hdr.collocated(), - hdr.head(), hdr.tail(), rmvIdxs); + GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), + hdr.capacity(), + hdr.collocated(), + hdr.head(), + hdr.tail(), + rmvIdxs); + + e.setValue(newHdr); - return new IgniteBiTuple<>(newHdr, idx); + return idx; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java index 1d58729..f038339 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java @@ -26,6 +26,9 @@ import static org.apache.ignite.transactions.IgniteTxIsolation.*; * {@link GridCacheQueue} implementation using transactional cache. */ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { + /** */ + private final IgniteTransactions txs; + /** * @param queueName Queue name. * @param hdr Queue header. @@ -33,6 +36,8 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> */ public GridTransactionalCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) { super(queueName, hdr, cctx); + + txs = cctx.kernalContext().grid().transactions(); } /** {@inheritDoc} */ @@ -47,15 +52,13 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> while (true) { try { - try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.transformAndCompute(queueKey, new AddClosure(id, 1)); + try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)); if (idx != null) { checkRemoved(idx); - boolean putx = cache.putx(itemKey(idx), item, null); - - assert putx; + cache.put(itemKey(idx), item); retVal = true; } @@ -97,13 +100,13 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> T retVal; while (true) { - try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.transformAndCompute(queueKey, new PollClosure(id)); + try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)); if (idx != null) { checkRemoved(idx); - retVal = (T)cache.remove(itemKey(idx), null); + retVal = (T)cache.getAndRemove(itemKey(idx)); assert retVal != null; } @@ -146,8 +149,8 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> int cnt = 0; while (true) { - try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.transformAndCompute(queueKey, new AddClosure(id, items.size())); + try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())); if (idx != null) { checkRemoved(idx); @@ -160,7 +163,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> idx++; } - cache.putAll(putMap, null); + cache.putAll(putMap); retVal = true; } @@ -199,13 +202,13 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> int cnt = 0; while (true) { - try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.transformAndCompute(queueKey, new RemoveClosure(id, rmvIdx)); + try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)); if (idx != null) { checkRemoved(idx); - boolean rmv = cache.removex(itemKey(idx)); + boolean rmv = cache.remove(itemKey(idx)); assert rmv; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0644821..7b1b3a0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -454,56 +454,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { - //transformAsync(key, transformer).get(); - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) - throws IgniteCheckedException { - /* - return (R)updateAllAsync0(null, - Collections.singletonMap(key, new GridCacheTransformComputeClosure<>(transformer)), null, null, true, - false, null, 0, null).get(); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, - @Nullable GridCacheEntryEx<K, V> entry, long ttl) { - /* - return updateAllAsync0(null, Collections.singletonMap(key, transformer), null, null, false, false, entry, ttl, - null); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) - throws IgniteCheckedException { - //transformAllAsync(m).get(); - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { - /* - if (F.isEmpty(m)) - return new GridFinishedFuture<Object>(ctx.kernalContext()); - - return updateAllAsync0(null, m, null, null, false, false, null, 0, null); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public V remove(K key, @Nullable GridCacheEntryEx<K, V> entry, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException { return removeAsync(key, entry, filter).get(); @@ -680,10 +630,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { throws IgniteCheckedException { Map<K, EntryProcessorResult<T>> resMap = fut.get(); - assert resMap != null; - assert resMap.size() == 1 : resMap.size(); + if (resMap != null) { + assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); - return resMap.values().iterator().next(); + return resMap.isEmpty() ? null : resMap.values().iterator().next(); + } + + return null; } }); } @@ -717,6 +670,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null); } + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + A.notNull(map, "map"); + + if (keyCheck) + validateCacheKeys(map.keySet()); + + ctx.denyOnLocalRead(); + + return updateAllAsync0(null, + map, + args, + null, + null, + true, + false, + null, + null); + } + /** * Entry point for all public API put/transform methods. * @@ -733,7 +708,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private IgniteFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, - @Nullable final Map<? extends K, EntryProcessor> invokeMap, + @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap, @Nullable final Map<? extends K, GridCacheVersion> drRmvMap, @@ -1309,14 +1284,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old); V updated; - CacheInvokeResult invokeRes; + CacheInvokeResult invokeRes = null; try { Object computed = entryProcessor.process(invokeEntry, req.invokeArguments()); updated = ctx.unwrapTemporary(invokeEntry.getValue()); - invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); + if (computed != null) + invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); } catch (Exception e) { invokeRes = new CacheInvokeResult<>(e); @@ -1324,7 +1300,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updated = old; } - invokeResMap.put(entry.key(), invokeRes); + if (invokeRes != null) + invokeResMap.put(entry.key(), invokeRes); if (updated == null) { if (intercept) { @@ -1773,13 +1750,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (op == TRANSFORM) { assert req.returnValue(); - if (retVal == null) { - computedMap = U.newHashMap(keys.size()); + if (updRes.computedResult() != null) { + if (retVal == null) { + computedMap = U.newHashMap(keys.size()); - retVal = new GridCacheReturn<>((Object)computedMap, updRes.success()); - } + retVal = new GridCacheReturn<>((Object)computedMap, updRes.success()); + } - computedMap.put(k, updRes.computedResult()); + computedMap.put(k, updRes.computedResult()); + } } else { // Create only once. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 1bc013a..849a4fc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -324,6 +324,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success(); + if (op == TRANSFORM && retval == null) + retval = Collections.emptyMap(); + if (super.onDone(retval, err)) { cctx.mvcc().removeAtomicFuture(version()); @@ -376,7 +379,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (op == TRANSFORM) { assert !req.fastMap(); - addInvokeResults(res.returnValue()); + if (res.returnValue() != null) + addInvokeResults(res.returnValue()); } else if (req.fastMap() && req.hasPrimary()) opRes = res.returnValue(); @@ -840,15 +844,17 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> */ private synchronized void addInvokeResults(GridCacheReturn<Object> ret) { assert op == TRANSFORM : op; - assert ret.value() instanceof Map : ret.value(); + assert ret.value() == null || ret.value() instanceof Map : ret.value(); - if (opRes != null) { - Map<Object, Object> map = (Map<Object, Object>)opRes.value(); + if (ret.value() != null) { + if (opRes != null) { + Map<Object, Object> map = (Map<Object, Object>)opRes.value(); - map.putAll((Map<Object, Object>)ret.value()); + map.putAll((Map<Object, Object>)ret.value()); + } + else + opRes = ret; } - else - opRes = ret; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index 5b3055a..6af4977 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -526,35 +526,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { - dht.transform(key, transformer); - } - - /** {@inheritDoc} */ - @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) - throws IgniteCheckedException { - return dht.transformAndCompute(key, transformer); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, - IgniteClosure<V, V> transformer, - @Nullable GridCacheEntryEx<K, V> entry, - long ttl) { - return dht.transformAsync(key, transformer, entry, ttl); - } - - /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { - dht.transformAll(m); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { - return dht.transformAllAsync(m); - } - - /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { @@ -569,6 +540,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + return dht.invokeAllAsync(map, args); + } + + /** {@inheritDoc} */ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 650f0ab..a3b4157 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -337,92 +337,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { - /* - ctx.denyOnLocalRead(); - - updateAllInternal(TRANSFORM, - Collections.singleton(key), - Collections.singleton(transformer), - expiryPerCall(), - false, - false, - null, - ctx.isStoreEnabled()); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) - throws IgniteCheckedException { - /* - return (R)updateAllInternal(TRANSFORM, - Collections.singleton(key), - Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)), - expiryPerCall(), - true, - false, - null, - ctx.isStoreEnabled()); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, - IgniteClosure<V, V> transformer, - @Nullable GridCacheEntryEx<K, V> entry, - long ttl) { - /* - ctx.denyOnLocalRead(); - - return updateAllAsync0(null, Collections.singletonMap(key, transformer), false, false, ttl, null); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { - /* - ctx.denyOnLocalRead(); - - if (F.isEmpty(m)) - return; - - updateAllInternal(TRANSFORM, - m.keySet(), - m.values(), - expiryPerCall(), - false, - false, - null, - ctx.isStoreEnabled()); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { - /* - ctx.denyOnLocalRead(); - - if (F.isEmpty(m)) - return new GridFinishedFuture<Object>(ctx.kernalContext()); - - return updateAllAsync0(null, m, false, false, 0, null); - */ - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public V remove(K key, @Nullable GridCacheEntryEx<K, V> entry, @@ -760,10 +674,13 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { throws IgniteCheckedException { Map<K, EntryProcessorResult<T>> resMap = fut.get(); - assert resMap != null; - assert resMap.size() == 1 : resMap.size(); + if (resMap != null) { + assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); + + return resMap.isEmpty() ? null : resMap.values().iterator().next(); + } - return resMap.values().iterator().next(); + return null; } }); } @@ -795,6 +712,26 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { null); } + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + A.notNull(map, "map"); + + if (keyCheck) + validateCacheKeys(map.keySet()); + + ctx.denyOnLocalRead(); + + return updateAllAsync0(null, + map, + args, + true, + false, + null); + } + /** * Entry point for public API update methods. * @@ -808,7 +745,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { */ private IgniteFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, - @Nullable final Map<? extends K, EntryProcessor> invokeMap, + @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable final Object[] invokeArgs, final boolean retval, final boolean rawRetval, @@ -961,19 +898,21 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { taskName); if (op == TRANSFORM) { - assert t.get2() instanceof EntryProcessorResult : t.get2(); + assert t.get2() == null || t.get2() instanceof EntryProcessorResult : t.get2(); - Map<K, EntryProcessorResult> computedMap; + if (t.get2() != null) { + Map<K, EntryProcessorResult> computedMap; - if (res == null) { - computedMap = U.newHashMap(keys.size()); + if (res == null) { + computedMap = U.newHashMap(keys.size()); - res = new IgniteBiTuple<>(true, computedMap); - } - else - computedMap = (Map<K, EntryProcessorResult>)res.get2(); + res = new IgniteBiTuple<>(true, computedMap); + } + else + computedMap = (Map<K, EntryProcessorResult>)res.get2(); - computedMap.put(key, (EntryProcessorResult)t.getValue()); + computedMap.put(key, (EntryProcessorResult)t.getValue()); + } } else if (res == null) res = t; @@ -1006,8 +945,13 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (err != null) throw err; - return res == null ? null : rawRetval ? + Object ret = res == null ? null : rawRetval ? new GridCacheReturn<>(res.get2(), res.get1()) : retval ? res.get2() : res.get1(); + + if (op == TRANSFORM && ret == null) + ret = Collections.emptyMap(); + + return ret; } /** @@ -1104,14 +1048,15 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old); V updated; - CacheInvokeResult invokeRes; + CacheInvokeResult invokeRes = null; try { Object computed = entryProcessor.process(invokeEntry, invokeArgs); updated = ctx.unwrapTemporary(invokeEntry.getValue()); - invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); + if (computed != null) + invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); } catch (Exception e) { invokeRes = new CacheInvokeResult<>(e); @@ -1119,7 +1064,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updated = old; } - invokeResMap.put(entry.key(), invokeRes); + if (invokeRes != null) + invokeResMap.put(entry.key(), invokeRes); if (updated == null) { if (intercept) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java index 25c0668..588cac1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java @@ -2693,16 +2693,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public void transform(IgniteClosure<V, V> transformer) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Nullable @Override public V replace(V val) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java index 75453ee..991573b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java @@ -396,18 +396,6 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V> } /** {@inheritDoc} */ - @Override public void transform(IgniteClosure<V, V> transformer) throws IgniteCheckedException { - ctx.denyOnFlag(READ); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer) { - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ @Nullable @Override public V replace(V val) throws IgniteCheckedException { assert impl != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index 1680724..a66e584 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1767,8 +1767,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @SuppressWarnings("unchecked") @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, - boolean retval, - @Nullable Map<? extends K, EntryProcessor<K, V, Object>> map, + @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, Object... invokeArgs ) { return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx, @@ -1776,7 +1775,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> map, invokeArgs, null, - retval, + false, null, null); } @@ -2371,7 +2370,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> res = entryProcessor.process(invokeEntry, t.get2()); } - ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res)); + if (res != null) + ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res)); } catch (Exception e) { ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e)); @@ -2396,7 +2396,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> private IgniteFuture putAllAsync0( final GridCacheContext<K, V> cacheCtx, @Nullable Map<? extends K, ? extends V> map, - @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap, + @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap, @Nullable final Object[] invokeArgs, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap, final boolean retval, @@ -2444,7 +2444,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> invokeMap0 = U.newHashMap(invokeMap.size()); try { - for (Map.Entry<? extends K, EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) { + for (Map.Entry<? extends K, ? extends EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) { K key = (K)cacheCtx.marshalToPortable(e.getKey()); invokeMap0.put(key, e.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java index 12680f3..ac79a9a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java @@ -90,13 +90,11 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param cacheCtx Cache context. * @param map Entry processors map. * @param invokeArgs Optional arguments for entry processor. - * @param retval Flag indicating whether a value should be returned. * @return Transform operation future. */ public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, - boolean retval, - Map<? extends K, EntryProcessor<K, V, Object>> map, + Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, Object... invokeArgs); /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java index 3bb95cc..1b8f24a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -11,8 +11,10 @@ package org.gridgain.grid.kernal.processors.dataload; import org.apache.ignite.*; import org.apache.ignite.dataload.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.util.typedef.*; import org.jetbrains.annotations.*; @@ -90,13 +92,13 @@ public class GridDataLoadCacheUpdaters { * @param putMap Entries to put. * @throws IgniteCheckedException If failed. */ - protected static <K, V> void updateAll(GridCacheProjection<K,V> cache, @Nullable Collection<K> rmvCol, + protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol, Map<K, V> putMap) throws IgniteCheckedException { assert rmvCol != null || putMap != null; // Here we assume that there are no key duplicates, so the following calls are valid. if (rmvCol != null) - cache.removeAll(rmvCol); + ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); if (putMap != null) cache.putAll(putMap); @@ -110,7 +112,7 @@ public class GridDataLoadCacheUpdaters { private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(GridCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException { assert cache != null; assert !F.isEmpty(entries); @@ -123,9 +125,9 @@ public class GridDataLoadCacheUpdaters { V val = entry.getValue(); if (val == null) - cache.removex(key); + cache.remove(key); else - cache.putx(key, val); + cache.put(key, val); } } } @@ -138,7 +140,7 @@ public class GridDataLoadCacheUpdaters { private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(GridCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException { assert cache != null; assert !F.isEmpty(entries); @@ -179,7 +181,7 @@ public class GridDataLoadCacheUpdaters { private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(GridCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException { assert cache != null; assert !F.isEmpty(entries); @@ -220,12 +222,12 @@ public class GridDataLoadCacheUpdaters { private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(GridCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException { assert cache != null; assert !F.isEmpty(entries); - assert cache.configuration().getAtomicityMode() != ATOMIC; + assert cache.getConfiguration(GridCacheConfiguration.class).getAtomicityMode() != ATOMIC; Map<Integer, Integer> partsCounts = new HashMap<>(); @@ -233,6 +235,8 @@ public class GridDataLoadCacheUpdaters { Map<Integer, Collection<K>> rmvPartMap = null; Map<Integer, Map<K, V>> putPartMap = null; + GridCacheAffinity<K> aff = cache.ignite().<K, V>cache(cache.getName()).affinity(); + for (Map.Entry<K, V> entry : entries) { K key = entry.getKey(); @@ -240,7 +244,7 @@ public class GridDataLoadCacheUpdaters { V val = entry.getValue(); - int part = cache.affinity().partition(key); + int part = aff.partition(key); Integer cnt = partsCounts.get(part); @@ -260,11 +264,13 @@ public class GridDataLoadCacheUpdaters { } } + IgniteTransactions txs = cache.ignite().transactions(); + for (Map.Entry<Integer, Integer> e : partsCounts.entrySet()) { Integer part = e.getKey(); int cnt = e.getValue(); - IgniteTx tx = cache.txStartPartition(part, PESSIMISTIC, REPEATABLE_READ, 0, cnt); + IgniteTx tx = txs.txStartPartition(cache.getName(), part, PESSIMISTIC, REPEATABLE_READ, 0, cnt); try { updateAll(cache, rmvPartMap == null ? null : rmvPartMap.get(part), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java index 8b01cb3..a96fc63 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java @@ -11,9 +11,8 @@ package org.gridgain.grid.kernal.processors.dataload; import org.apache.ignite.*; import org.apache.ignite.dataload.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.*; import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.lang.*; import org.jetbrains.annotations.*; @@ -71,18 +70,23 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { if (log.isDebugEnabled()) log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); - GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); +// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); +// +// IgniteFuture<?> f = cache.context().preloader().startFuture(); +// +// if (!f.isDone()) +// f.get(); +// +// if (ignoreDepOwnership) +// cache.context().deploy().ignoreOwnership(true); - IgniteFuture<?> f = cache.context().preloader().startFuture(); - - if (!f.isDone()) - f.get(); + IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName); if (ignoreDepOwnership) cache.context().deploy().ignoreOwnership(true); try { - updater.update(cache.<K, V>cache(), col); + updater.update(cache, col); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java index 3522cf8..0ec679d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -29,11 +29,11 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoadCacheUpda private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(GridCache<K, V> cache0, Collection<Map.Entry<K, V>> col) + @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) throws IgniteCheckedException { - String cacheName = cache0.name(); + String cacheName = cache0.getConfiguration(GridCacheConfiguration.class).getName(); - GridKernalContext ctx = ((GridKernal)cache0.gridProjection().ignite()).context(); + GridKernalContext ctx = ((GridKernal)cache0.ignite()).context(); IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java index 068476c..c5d2363 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java @@ -277,7 +277,24 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT tx = startTx(txMode); - resMap = cache.invokeAll(keys, new RemoveProcessor(null)); + Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>(); + + for (Integer key : keys) { + switch (key % 4) { + case 0: invokeMap.put(key, new IncrementProcessor()); break; + + case 1: invokeMap.put(key, new RemoveProcessor(62)); break; + + case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break; + + case 3: invokeMap.put(key, new ExceptionProcessor(62)); break; + + default: + fail(); + } + } + + resMap = cache.invokeAll(invokeMap, 10, 20, 30); if (tx != null) tx.commit(); @@ -285,11 +302,64 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT for (Integer key : keys) { final EntryProcessorResult<Integer> res = resMap.get(key); - assertNotNull("No result for " + key); + switch (key % 4) { + case 0: { + assertNotNull("No result for " + key); + + assertEquals(62, (int)res.get()); + + checkValue(key, 63); + + break; + } + + case 1: { + assertNull(res); + + checkValue(key, null); + + break; + } + + case 2: { + assertNotNull("No result for " + key); - assertNull(res.get()); + assertEquals(3, (int)res.get()); + + checkValue(key, 122); + + break; + } + + case 3: { + assertNotNull("No result for " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + res.get(); + + return null; + } + }, EntryProcessorException.class, "Test processor exception."); + + checkValue(key, 62); + + break; + } + } } + cache.invokeAll(keys, new IncrementProcessor()); + + tx = startTx(txMode); + + resMap = cache.invokeAll(keys, new RemoveProcessor(null)); + + if (tx != null) + tx.commit(); + + assertEquals("Unexpected results: " + resMap, 0, resMap.size()); + for (Integer key : keys) checkValue(key, null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 9d56c7f..a6f6e14 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -936,7 +936,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { - cache.invokeAll(null, INCR_PROCESSOR); + cache.invokeAll((Set<String>)null, INCR_PROCESSOR); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java index 0799180..7e8daf8 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java @@ -324,7 +324,7 @@ public abstract class GridCacheOffHeapTieredAbstractSelfTest extends GridCacheAb for (int i = 0; i < 100; i++) map.put(i, i); - GridCache<Integer, Integer> c = grid(0).cache(null); + IgniteCache<Integer, Integer> c = grid(0).jcache(null); Map<Integer, Integer> map0 = c.getAll(map.keySet()); @@ -339,9 +339,13 @@ public abstract class GridCacheOffHeapTieredAbstractSelfTest extends GridCacheAb for (Map.Entry<Integer, Integer> e : map.entrySet()) checkValue(e.getKey(), e.getValue()); - c.transformAll(map.keySet(), new C1<Integer, Integer>() { - @Override public Integer apply(Integer val) { - return val + 1; + c.invokeAll(map.keySet(), new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + e.setValue(val + 1); + + return null; } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/eviction/GridCacheMockEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/eviction/GridCacheMockEntry.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/eviction/GridCacheMockEntry.java index 6519617..6827e66 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/eviction/GridCacheMockEntry.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/eviction/GridCacheMockEntry.java @@ -201,17 +201,6 @@ public class GridCacheMockEntry<K, V> extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void transform(IgniteClosure<V, V> transformer) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer) { - // No-op. - return null; - } - - /** {@inheritDoc} */ @Nullable @Override public V replace(V val) throws IgniteCheckedException { // No-op. return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java index a0db474..0143a92 100644 --- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java +++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java @@ -11,7 +11,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.dataload.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -20,6 +20,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.processor.*; import java.util.*; import static org.gridgain.grid.cache.GridCacheDistributionMode.*; @@ -43,6 +44,8 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + // DiscoverySpi TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(IP_FINDER); @@ -114,8 +117,8 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac misses += m.misses(); } - assertEquals(CNT/2, hits); - assertEquals(CNT/2, misses); + assertEquals(CNT / 2, hits); + assertEquals(CNT / 2, misses); } finally { stopAllGrids(); @@ -146,17 +149,21 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac */ private static class IncrementingUpdater implements IgniteDataLoadCacheUpdater<Integer, Long> { /** */ - private static final IgniteClosure<Long, Long> INC = new IgniteClosure<Long, Long>() { - @Override public Long apply(Long e) { - return e == null ? 1L : e + 1; + private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() { + @Override public Void process(MutableEntry<Integer, Long> e, Object... args) { + Long val = e.getValue(); + + e.setValue(val == null ? 1 : val + 1); + + return null; } }; /** {@inheritDoc} */ - @Override public void update(GridCache<Integer, Long> cache, + @Override public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) throws IgniteCheckedException { for (Map.Entry<Integer, Long> entry : entries) - cache.transform(entry.getKey(), INC); + cache.invoke(entry.getKey(), INC); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionFilter.java ---------------------------------------------------------------------- diff --git a/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionFilter.java b/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionFilter.java index 6f7a5b2..bc293e5 100644 --- a/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionFilter.java +++ b/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionFilter.java @@ -10,6 +10,7 @@ package org.gridgain.grid.cache.websession; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; @@ -18,6 +19,7 @@ import org.gridgain.grid.startup.servlet.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; +import javax.cache.*; import javax.cache.expiry.*; import javax.servlet.*; import javax.servlet.http.*; @@ -144,7 +146,10 @@ public class GridWebSessionFilter implements Filter { public static final int DFLT_MAX_RETRIES_ON_FAIL = 3; /** Cache. */ - private GridCache<String, GridWebSession> cache; + private IgniteCache<String, GridWebSession> cache; + + /** Transactions. */ + private IgniteTransactions txs; /** Listener. */ private GridWebSessionListener lsnr; @@ -192,18 +197,20 @@ public class GridWebSessionFilter implements Filter { throw new IgniteException("Grid for web sessions caching is not started (is it configured?): " + gridName); + txs = webSesIgnite.transactions(); + log = webSesIgnite.log(); if (webSesIgnite == null) throw new IgniteException("Grid for web sessions caching is not started (is it configured?): " + gridName); - cache = webSesIgnite.cache(cacheName); + cache = webSesIgnite.jcache(cacheName); if (cache == null) throw new IgniteException("Cache for web sessions is not started (is it configured?): " + cacheName); - GridCacheConfiguration cacheCfg = cache.configuration(); + GridCacheConfiguration cacheCfg = cache.getConfiguration(GridCacheConfiguration.class); if (cacheCfg.getWriteSynchronizationMode() == FULL_ASYNC) throw new IgniteException("Cache for web sessions cannot be in FULL_ASYNC mode: " + cacheName); @@ -274,7 +281,7 @@ public class GridWebSessionFilter implements Filter { try { if (txEnabled) { - try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { sesId = doFilter0(httpReq, res, chain); tx.commit(); @@ -380,19 +387,19 @@ public class GridWebSessionFilter implements Filter { try { while (true) { try { - GridCacheProjection<String, GridWebSession> cache0; + IgniteCache<String, GridWebSession> cache0; if (cached.getMaxInactiveInterval() > 0) { long ttl = cached.getMaxInactiveInterval() * 1000; ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl)); - cache0 = ((GridCacheProjectionEx<String, GridWebSession>)cache).withExpiryPolicy(plc); + cache0 = cache.withExpiryPolicy(plc); } else cache0 = cache; - GridWebSession old = cache0.putIfAbsent(sesId, cached); + GridWebSession old = cache0.getAndPutIfAbsent(sesId, cached); if (old != null) { cached = old; @@ -403,13 +410,13 @@ public class GridWebSessionFilter implements Filter { break; } - catch (GridCachePartialUpdateException e) { + catch (CachePartialUpdateException e) { if (log.isDebugEnabled()) log.debug(e.getMessage()); } } } - catch (IgniteCheckedException e) { + catch (CacheException e) { throw new IgniteException("Failed to save session: " + sesId, e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionListener.java b/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionListener.java index 401b7ff..03a47f4 100644 --- a/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionListener.java +++ b/modules/web/src/main/java/org/gridgain/grid/cache/websession/GridWebSessionListener.java @@ -10,13 +10,15 @@ package org.gridgain.grid.cache.websession; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -30,7 +32,7 @@ class GridWebSessionListener { private static final long RETRY_DELAY = 1; /** Cache. */ - private final GridCache<String, GridWebSession> cache; + private final IgniteCache<String, GridWebSession> cache; /** Maximum retries. */ private final int retries; @@ -43,7 +45,7 @@ class GridWebSessionListener { * @param cache Cache. * @param retries Maximum retries. */ - GridWebSessionListener(Ignite ignite, GridCache<String, GridWebSession> cache, int retries) { + GridWebSessionListener(Ignite ignite, IgniteCache<String, GridWebSession> cache, int retries) { assert ignite != null; assert cache != null; @@ -60,10 +62,10 @@ class GridWebSessionListener { assert sesId != null; try { - if (cache.removex(sesId) && log.isDebugEnabled()) + if (cache.remove(sesId) && log.isDebugEnabled()) log.debug("Session destroyed: " + sesId); } - catch (IgniteCheckedException e) { + catch (CacheException e) { U.error(log, "Failed to remove session: " + sesId, e); } } @@ -84,23 +86,23 @@ class GridWebSessionListener { try { for (int i = 0; i < retries; i++) { try { - GridCacheProjection<String, GridWebSession> cache0; + IgniteCache<String, GridWebSession> cache0; if (maxInactiveInterval > 0) { long ttl = maxInactiveInterval * 1000; ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl)); - cache0 = ((GridCacheProjectionEx<String, GridWebSession>)cache).withExpiryPolicy(plc); + cache0 = cache.withExpiryPolicy(plc); } else cache0 = cache; - cache0.transform(sesId, new AttributesUpdated(updates)); + cache0.invoke(sesId, new AttributesProcessor(updates)); break; } - catch (GridCachePartialUpdateException ignored) { + catch (CachePartialUpdateException ignored) { if (i == retries - 1) { U.warn(log, "Failed to apply updates for session (maximum number of retries exceeded) [sesId=" + sesId + ", retries=" + retries + ']'); @@ -113,7 +115,7 @@ class GridWebSessionListener { } } } - catch (IgniteCheckedException e) { + catch (CacheException | IgniteCheckedException e) { U.error(log, "Failed to update session attributes [id=" + sesId + ']', e); } } @@ -126,7 +128,7 @@ class GridWebSessionListener { /** * Multiple attributes update transformer. */ - private static class AttributesUpdated implements C1<GridWebSession, GridWebSession>, Externalizable { + private static class AttributesProcessor implements EntryProcessor<String, GridWebSession, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -136,26 +138,25 @@ class GridWebSessionListener { /** * Required by {@link Externalizable}. */ - public AttributesUpdated() { + public AttributesProcessor() { // No-op. } /** * @param updates Updates list. */ - AttributesUpdated(Collection<T2<String, Object>> updates) { + AttributesProcessor(Collection<T2<String, Object>> updates) { assert updates != null; this.updates = updates; } /** {@inheritDoc} */ - @SuppressWarnings("NonSerializableObjectBoundToHttpSession") - @Nullable @Override public GridWebSession apply(@Nullable GridWebSession ses) { - if (ses == null) + @Override public Void process(MutableEntry<String, GridWebSession> entry, Object... args) { + if (!entry.exists()) return null; - ses = new GridWebSession(ses); + GridWebSession ses = new GridWebSession(entry.getValue()); for (T2<String, Object> update : updates) { String name = update.get1(); @@ -170,7 +171,9 @@ class GridWebSessionListener { ses.removeAttribute(name); } - return ses; + entry.setValue(ses); + + return null; } /** {@inheritDoc} */