# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b86bdec Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b86bdec Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b86bdec Branch: refs/heads/ignite-51 Commit: 8b86bdec24df32014a06040b5551220f7554e375 Parents: 772c5b6 Author: sboikov <sboi...@gridgain.com> Authored: Wed Feb 25 10:35:26 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Feb 25 10:35:26 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/GridCacheAtomicFuture.java | 4 +- .../processors/cache/GridCacheContext.java | 9 +- .../processors/cache/GridCacheMvccManager.java | 6 +- .../processors/cache/GridCacheSwapManager.java | 4 +- .../GridDistributedCacheAdapter.java | 26 +-- .../distributed/dht/GridDhtCacheAdapter.java | 126 ++++++------- .../distributed/dht/GridDhtLocalPartition.java | 41 ++--- .../dht/atomic/GridDhtAtomicCache.java | 176 ++++++++++--------- .../dht/atomic/GridDhtAtomicCacheEntry.java | 13 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 62 +++---- .../dht/atomic/GridNearAtomicUpdateFuture.java | 126 ++++++------- .../colocated/GridDhtColocatedCacheEntry.java | 13 +- .../colocated/GridDhtDetachedCacheEntry.java | 25 +-- .../distributed/near/GridNearCacheAdapter.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 125 +++++++------ .../cache/local/GridLocalCacheEntry.java | 23 ++- .../datastructures/DataStructuresProcessor.java | 6 +- .../loadtests/hashmap/GridHashMapLoadTest.java | 10 +- 19 files changed, 422 insertions(+), 377 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index bb6519f..b74587f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -566,7 +566,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, TransactionIsolation isolation, boolean invalidate, long accessTtl, - IgnitePredicate<Cache.Entry<Object, V>>[] filter); + IgnitePredicate<Cache.Entry<K, V>>[] filter); /** * Post constructor initialization for subclasses. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index ec0999a..8dca8d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -22,7 +22,7 @@ import java.util.*; /** * Update future for atomic cache. */ -public interface GridCacheAtomicFuture<K, R> extends GridCacheFuture<R> { +public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { /** * @return {@code True} if partition exchange should wait for this future to complete. */ @@ -36,7 +36,7 @@ public interface GridCacheAtomicFuture<K, R> extends GridCacheFuture<R> { /** * @return Future keys. */ - public Collection<? extends K> keys(); + public Collection<Object> keys(); /** * Checks if timeout occurred. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 0a5bded..b6c0190 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -479,7 +479,7 @@ public class GridCacheContext<K, V> implements Externalizable { cache.map().incrementSize(e); if (isDht() || isColocated() || isDhtAtomic()) { - GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), -1, false); + GridDhtLocalPartition part = topology().localPartition(e.partition(), -1, false); if (part != null) part.incrementPublicSize(); @@ -497,7 +497,7 @@ public class GridCacheContext<K, V> implements Externalizable { cache.map().decrementSize(e); if (isDht() || isColocated() || isDhtAtomic()) { - GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), -1, false); + GridDhtLocalPartition part = topology().localPartition(e.partition(), -1, false); if (part != null) part.decrementPublicSize(); @@ -972,8 +972,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return No get-value filter. */ - public IgnitePredicate<Cache.Entry<Object, Object>>[] noPeekArray() { - return noPeekArr; + @SuppressWarnings("unchecked") + public <K, V> IgnitePredicate<Cache.Entry<K, V>>[] noPeekArray() { + return (IgnitePredicate<Cache.Entry<K, V>>[])((IgnitePredicate[])noPeekArr); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 0692f3e..8b5d22d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -83,7 +83,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, Collection<GridCacheFuture<?>>> futs = newMap(); /** Pending atomic futures. */ - private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<KeyCacheObject, ?>> atomicFuts = + private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>(); /** Near to DHT version mapping. */ @@ -315,7 +315,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param futVer Future ID. * @param fut Future. */ - public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<KeyCacheObject, ?> fut) { + public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); assert old == null; @@ -324,7 +324,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @return Collection of pending atomic futures. */ - public Collection<GridCacheAtomicFuture<KeyCacheObject, ?>> atomicFutures() { + public Collection<GridCacheAtomicFuture<?>> atomicFutures() { return atomicFuts.values(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 2b2a277..587ba5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1322,7 +1322,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @return Iterator over off-heap keys. */ - public Iterator<KeyCacheObject> offHeapKeyIterator() { + public Iterator<KeyCacheObject> offHeapKeyIterator(boolean primary, boolean backup, long topVer) { // TODO IGNITE-51. return null; } @@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @return Iterator over off-heap keys. */ - public Iterator<KeyCacheObject> swapKeyIterator() { + public Iterator<KeyCacheObject> swapKeyIterator(boolean primary, boolean backup, long topVer) { // TODO IGNITE-51. return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index f366fcd..497af0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -68,7 +68,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param ctx Cache context. * @param map Cache map. */ - protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { + protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); } @@ -76,7 +76,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter @Override public IgniteInternalFuture<Boolean> txLockAsync( Collection<? extends K> keys, long timeout, - IgniteTxLocalEx<K, V> tx, + IgniteTxLocalEx tx, boolean isRead, boolean retval, TransactionIsolation isolation, @@ -92,7 +92,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, IgnitePredicate<Cache.Entry<K, V>>... filter) { - IgniteTxLocalEx<K, V> tx = ctx.tm().userTxx(); + IgniteTxLocalEx tx = ctx.tm().userTxx(); // Return value flag is true because we choose to bring values for explicit locks. return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter); @@ -112,7 +112,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter */ protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, boolean retval, @@ -124,7 +124,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param key Key to remove. * @param ver Version to remove. */ - public void removeVersionedEntry(K key, GridCacheVersion ver) { + public void removeVersionedEntry(KeyCacheObject key, GridCacheVersion ver) { GridCacheEntryEx entry = peekEx(key); if (entry == null) @@ -276,27 +276,27 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter else dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; - try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) { - dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched()); + try (IgniteDataLoader<KeyCacheObject, Object> dataLdr = ignite.dataLoader(cacheName)) { + dataLdr.updater(GridDataLoadCacheUpdaters.<KeyCacheObject, Object>batched()); - for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { + for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { if (!locPart.isEmpty() && locPart.primary(topVer)) { - for (GridDhtCacheEntry<K, V> o : locPart.entries()) { + for (GridDhtCacheEntry o : locPart.entries()) { if (!o.obsoleteOrDeleted()) dataLdr.removeData(o.key()); } } } - Iterator<Cache.Entry<K, V>> it = dht.context().swap().offheapIterator(true, false, topVer); + Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer); while (it.hasNext()) - dataLdr.removeData(it.next().getKey()); + dataLdr.removeData(it.next()); - it = dht.context().swap().swapIterator(true, false, topVer); + it = dht.context().swap().swapKeyIterator(true, false, topVer); while (it.hasNext()) - dataLdr.removeData(it.next().getKey()); + dataLdr.removeData(it.next()); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 34e5909..104b972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -86,7 +86,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param ctx Cache context. * @param map Cache map. */ - protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { + protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); top = new GridDhtPartitionTopologyImpl<>(ctx); @@ -94,11 +94,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); + @Override public GridCacheMapEntry create(GridCacheContext ctx, long topVer, KeyCacheObject key, int hash, + CacheObject val, GridCacheMapEntry next, long ttl, int hdrId) { + return new GridDhtCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); } @@ -107,8 +107,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { + @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) { processTtlUpdateRequest(req); } }); @@ -282,8 +282,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param key Key. * @return DHT entry. */ - @Nullable public GridDhtCacheEntry<K, V> peekExx(K key) { - return (GridDhtCacheEntry<K, V>)peekEx(key); + @Nullable public GridDhtCacheEntry peekExx(KeyCacheObject key) { + return (GridDhtCacheEntry)peekEx(key); } /** @@ -300,7 +300,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - @Override public GridCacheEntryEx entryEx(K key, boolean touch) throws GridDhtInvalidPartitionException { + @Override public GridCacheEntryEx entryEx(KeyCacheObject key, boolean touch) + throws GridDhtInvalidPartitionException + { return super.entryEx(key, touch); } @@ -309,7 +311,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - @Override public GridCacheEntryEx entryEx(K key, long topVer) throws GridDhtInvalidPartitionException { + @Override public GridCacheEntryEx entryEx(KeyCacheObject key, long topVer) throws GridDhtInvalidPartitionException { return super.entryEx(key, topVer); } @@ -318,8 +320,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return DHT entry. * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - public GridDhtCacheEntry<K, V> entryExx(K key) throws GridDhtInvalidPartitionException { - return (GridDhtCacheEntry<K, V>)entryEx(key); + public GridDhtCacheEntry entryExx(KeyCacheObject key) throws GridDhtInvalidPartitionException { + return (GridDhtCacheEntry)entryEx(key); } /** @@ -328,8 +330,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return DHT entry. * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - public GridDhtCacheEntry<K, V> entryExx(K key, long topVer) throws GridDhtInvalidPartitionException { - return (GridDhtCacheEntry<K, V>)entryEx(key, topVer); + public GridDhtCacheEntry entryExx(KeyCacheObject key, long topVer) throws GridDhtInvalidPartitionException { + return (GridDhtCacheEntry)entryEx(key, topVer); } /** @@ -344,17 +346,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @throws GridDhtInvalidPartitionException if entry does not belong to this node and * {@code allowDetached} is {@code false}. */ - public GridCacheEntryEx entryExx(K key, long topVer, boolean allowDetached, boolean touch) { + public GridCacheEntryEx entryExx(KeyCacheObject key, long topVer, boolean allowDetached, boolean touch) { try { return allowDetached && !ctx.affinity().localNode(key, topVer) ? - new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : + new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0) : entryEx(key, touch); } catch (GridDhtInvalidPartitionException e) { if (!allowDetached) throw e; - return new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0); + return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0); } } @@ -376,8 +378,15 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); - ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() { - @Override public void apply(K key, V val) { + // TODO IGNITE-51. + Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() { + @Override public KeyCacheObject apply(K key) { + return ctx.toCacheKeyObject(key); + } + }); + + ctx.store().loadAllFromStore(null, keys0, new CI2<KeyCacheObject, Object>() { + @Override public void apply(KeyCacheObject key, Object val) { loadEntry(key, val, ver0, null, topVer, replicate, plc0); } }); @@ -404,8 +413,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); - ctx.store().loadCache(new CI3<K, V, GridCacheVersion>() { - @Override public void apply(K key, V val, @Nullable GridCacheVersion ver) { + ctx.store().loadCache(new CI3<KeyCacheObject, Object, GridCacheVersion>() { + @Override public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) { assert ver == null; loadEntry(key, val, ver0, p, topVer, replicate, plc); @@ -422,18 +431,18 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param replicate Replication flag. * @param plc Expiry policy. */ - private void loadEntry(K key, - V val, + private void loadEntry(KeyCacheObject key, + Object val, GridCacheVersion ver, @Nullable IgniteBiPredicate<K, V> p, long topVer, boolean replicate, @Nullable ExpiryPolicy plc) { - if (p != null && !p.apply(key, val)) + if (p != null && !p.apply(key.<K>value(ctx), (V)val)) return; try { - GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), -1, true); + GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), -1, true); // Reserve to make sure that partition does not get unloaded. if (part.reserve()) { @@ -445,14 +454,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (ttl == CU.TTL_ZERO) return; - if (ctx.portableEnabled()) { - key = (K)ctx.marshalToPortable(key); - val = (V)ctx.marshalToPortable(val); - } + CacheObject cacheVal = ctx.toCacheObject(val); entry = entryEx(key, false); - entry.initialValue(val, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, + entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, replicate ? DR_LOAD : DR_NONE); } catch (IgniteCheckedException e) { @@ -485,7 +491,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap long topVer = ctx.affinity().affinityTopologyVersion(); - for (GridDhtLocalPartition<K, V> p : topology().currentLocalPartitions()) { + for (GridDhtLocalPartition p : topology().currentLocalPartitions()) { if (p.primary(topVer)) sum += p.publicSize(); } @@ -504,7 +510,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( - @Nullable Collection<? extends KeyCacheObject> keys, + @Nullable Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, @Nullable GridCacheEntryEx entry, @@ -578,7 +584,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param expiry Expiry policy. * @return DHT future. */ - public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, + public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, long msgId, LinkedHashMap<? extends K, Boolean> keys, boolean readThrough, @@ -612,14 +618,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param nodeId Node ID. * @param req Get request. */ - protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) { + protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { assert isAffinityNode(cacheCfg); long ttl = req.accessTtl(); final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); - IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut = + IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getDhtAsync(nodeId, req.messageId(), req.keys(), @@ -632,18 +638,18 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap expiryPlc, req.skipValues()); - fut.listenAsync(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>>() { - @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> f) { - GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(), + fut.listenAsync(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() { + @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) { + GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), req.futureId(), req.miniId(), req.version()); - GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - (GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f; + GridDhtFuture<Collection<GridCacheEntryInfo>> fut = + (GridDhtFuture<Collection<GridCacheEntryInfo>>)f; try { - Collection<GridCacheEntryInfo<K, V>> entries = fut.get(); + Collection<GridCacheEntryInfo> entries = fut.get(); res.entries(entries); } @@ -680,7 +686,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap assert entries != null && !entries.isEmpty(); - Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>(); + Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); long topVer = ctx.discovery().topologyVersion(); @@ -691,11 +697,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap ClusterNode node = nodes.get(i); if (!node.isLocal()) { - GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); + GridCacheTtlUpdateRequest req = reqMap.get(node); if (req == null) { reqMap.put(node, - req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess())); + req = new GridCacheTtlUpdateRequest(topVer, expiryPlc.forAccess())); req.cacheId(ctx.cacheId()); } @@ -714,10 +720,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap ClusterNode node = ctx.node(e.getKey()); if (node != null) { - GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); + GridCacheTtlUpdateRequest req = reqMap.get(node); if (req == null) { - reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(topVer, + reqMap.put(node, req = new GridCacheTtlUpdateRequest(topVer, expiryPlc.forAccess())); req.cacheId(ctx.cacheId()); @@ -729,7 +735,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } } - for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) { + for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : reqMap.entrySet()) { try { ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy()); } @@ -745,7 +751,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @param req Request. */ - private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { + private void processTtlUpdateRequest(GridCacheTtlUpdateRequest req) { if (req.keys() != null) updateTtl(this, req.keys(), req.versions(), req.ttl()); @@ -765,7 +771,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param ttl TTL. */ private void updateTtl(GridCacheAdapter<K, V> cache, - List<K> keys, + List<KeyCacheObject> keys, List<GridCacheVersion> vers, long ttl) { assert !F.isEmpty(keys); @@ -834,10 +840,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() { - final GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, + final GridDhtLocalPartition part = ctx.topology().localPartition(partId, ctx.discovery().topologyVersion(), false); - Iterator<GridDhtCacheEntry<K, V>> partIt = part == null ? null : part.entries().iterator(); + Iterator<GridDhtCacheEntry> partIt = part == null ? null : part.entries().iterator(); return new PartitionEntryIterator<>(partIt); } @@ -887,7 +893,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public int size() { - GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, + GridDhtLocalPartition part = ctx.topology().localPartition(partId, ctx.discovery().topologyVersion(), false); return part != null ? part.publicSize() : 0; @@ -911,7 +917,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) { assert entry.isDht(); - GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), -1, false); + GridDhtLocalPartition part = topology().localPartition(entry.partition(), -1, false); // Do not remove entry on replica topology. Instead, add entry to removal queue. // It will be cleared eventually. @@ -938,12 +944,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap else { final long topVer = ctx.affinity().affinityTopologyVersion(); - final Iterator<GridDhtLocalPartition<K, V>> partIt = topology().currentLocalPartitions().iterator(); + final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator(); Iterator<GridCacheEntryEx> it = new Iterator<GridCacheEntryEx>() { private GridCacheEntryEx next; - private Iterator<GridDhtCacheEntry<K, V>> curIt; + private Iterator<GridDhtCacheEntry> curIt; { advance(); @@ -974,7 +980,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap do { if (curIt == null) { while (partIt.hasNext()) { - GridDhtLocalPartition<K, V> part = partIt.next(); + GridDhtLocalPartition part = partIt.next(); if (primary == part.primary(topVer)) { curIt = part.entries().iterator(); @@ -1016,12 +1022,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private Cache.Entry<K, V> last; /** Partition iterator. */ - private final Iterator<GridDhtCacheEntry<K, V>> partIt; + private final Iterator<GridDhtCacheEntry> partIt; /** * @param partIt Partition iterator. */ - private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry<K, V>> partIt) { + private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry> partIt) { this.partIt = partIt; advance(); @@ -1058,7 +1064,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private void advance() { if (partIt != null) { while (partIt.hasNext()) { - GridDhtCacheEntry<K, V> next = partIt.next(); + GridDhtCacheEntry next = partIt.next(); if (next.isInternal() || !next.visitable(CU.<K, V>empty())) continue; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 3f40372..1af61bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -44,7 +44,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Key partition. */ -public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalPartition> { +public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> { /** Maximum size for delete queue. */ private static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); @@ -68,10 +68,10 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti private final GridFutureAdapter<?> rent; /** Entries map. */ - private final ConcurrentMap<K, GridDhtCacheEntry> map; + private final ConcurrentMap<KeyCacheObject, GridDhtCacheEntry> map; /** Context. */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext cctx; /** Create time. */ @GridToStringExclude @@ -87,14 +87,14 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti private final LongAdder mapPubSize = new LongAdder(); /** Remove queue. */ - private GridCircularBuffer<T2<K, GridCacheVersion>> rmvQueue; + private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; /** * @param cctx Context. * @param id Partition ID. */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - GridDhtLocalPartition(GridCacheContext<K, V> cctx, int id) { + GridDhtLocalPartition(GridCacheContext cctx, int id) { assert cctx != null; this.id = id; @@ -148,7 +148,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti /** * @return Entries belonging to partition. */ - public Collection<GridDhtCacheEntry<K, V>> entries() { + public Collection<GridDhtCacheEntry> entries() { return map.values(); } @@ -199,7 +199,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti /** * @param entry Entry to add. */ - void onAdded(GridDhtCacheEntry<K, V> entry) { + void onAdded(GridDhtCacheEntry entry) { GridDhtPartitionState state = state(); assert state != EVICTED : "Adding entry to invalid partition: " + this; @@ -214,7 +214,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param entry Entry to remove. */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - void onRemoved(GridDhtCacheEntry<K, V> entry) { + void onRemoved(GridDhtCacheEntry entry) { assert entry.obsolete(); // Make sure to remove exactly this entry. @@ -234,9 +234,9 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param ver Removed version. * @throws IgniteCheckedException If failed. */ - public void onDeferredDelete(K key, GridCacheVersion ver) throws IgniteCheckedException { + public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException { try { - T2<K, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver)); + T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver)); if (evicted != null) cctx.dht().removeVersionedEntry(evicted.get1(), evicted.get2()); @@ -295,7 +295,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param ver Version. * @return {@code True} if preloading is permitted. */ - public boolean preloadingPermitted(K key, GridCacheVersion ver) { + public boolean preloadingPermitted(KeyCacheObject key, GridCacheVersion ver) { assert key != null; assert ver != null; assert lock.isHeldByCurrentThread(); // Only one thread can enter this method at a time. @@ -433,7 +433,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti rent.onDone(); - ((GridDhtPreloader<K, V>)cctx.preloader()).onPartitionEvicted(this, updateSeq); + ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); clearDeferredDeletes(); @@ -469,7 +469,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti rent.onDone(); - ((GridDhtPreloader<K, V>)cctx.preloader()).onPartitionEvicted(this, updateSeq); + ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); clearDeferredDeletes(); @@ -486,16 +486,17 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti assert state() == EVICTED; try { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false); + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id, false); if (it != null) { // We can safely remove these values because no entries will be created for evicted partition. while (it.hasNext()) { - Map.Entry<byte[], GridCacheSwapEntry<V>> entry = it.next(); + Map.Entry<byte[], GridCacheSwapEntry> entry = it.next(); byte[] keyBytes = entry.getKey(); - K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); + // TODO IGNITE-51. + KeyCacheObject key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); cctx.swap().remove(key, keyBytes); } @@ -531,8 +532,8 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_UNLOADED); - for (Iterator<GridDhtCacheEntry<K, V>> it = map.values().iterator(); it.hasNext();) { - GridDhtCacheEntry<K, V> cached = it.next(); + for (Iterator<GridDhtCacheEntry> it = map.values().iterator(); it.hasNext();) { + GridDhtCacheEntry cached = it.next(); try { if (cached.clearInternal(clearVer, swap)) { @@ -558,8 +559,8 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * */ private void clearDeferredDeletes() { - rmvQueue.forEach(new CI1<T2<K, GridCacheVersion>>() { - @Override public void apply(T2<K, GridCacheVersion> t) { + rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() { + @Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) { cctx.dht().removeVersionedEntry(t.get1(), t.get2()); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2968d82..8c847dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -359,7 +359,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<V> putIfAbsentAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return putAsync(key, val, ctx.noPeekArray()); + return putAsync(key, val, ctx.<K, V>noPeekArray()); } /** {@inheritDoc} */ @@ -371,7 +371,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> putxIfAbsentAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return putxAsync(key, val, ctx.noPeekArray()); + return putxAsync(key, val, ctx.<K, V>noPeekArray()); } /** {@inheritDoc} */ @@ -436,7 +436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) { if (ctx.portableEnabled()) oldVal = (V)ctx.marshalToPortable(oldVal); @@ -626,7 +626,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, boolean retval, @@ -1078,7 +1078,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1); - GridDhtAtomicUpdateFuture<K, V> dhtFut = null; + GridDhtAtomicUpdateFuture dhtFut = null; boolean remap = false; @@ -1089,8 +1089,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. - List<GridDhtCacheEntry<K, V>> locked = lockEntries(keys, req.topologyVersion()); - Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null; + List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion()); + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; try { topology().readLock(); @@ -1202,7 +1202,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !deleted.isEmpty(); assert ctx.deferredDelete(); - for (IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion> e : deleted) + for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) ctx.onDeferredDelete(e.get1(), e.get2()); } } @@ -1256,12 +1256,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private UpdateBatchResult<K, V> updateWithBatch( ClusterNode node, boolean hasNear, - GridNearAtomicUpdateRequest<K, V> req, - GridNearAtomicUpdateResponse<K, V> res, - List<GridDhtCacheEntry<K, V>> locked, + GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res, + List<GridDhtCacheEntry> locked, GridCacheVersion ver, - @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut, - CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, + @Nullable GridDhtAtomicUpdateFuture dhtFut, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry @@ -1282,27 +1282,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int size = req.keys().size(); - Map<K, V> putMap = null; + Map<KeyCacheObject, CacheObject> putMap = null; - Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null; + Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap = null; - Collection<K> rmvKeys = null; + Collection<KeyCacheObject> rmvKeys = null; UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>(); - List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size); + List<GridDhtCacheEntry> filtered = new ArrayList<>(size); GridCacheOperation op = req.operation(); - Map<K, EntryProcessorResult> invokeResMap = - op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null; + Map<KeyCacheObject, EntryProcessorResult> invokeResMap = + op == TRANSFORM ? U.<KeyCacheObject, EntryProcessorResult>newHashMap(size) : null; int firstEntryIdx = 0; boolean intercept = ctx.config().getInterceptor() != null; for (int i = 0; i < locked.size(); i++) { - GridDhtCacheEntry<K, V> entry = locked.get(i); + GridDhtCacheEntry entry = locked.get(i); if (entry == null) continue; @@ -1335,9 +1335,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (op == TRANSFORM) { - EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i); + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); - V old = entry.innerGet( + CacheObject old = entry.innerGet( null, /*read swap*/true, /*read through*/true, @@ -1351,9 +1351,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, null); - CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, + entry.key().value(ctx), + old.value(ctx)); - V updated; + Object updated; CacheInvokeResult invokeRes = null; try { @@ -1571,11 +1573,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param entries Entries. * @throws IgniteCheckedException If failed. */ - private void reloadIfNeeded(final List<GridDhtCacheEntry<K, V>> entries) throws IgniteCheckedException { + private void reloadIfNeeded(final List<GridDhtCacheEntry> entries) throws IgniteCheckedException { Map<K, Integer> needReload = null; for (int i = 0; i < entries.size(); i++) { - GridDhtCacheEntry<K, V> entry = entries.get(i); + GridDhtCacheEntry entry = entries.get(i); if (entry == null) continue; @@ -1598,7 +1600,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Integer idx = idxMap.get(k); if (idx != null) { - GridDhtCacheEntry<K, V> entry = entries.get(idx); + GridDhtCacheEntry entry = entries.get(idx); try { GridCacheVersion ver = entry.version(); @@ -1637,18 +1639,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private UpdateSingleResult<K, V> updateSingle( ClusterNode node, boolean hasNear, - GridNearAtomicUpdateRequest<K, V> req, - GridNearAtomicUpdateResponse<K, V> res, - List<GridDhtCacheEntry<K, V>> locked, + GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res, + List<GridDhtCacheEntry> locked, GridCacheVersion ver, - @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut, - CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, + @Nullable GridDhtAtomicUpdateFuture dhtFut, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { GridCacheReturn<Object> retVal = null; - Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null; + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; List<K> keys = req.keys(); @@ -1671,7 +1673,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // We are holding java-level locks on entries at this point. // No GridCacheEntryRemovedException can be thrown. try { - GridDhtCacheEntry<K, V> entry = locked.get(i); + GridDhtCacheEntry entry = locked.get(i); if (entry == null) continue; @@ -1861,19 +1863,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Deleted entries. */ @SuppressWarnings("ForLoopReplaceableByForEach") - @Nullable private GridDhtAtomicUpdateFuture<K, V> updatePartialBatch( + @Nullable private GridDhtAtomicUpdateFuture updatePartialBatch( boolean hasNear, int firstEntryIdx, - List<GridDhtCacheEntry<K, V>> entries, + List<GridDhtCacheEntry> entries, final GridCacheVersion ver, ClusterNode node, @Nullable Map<K, V> putMap, @Nullable Collection<K> rmvKeys, @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap, - @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut, - CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, - final GridNearAtomicUpdateRequest<K, V> req, - final GridNearAtomicUpdateResponse<K, V> res, + @Nullable GridDhtAtomicUpdateFuture dhtFut, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + final GridNearAtomicUpdateRequest req, + final GridNearAtomicUpdateResponse res, boolean replicate, UpdateBatchResult<K, V> batchRes, String taskName, @@ -1939,7 +1941,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { - GridDhtCacheEntry<K, V> entry = entries.get(i); + GridDhtCacheEntry entry = entries.get(i); assert Thread.holdsLock(entry); @@ -2103,14 +2105,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * locks are released. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private List<GridDhtCacheEntry<K, V>> lockEntries(List<KeyCacheObject> keys, long topVer) + private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, long topVer) throws GridDhtInvalidPartitionException { if (keys.size() == 1) { K key = keys.get(0); while (true) { try { - GridDhtCacheEntry<K, V> entry = entryExx(key, topVer); + GridDhtCacheEntry entry = entryExx(key, topVer); UNSAFE.monitorEnter(entry); @@ -2129,12 +2131,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } else { - List<GridDhtCacheEntry<K, V>> locked = new ArrayList<>(keys.size()); + List<GridDhtCacheEntry> locked = new ArrayList<>(keys.size()); while (true) { for (K key : keys) { try { - GridDhtCacheEntry<K, V> entry = entryExx(key, topVer); + GridDhtCacheEntry entry = entryExx(key, topVer); locked.add(entry); } @@ -2186,7 +2188,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param locked Locked entries. * @param topVer Topology version. */ - private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, long topVer) { + private void unlockEntries(Collection<GridDhtCacheEntry> locked, long topVer) { // Process deleted entries before locks release. assert ctx.deferredDelete(); @@ -2210,7 +2212,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } // Try evict partitions. - for (GridDhtCacheEntry<K, V> entry : locked) { + for (GridDhtCacheEntry entry : locked) { if (entry != null) entry.onUnlock(); } @@ -2264,8 +2266,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * will return false. * @return {@code True} if filter evaluation succeeded. */ - private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest<K, V> req, - GridNearAtomicUpdateResponse<K, V> res) { + private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res) { try { return ctx.isAll(entry.wrapFilterLocked(), req.filter()); } @@ -2279,7 +2281,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param req Request to remap. */ - private void remapToNewPrimary(GridNearAtomicUpdateRequest<K, V> req) { + private void remapToNewPrimary(GridNearAtomicUpdateRequest req) { if (log.isDebugEnabled()) log.debug("Remapping near update request locally: " + req); @@ -2351,11 +2353,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param force If {@code true} then creates future without optimizations checks. * @return Backup update future or {@code null} if there are no backups. */ - @Nullable private GridDhtAtomicUpdateFuture<K, V> createDhtFuture( + @Nullable private GridDhtAtomicUpdateFuture createDhtFuture( GridCacheVersion writeVer, - GridNearAtomicUpdateRequest<K, V> updateReq, - GridNearAtomicUpdateResponse<K, V> updateRes, - CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, + GridNearAtomicUpdateRequest updateReq, + GridNearAtomicUpdateResponse updateRes, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean force ) { if (!force) { @@ -2378,7 +2380,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - GridDhtAtomicUpdateFuture<K, V> fut = new GridDhtAtomicUpdateFuture<>(ctx, completionCb, writeVer, updateReq, + GridDhtAtomicUpdateFuture fut = new GridDhtAtomicUpdateFuture<>(ctx, completionCb, writeVer, updateReq, updateRes); ctx.mvcc().addAtomicFuture(fut.version(), fut); @@ -2411,7 +2413,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param req Near atomic update request. */ - private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest<K, V> req) { + private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) { if (log.isDebugEnabled()) log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); @@ -2425,7 +2427,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param res Near atomic update response. */ @SuppressWarnings("unchecked") - private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse<K, V> res) { + private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { if (log.isDebugEnabled()) log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); @@ -2444,14 +2446,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param req Dht atomic update request. */ - private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest<K, V> req) { + private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest req) { if (log.isDebugEnabled()) log.debug("Processing dht atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); GridCacheVersion ver = req.writeVersion(); // Always send update reply. - GridDhtAtomicUpdateResponse<K, V> res = new GridDhtAtomicUpdateResponse<>(ctx.cacheId(), req.futureVersion()); + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion()); Boolean replicate = ctx.isDrEnabled(); @@ -2460,18 +2462,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); for (int i = 0; i < req.size(); i++) { - K key = req.key(i); + KeyCacheObject key = req.key(i); try { while (true) { - GridDhtCacheEntry<K, V> entry = null; + GridDhtCacheEntry entry = null; try { entry = entryExx(key); - V val = req.value(i); + CacheObject val = req.value(i); byte[] valBytes = req.valueBytes(i); - EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i); + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null || valBytes != null) ? @@ -2481,7 +2483,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long ttl = req.ttl(i); long expireTime = req.conflictExpireTime(i); - GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, nodeId, @@ -2496,7 +2498,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), - CU.<K, V>empty(), + CU.empty(), replicate ? DR_BACKUP : DR_NONE, ttl, expireTime, @@ -2562,8 +2564,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param locked Already locked entries (from the request). */ @SuppressWarnings("ForLoopReplaceableByForEach") - private void checkClearForceTransformBackups(GridNearAtomicUpdateRequest<K, V> req, - List<GridDhtCacheEntry<K, V>> locked) { + private void checkClearForceTransformBackups(GridNearAtomicUpdateRequest req, + List<GridDhtCacheEntry> locked) { if (ctx.writeThrough() && req.operation() == TRANSFORM) { for (int i = 0; i < locked.size(); i++) { if (!locked.get(i).hasValue()) { @@ -2609,11 +2611,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param res Dht atomic update response. */ @SuppressWarnings("unchecked") - private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse<K, V> res) { + private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { if (log.isDebugEnabled()) log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - GridDhtAtomicUpdateFuture<K, V> updateFut = (GridDhtAtomicUpdateFuture<K, V>)ctx.mvcc(). + GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc(). atomicFuture(res.futureVersion()); if (updateFut != null) @@ -2628,12 +2630,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param res Deferred atomic update response. */ @SuppressWarnings("unchecked") - private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse<K, V> res) { + private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { if (log.isDebugEnabled()) log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); for (GridCacheVersion ver : res.futureVersions()) { - GridDhtAtomicUpdateFuture<K, V> updateFut = (GridDhtAtomicUpdateFuture<K, V>)ctx.mvcc().atomicFuture(ver); + GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); if (updateFut != null) updateFut.onResult(nodeId); @@ -2647,7 +2649,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Originating node ID. * @param res Near update response. */ - private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse<K, V> res) { + private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { try { ctx.io().send(nodeId, res, ctx.ioPolicy()); } @@ -2674,10 +2676,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private final GridCacheReturn<Object> retVal; /** */ - private final Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted; + private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; /** */ - private final GridDhtAtomicUpdateFuture<K, V> dhtFut; + private final GridDhtAtomicUpdateFuture dhtFut; /** * @param retVal Return value. @@ -2685,8 +2687,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param dhtFut DHT future. */ private UpdateSingleResult(GridCacheReturn<Object> retVal, - Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted, - GridDhtAtomicUpdateFuture<K, V> dhtFut) { + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted, + GridDhtAtomicUpdateFuture dhtFut) { this.retVal = retVal; this.deleted = deleted; this.dhtFut = dhtFut; @@ -2702,14 +2704,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @return Deleted entries. */ - private Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted() { + private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { return deleted; } /** * @return DHT future. */ - public GridDhtAtomicUpdateFuture<K, V> dhtFuture() { + public GridDhtAtomicUpdateFuture dhtFuture() { return dhtFut; } } @@ -2719,10 +2721,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private static class UpdateBatchResult<K, V> { /** */ - private Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted; + private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; /** */ - private GridDhtAtomicUpdateFuture<K, V> dhtFut; + private GridDhtAtomicUpdateFuture dhtFut; /** */ private boolean readersOnly; @@ -2735,9 +2737,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param updRes Entry update result. * @param entries All entries. */ - private void addDeleted(GridDhtCacheEntry<K, V> entry, - GridCacheUpdateAtomicResult<K, V> updRes, - Collection<GridDhtCacheEntry<K, V>> entries) { + private void addDeleted(GridDhtCacheEntry entry, + GridCacheUpdateAtomicResult updRes, + Collection<GridDhtCacheEntry> entries) { if (updRes.removeVersion() != null) { if (deleted == null) deleted = new ArrayList<>(entries.size()); @@ -2749,14 +2751,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @return Deleted entries. */ - private Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted() { + private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { return deleted; } /** * @return DHT future. */ - public GridDhtAtomicUpdateFuture<K, V> dhtFuture() { + public GridDhtAtomicUpdateFuture dhtFuture() { return dhtFut; } @@ -2777,7 +2779,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param dhtFut DHT future. */ - private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut) { + private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture dhtFut) { this.dhtFut = dhtFut; } @@ -2925,7 +2927,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * Sends deferred notification message and removes this buffer from pending responses map. */ private void finish() { - GridDhtAtomicDeferredUpdateResponse<K, V> msg = new GridDhtAtomicDeferredUpdateResponse<>(ctx.cacheId(), + GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), respVers); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java index 2fa9922..76f8a40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; /** * DHT atomic cache entry. */ -public class GridDhtAtomicCacheEntry<K, V> extends GridDhtCacheEntry<K, V> { +public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry { /** * @param ctx Cache context. * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). @@ -35,8 +35,15 @@ public class GridDhtAtomicCacheEntry<K, V> extends GridDhtCacheEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridDhtAtomicCacheEntry(GridCacheContext ctx, + long topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { super(ctx, topVer, key, hash, val, next, ttl, hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 92fe74b..c5cb216 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -43,8 +43,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** * DHT atomic cache backup update future. */ -public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> - implements GridCacheAtomicFuture<K, Void> { +public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implements GridCacheAtomicFuture<Void> { /** */ private static final long serialVersionUID = 0L; @@ -55,7 +54,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> protected static IgniteLogger log; /** Cache context. */ - private GridCacheContext<K, V> cctx; + private GridCacheContext cctx; /** Future version. */ private GridCacheVersion futVer; @@ -68,23 +67,23 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> /** Completion callback. */ @GridToStringExclude - private CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb; + private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb; /** Mappings. */ @GridToStringInclude - private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest<K, V>> mappings = new ConcurrentHashMap8<>(); + private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); /** Entries with readers. */ - private Map<K, GridDhtCacheEntry<K, V>> nearReadersEntries; + private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; /** Update request. */ - private GridNearAtomicUpdateRequest<K, V> updateReq; + private GridNearAtomicUpdateRequest updateReq; /** Update response. */ - private GridNearAtomicUpdateResponse<K, V> updateRes; + private GridNearAtomicUpdateResponse updateRes; /** Future keys. */ - private Collection<K> keys; + private Collection<KeyCacheObject> keys; /** Future map time. */ private volatile long mapTime; @@ -104,11 +103,12 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param updateRes Update response. */ public GridDhtAtomicUpdateFuture( - GridCacheContext<K, V> cctx, - CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, + GridCacheContext cctx, + CI2<GridNearAtomicUpdateRequest, + GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, - GridNearAtomicUpdateRequest<K, V> updateReq, - GridNearAtomicUpdateResponse<K, V> updateRes + GridNearAtomicUpdateRequest updateReq, + GridNearAtomicUpdateResponse updateRes ) { super(cctx.kernalContext()); @@ -147,11 +147,11 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> if (log.isDebugEnabled()) log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); - GridDhtAtomicUpdateRequest<K, V> req = mappings.get(nodeId); + GridDhtAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { - updateRes.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Failed to write keys on backup (node left" + - " grid before response is received): " + nodeId)); + updateRes.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Failed to write keys on backup " + + "(node left grid before response is received): " + nodeId)); // Remove only after added keys to failed set. mappings.remove(nodeId); @@ -200,7 +200,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> } /** {@inheritDoc} */ - @Override public Collection<? extends K> keys() { + @Override public Collection<KeyCacheObject> keys() { return keys; } @@ -213,10 +213,10 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). */ - public void addWriteEntry(GridDhtCacheEntry<K, V> entry, - @Nullable V val, + public void addWriteEntry(GridDhtCacheEntry entry, + @Nullable CacheObject val, @Nullable byte[] valBytes, - EntryProcessor<K, V, ?> entryProcessor, + EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer) { @@ -235,10 +235,10 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> UUID nodeId = node.id(); if (!nodeId.equals(ctx.localNodeId())) { - GridDhtAtomicUpdateRequest<K, V> updateReq = mappings.get(nodeId); + GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); if (updateReq == null) { - updateReq = new GridDhtAtomicUpdateRequest<>( + updateReq = new GridDhtAtomicUpdateRequest( cctx.cacheId(), nodeId, futVer, @@ -275,10 +275,10 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param expireTime Expire time for near cache update (optional). */ public void addNearWriteEntries(Iterable<UUID> readers, - GridDhtCacheEntry<K, V> entry, - @Nullable V val, + GridDhtCacheEntry entry, + @Nullable CacheObject val, @Nullable byte[] valBytes, - EntryProcessor<K, V, ?> entryProcessor, + EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) { CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); @@ -288,7 +288,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> long topVer = updateReq.topologyVersion(); for (UUID nodeId : readers) { - GridDhtAtomicUpdateRequest<K, V> updateReq = mappings.get(nodeId); + GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); if (updateReq == null) { ClusterNode node = ctx.discovery().node(nodeId); @@ -297,7 +297,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> if (node == null) continue; - updateReq = new GridDhtAtomicUpdateRequest<>( + updateReq = new GridDhtAtomicUpdateRequest( cctx.cacheId(), nodeId, futVer, @@ -348,7 +348,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> mapTime = U.currentTimeMillis(); if (!mappings.isEmpty()) { - for (GridDhtAtomicUpdateRequest<K, V> req : mappings.values()) { + for (GridDhtAtomicUpdateRequest req : mappings.values()) { try { if (log.isDebugEnabled()) log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); @@ -384,7 +384,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param nodeId Backup node ID. * @param updateRes Update response. */ - public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse<K, V> updateRes) { + public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { if (log.isDebugEnabled()) log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); @@ -392,8 +392,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error()); if (!F.isEmpty(updateRes.nearEvicted())) { - for (K key : updateRes.nearEvicted()) { - GridDhtCacheEntry<K, V> entry = nearReadersEntries.get(key); + for (KeyCacheObject key : updateRes.nearEvicted()) { + GridDhtCacheEntry entry = nearReadersEntries.get(key); try { entry.removeReader(nodeId, updateRes.messageId());