http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 02f799f,05500e3..23300be --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@@ -66,8 -65,15 +66,15 @@@ public class GridDhtCacheEntry extends * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridDhtCacheEntry(GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { super(ctx, key, hash, val, next, ttl, hdrId); // Record this entry with partition. @@@ -151,10 -157,10 +158,10 @@@ * @throws GridCacheEntryRemovedException If entry has been removed. * @throws GridDistributedLockCancelledException If lock was cancelled. */ - @Nullable public GridCacheMvccCandidate<K> addDhtLocal( + @Nullable public GridCacheMvccCandidate addDhtLocal( UUID nearNodeId, GridCacheVersion nearVer, - long topVer, + AffinityTopologyVersion topVer, long threadId, GridCacheVersion ver, long timeout, @@@ -300,26 -306,14 +307,14 @@@ * @throws GridCacheEntryRemovedException If entry has been removed. */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"}) - @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(AffinityTopologyVersion topVer) - @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(long topVer) ++ @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { - if (isNew() || !valid(-1) || deletedUnlocked()) + if (isNew() || !valid(AffinityTopologyVersion.NONE) || deletedUnlocked()) return null; else { - V val0 = null; - byte[] valBytes0 = null; - - GridCacheValueBytes valBytesTuple = valueBytesUnlocked(); + CacheObject val0 = valueBytesUnlocked(); - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = val; - - return F.t(ver, val0, valBytes0); + return F.t(ver, val0); } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 90dde96,b957a80..76ae5a1 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@@ -74,13 -75,10 +76,10 @@@ public final class GridDhtGetFuture<K, private GridCacheVersion ver; /** Topology version .*/ - private long topVer; + private AffinityTopologyVersion topVer; /** Transaction. */ - private IgniteTxLocalEx<K, V> tx; - - /** Logger. */ - private IgniteLogger log; + private IgniteTxLocalEx tx; /** Retries because ownership changed. */ private Collection<Integer> retries = new GridLeanSet<>(); @@@ -126,14 -113,13 +114,13 @@@ GridCacheContext<K, V> cctx, long msgId, UUID reader, - LinkedHashMap<? extends K, Boolean> keys, + LinkedHashMap<KeyCacheObject, Boolean> keys, boolean readThrough, boolean reload, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, - long topVer, + @NotNull AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, - boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals ) { @@@ -261,8 -238,8 +239,8 @@@ * @param parts Parts to map. * @return {@code True} if mapped. */ - private boolean map(K key, Collection<GridDhtLocalPartition> parts) { + private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) { - GridDhtLocalPartition part = topVer > 0 ? + GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : cache().topology().localPartition(key, false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 4f28334,48d15aa..1e61223 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@@ -89,7 -88,7 +89,7 @@@ public interface GridDhtPartitionTopolo * @throws GridDhtInvalidPartitionException If partition is evicted or absent and * does not belong to this node. */ - @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create) - @Nullable public GridDhtLocalPartition localPartition(int p, long topVer, boolean create) ++ @Nullable public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException; /** @@@ -160,7 -159,7 +160,7 @@@ * @param e Entry added to cache. * @return Local partition. */ - public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e); - public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e); ++ public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e); /** * @param e Entry removed from cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 2c2911b,3ec1113..86cb805 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -249,11 -247,11 +249,11 @@@ class GridDhtPartitionTopologyImpl<K, V if (cctx.preloadEnabled()) { for (int p = 0; p < num; p++) { // If this is the first node in grid. - if (oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) { - assert exchId.isJoined(); + if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) || exchId.isCacheAdded()) { + assert exchId.isJoined() || exchId.isCacheAdded(); try { - GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false); + GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); assert locPart != null; @@@ -451,7 -449,7 +451,7 @@@ } /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create) - @Nullable @Override public GridDhtLocalPartition localPartition(int p, long topVer, boolean create) ++ @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException { return localPartition(p, topVer, create, true); } @@@ -463,7 -461,7 +463,7 @@@ * @param updateSeq Update sequence. * @return Local partition. */ - private GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) { - private GridDhtLocalPartition localPartition(int p, long topVer, boolean create, boolean updateSeq) { ++ private GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) { while (true) { boolean belongs = cctx.affinity().localNode(p, topVer); @@@ -513,8 -511,8 +513,8 @@@ } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { + @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(cctx.affinity().partition(key), -1, create); + return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create); } /** {@inheritDoc} */ @@@ -528,7 -526,7 +528,7 @@@ } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) { - @Override public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e) { ++ @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) { /* * Make sure not to acquire any locks here as this method * may be called from sensitive synchronization blocks. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index d3fa4f9,a90e2e7..6498364 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@@ -499,8 -499,8 +500,8 @@@ public abstract class GridDhtTransactio * @param nodeId Node ID. * @param req Request. */ - private void processNearLockRequest(UUID nodeId, GridNearLockRequest<K, V> req) { + private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) { - assert isAffinityNode(cacheCfg); + assert ctx.affinityNode(); assert nodeId != null; assert req != null; @@@ -1213,8 -1209,8 +1210,8 @@@ * @param req Request. */ @SuppressWarnings({"RedundantTypeArguments", "TypeMayBeWeakened"}) - private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest<K, V> req) { + private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest req) { - assert isAffinityNode(cacheCfg); + assert ctx.affinityNode(); assert nodeId != null; removeLocks(nodeId, req.version(), req.keys(), true); @@@ -1230,11 -1226,11 +1227,11 @@@ * @throws IgniteCheckedException If failed. */ private void map(UUID nodeId, - long topVer, + AffinityTopologyVersion topVer, - GridCacheEntryEx<K,V> cached, + GridCacheEntryEx cached, Collection<UUID> readers, - Map<ClusterNode, List<T2<K, byte[]>>> dhtMap, - Map<ClusterNode, List<T2<K, byte[]>>> nearMap) + Map<ClusterNode, List<KeyCacheObject>> dhtMap, + Map<ClusterNode, List<KeyCacheObject>> nearMap) throws IgniteCheckedException { Collection<ClusterNode> dhtNodes = ctx.dht().topology().nodes(cached.partition(), topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 6d6446f,a13233f..82979fc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@@ -18,7 -18,7 +18,8 @@@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 9f2ca20,57795d3..00e8e3e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@@ -20,7 -20,7 +20,8 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@@ -214,7 -218,7 +219,7 @@@ public class GridDhtTxLocal extends Gri } /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) { - @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) { ++ @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, AffinityTopologyVersion topVer) { return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId()); } @@@ -247,11 -251,11 +252,11 @@@ } /** {@inheritDoc} */ - @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, AffinityTopologyVersion topVer) { + @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached, - IgniteTxEntry entry, long topVer) { ++ IgniteTxEntry entry, AffinityTopologyVersion topVer) { // Don't add local node as reader. if (!cctx.localNodeId().equals(nearNodeId)) { - GridCacheContext<K, V> cacheCtx = cached.context(); + GridCacheContext cacheCtx = cached.context(); while (true) { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index e856aa8,b493dd6..ab5286f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@@ -20,7 -20,7 +20,8 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@@ -141,9 -141,9 +142,9 @@@ public abstract class GridDhtTxLocalAda * @return {@code True} if reader was added as a result of this call. */ @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId, - GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, + GridDhtCacheEntry cached, + IgniteTxEntry entry, - long topVer); + AffinityTopologyVersion topVer); /** * @param commit Commit flag. @@@ -540,11 -526,11 +527,11 @@@ onePhaseCommit(onePhaseCommit); try { - Set<K> skipped = null; + Set<KeyCacheObject> skipped = null; - long topVer = topologyVersion(); + AffinityTopologyVersion topVer = topologyVersion(); - GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); // Enlist locks into transaction. for (int i = 0; i < entries.size(); i++) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index c913137,1ffe82e..3ba146f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@@ -113,10 -113,10 +114,10 @@@ public class GridDhtTxPrepareRequest ex public GridDhtTxPrepareRequest( IgniteUuid futId, IgniteUuid miniId, - @NotNull AffinityTopologyVersion topVer, - GridDhtTxLocalAdapter<K, V> tx, - Collection<IgniteTxEntry<K, V>> dhtWrites, - Collection<IgniteTxEntry<K, V>> nearWrites, - long topVer, ++ AffinityTopologyVersion topVer, + GridDhtTxLocalAdapter tx, + Collection<IgniteTxEntry> dhtWrites, + Collection<IgniteTxEntry> nearWrites, IgniteTxKey grpLockKey, boolean partLock, Map<UUID, Collection<UUID>> txNodes, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index b2c2300,fe8b91f..e019f6e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@@ -18,7 -18,7 +18,8 @@@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; + import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@@ -220,7 -222,7 +223,7 @@@ public class GridDhtTxRemote extends Gr } /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) { - @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) { ++ @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, AffinityTopologyVersion topVer) { if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId)) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java index 92d750a,a818500..7bcc5c3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java @@@ -71,15 -70,19 +71,20 @@@ public class GridNoStorageCacheMap exte } /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> putEntry(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl) { - @Override public GridCacheMapEntry putEntry(long topVer, KeyCacheObject key, @Nullable CacheObject val, long ttl) { ++ @Override public GridCacheMapEntry putEntry(AffinityTopologyVersion topVer, KeyCacheObject key, @Nullable CacheObject val, long ttl) { throw new AssertionError(); } /** {@inheritDoc} */ - @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, K key, @Nullable V val, - long ttl, boolean create) { - @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent(long topVer, ++ @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent( ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + @Nullable CacheObject val, + long ttl, + boolean create) + { if (create) { - GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val, + GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0, 0); return new GridTriple<>(entry, null, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index fb51c91,bdbcd06..c64dc4a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@@ -61,10 -62,10 +63,10 @@@ public class GridPartitionedGetFuture<K private GridCacheContext<K, V> cctx; /** Keys. */ - private Collection<? extends K> keys; + private Collection<KeyCacheObject> keys; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Reload flag. */ private boolean reload; @@@ -128,8 -119,8 +120,8 @@@ */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, - Collection<? extends K> keys, - @NotNull AffinityTopologyVersion topVer, + Collection<KeyCacheObject> keys, - long topVer, ++ AffinityTopologyVersion topVer, boolean readThrough, boolean reload, boolean forcePrimary, @@@ -166,9 -158,9 +159,9 @@@ * Initializes future. */ public void init() { - long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); - map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer); + map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); markInitialized(); } @@@ -275,9 -260,13 +261,14 @@@ * @param mapped Mappings to check for duplicates. * @param topVer Topology version on which keys should be mapped. */ - private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, AffinityTopologyVersion topVer) { - private void map(Collection<KeyCacheObject> keys, ++ private void map( ++ Collection<KeyCacheObject> keys, + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, - long topVer) - { ++ AffinityTopologyVersion topVer ++ ) { if (CU.affinityNodes(cctx, topVer).isEmpty()) { - onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid).")); + onDone(new ClusterTopologyCheckedException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); return; } @@@ -416,8 -396,10 +398,13 @@@ * @return {@code True} if has remote nodes. */ @SuppressWarnings("ConstantConditions") - private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals, - AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) { - private boolean map(KeyCacheObject key, - Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, Map<K, V> locVals, - long topVer, - Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped) { ++ private boolean map( ++ KeyCacheObject key, ++ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, ++ Map<K, V> locVals, ++ AffinityTopologyVersion topVer, ++ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped ++ ) { GridDhtCacheAdapter<K, V> colocated = cache(); boolean remote = false; @@@ -587,26 -545,17 +550,17 @@@ /** Keys. */ @GridToStringInclude - private LinkedHashMap<K, Boolean> keys; + private LinkedHashMap<KeyCacheObject, Boolean> keys; /** Topology version on which this future was mapped. */ - private long topVer; + private AffinityTopologyVersion topVer; /** - * Empty constructor required for {@link Externalizable}. - */ - public MiniFuture() { - // No-op. - } - - /** * @param node Node. * @param keys Keys. * @param topVer Topology version. */ - MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, @NotNull AffinityTopologyVersion topVer) { - super(cctx.kernalContext()); - - MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, long topVer) { ++ MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, AffinityTopologyVersion topVer) { this.node = node; this.keys = keys; this.topVer = topVer; @@@ -652,9 -601,9 +606,9 @@@ if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - long updTopVer = cctx.discovery().topologyVersion(); ++ AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - assert updTopVer > topVer : "Got topology exception but topology version did " + + assert updTopVer.compareTo(topVer) > 0 : "Got topology exception but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + ", nodeId=" + node.id() + ']'; @@@ -698,16 -647,16 +652,16 @@@ log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); // Need to wait for next topology version to remap. - IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); - IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer); ++ IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); - topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() { + topFut.listen(new CIX1<IgniteInternalFuture<Long>>() { @SuppressWarnings("unchecked") @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { - long topVer = fut.get(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get()); // This will append new futures to compound list. - map(F.view(keys.keySet(), new P1<K>() { - @Override public boolean apply(K key) { + map(F.view(keys.keySet(), new P1<KeyCacheObject>() { + @Override public boolean apply(KeyCacheObject key) { return invalidParts.contains(cctx.affinity().partition(key)); } }), F.t(node, keys), topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ddf4229,4474432..c8ff3e2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@@ -118,11 -117,18 +118,18 @@@ public class GridDhtAtomicCache<K, V> e /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); + @Override public GridCacheMapEntry create(GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { + return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); @@@ -439,7 -446,8 +445,7 @@@ null, true, true, - ctx.equalsPeekArray(oldVal)); - null, + ctx.equalsValArray(oldVal)); } /** {@inheritDoc} */ @@@ -458,7 -466,8 +464,7 @@@ null, false, false, - filter); - null, + filter).chain(RET2NULL); } /** {@inheritDoc} */ @@@ -662,8 -669,9 +665,8 @@@ args, null, null, - true, + false, false, - null, null); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @@@ -705,8 -713,9 +708,8 @@@ args, null, null, - true, + false, false, - null, null); } @@@ -734,8 -743,9 +737,8 @@@ args, null, null, - true, + false, false, - null, null); } @@@ -757,11 -768,12 +760,11 @@@ @Nullable final Map<? extends K, ? extends V> map, @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, - @Nullable final Map<? extends K, GridCacheDrInfo<V>> conflictPutMap, - @Nullable final Map<? extends K, GridCacheVersion> conflictRmvMap, + @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, + @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter - @Nullable GridCacheEntryEx cached, + @Nullable final CacheEntryPredicate[] filter ) { if (map != null && keyCheck) validateCacheKeys(map.keySet()); @@@ -885,15 -897,7 +888,7 @@@ boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVals) { - ctx.checkSecurity(GridSecurityPermission.CACHE_READ); - - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - - if (keyCheck) - validateCacheKeys(keys); - - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); @@@ -1622,14 -1640,14 +1631,14 @@@ String taskName, @Nullable IgniteCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { - GridCacheReturn<Object> retVal = null; - Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null; + GridCacheReturn retVal = null; + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; - List<K> keys = req.keys(); + List<KeyCacheObject> keys = req.keys(); - long topVer = req.topologyVersion(); + AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer.topologyVersion()); boolean readersOnly = false; @@@ -2078,10 -2086,10 +2077,10 @@@ * locks are released. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, AffinityTopologyVersion topVer) - private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, long topVer) ++ private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { if (keys.size() == 1) { - K key = keys.get(0); + KeyCacheObject key = keys.get(0); while (true) { try { @@@ -2161,7 -2169,7 +2160,7 @@@ * @param locked Locked entries. * @param topVer Topology version. */ - private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, AffinityTopologyVersion topVer) { - private void unlockEntries(Collection<GridDhtCacheEntry> locked, long topVer) { ++ private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) { // Process deleted entries before locks release. assert ctx.deferredDelete(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java index ec63130,76f8a40..c918e1e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java @@@ -36,8 -35,15 +36,15 @@@ public class GridDhtAtomicCacheEntry ex * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridDhtAtomicCacheEntry(GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { super(ctx, topVer, key, hash, val, next, ttl, hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 5f8240d,f72665f..6dea845 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@@ -286,13 -271,13 +272,13 @@@ public class GridDhtAtomicUpdateFuture keys.add(entry.key()); - long topVer = updateReq.topologyVersion(); + AffinityTopologyVersion topVer = updateReq.topologyVersion(); for (UUID nodeId : readers) { - GridDhtAtomicUpdateRequest<K, V> updateReq = mappings.get(nodeId); + GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); if (updateReq == null) { - ClusterNode node = ctx.discovery().node(nodeId); + ClusterNode node = cctx.discovery().node(nodeId); // Node left the grid. if (node == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 3cd4fea,ebf26b2..77e07a4 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@@ -110,10 -105,10 +105,10 @@@ public class GridNearAtomicUpdateFutur private final ExpiryPolicy expiryPlc; /** Future map topology version. */ - private long topVer; + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; /** Optional filter. */ - private final IgnitePredicate<Cache.Entry<K, V>>[] filter; + private final CacheEntryPredicate[] filter; /** Write synchronization mode. */ private final CacheWriteSynchronizationMode syncMode; @@@ -420,10 -408,10 +408,10 @@@ * @param remap Boolean flag indicating if this is partial future remap. * @param oldNodeId Old node ID if remap. */ - private void mapOnTopology(final Collection<? extends K> keys, final boolean remap, final UUID oldNodeId) { + private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) { cache.topology().readLock(); - GridDiscoveryTopologySnapshot snapshot = null; + AffinityTopologyVersion topVer = null; try { GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); @@@ -435,10 -421,12 +423,10 @@@ // Assign future version in topology read lock before first exception may be thrown. futVer = cctx.versions().next(topVer); - // We are holding topology read lock and current topology is ready, we can start mapping. - snapshot = fut.topologySnapshot(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - fut.listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { ++ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(keys, remap, oldNodeId); } }); @@@ -477,9 -473,8 +465,9 @@@ * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. */ - private void map0(GridDiscoveryTopologySnapshot topSnapshot, + private void map0( + AffinityTopologyVersion topVer, - Collection<? extends K> keys, + Collection<?> keys, boolean remap, @Nullable UUID oldNodeId) { assert oldNodeId == null || remap; @@@ -746,8 -744,8 +735,12 @@@ * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. * @return Collection of nodes to which key is mapped. */ - private Collection<ClusterNode> mapKey(K key, AffinityTopologyVersion topVer, boolean fastMap) { - GridCacheAffinityManager<K, V> affMgr = cctx.affinity(); - private Collection<ClusterNode> mapKey(KeyCacheObject key, long topVer, boolean fastMap) { ++ private Collection<ClusterNode> mapKey( ++ KeyCacheObject key, ++ AffinityTopologyVersion topVer, ++ boolean fastMap ++ ) { + GridCacheAffinityManager affMgr = cctx.affinity(); // If we can send updates in parallel - do it. return fastMap ? http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index a478876,01ccb53..ee0439f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@@ -81,11 -79,18 +80,18 @@@ public class GridDhtColocatedCache<K, V /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); + @Override public GridCacheMapEntry create(GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { + return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); } @@@ -119,9 -124,12 +125,13 @@@ * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary * for given key. */ - public GridDistributedCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached) { - public GridDistributedCacheEntry entryExx(KeyCacheObject key, - long topVer, - boolean allowDetached) - { ++ public GridDistributedCacheEntry entryExx( ++ KeyCacheObject key, ++ AffinityTopologyVersion topVer, ++ boolean allowDetached ++ ) { return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ? - new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer); + new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer); } /** {@inheritDoc} */ @@@ -200,7 -220,7 +222,10 @@@ } /** {@inheritDoc} */ - @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, AffinityTopologyVersion topVer) { - @Override protected GridCacheEntryEx entryExSafe(KeyCacheObject key, long topVer) { ++ @Override protected GridCacheEntryEx entryExSafe( ++ KeyCacheObject key, ++ AffinityTopologyVersion topVer ++ ) { try { return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null; } @@@ -410,27 -421,28 +426,28 @@@ int keyCnt = -1; - Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null; + Map<ClusterNode, GridNearUnlockRequest> map = null; - Collection<K> locKeys = new ArrayList<>(); + Collection<KeyCacheObject> locKeys = new ArrayList<>(); for (K key : keys) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.wrapLazyValue(); + GridDistributedCacheEntry entry = peekExx(cacheKey); - if (!ctx.isAll(Entry, filter)) + if (!ctx.isAll(entry, filter)) break; // While. - GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null); + GridCacheMvccCandidate lock = + ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), cacheKey, null); if (lock != null) { - final long topVer = lock.topologyVersion(); + final AffinityTopologyVersion topVer = lock.topologyVersion(); - assert topVer > 0; + assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); + Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion()); keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); @@@ -511,18 -523,18 +528,18 @@@ try { int keyCnt = -1; - Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null; + Map<ClusterNode, GridNearUnlockRequest> map = null; - Collection<K> locKeys = new LinkedList<>(); + Collection<KeyCacheObject> locKeys = new LinkedList<>(); - for (K key : keys) { - GridCacheMvccCandidate<K> lock = ctx.mvcc().removeExplicitLock(threadId, key, ver); + for (KeyCacheObject key : keys) { + GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver); if (lock != null) { - long topVer = lock.topologyVersion(); + AffinityTopologyVersion topVer = lock.topologyVersion(); if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); + Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion()); keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); @@@ -594,11 -606,11 +611,11 @@@ */ IgniteInternalFuture<Exception> lockAllAsync( final GridCacheContext<K, V> cacheCtx, - @Nullable final GridNearTxLocal<K, V> tx, + @Nullable final GridNearTxLocal tx, final long threadId, final GridCacheVersion ver, - final AffinityTopologyVersion topVer, - final Collection<K> keys, - final long topVer, ++ AffinityTopologyVersion topVer, + final Collection<KeyCacheObject> keys, final boolean txRead, final long timeout, final long accessTtl, @@@ -667,11 -679,11 +684,11 @@@ */ private IgniteInternalFuture<Exception> lockAllAsync0( GridCacheContext<K, V> cacheCtx, - @Nullable final GridNearTxLocal<K, V> tx, + @Nullable final GridNearTxLocal tx, long threadId, final GridCacheVersion ver, - final AffinityTopologyVersion topVer, - final Collection<K> keys, - final long topVer, ++ AffinityTopologyVersion topVer, + final Collection<KeyCacheObject> keys, final boolean txRead, final long timeout, final long accessTtl, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java index f2de9d9,c95e2e2..1ccc0da --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java @@@ -36,8 -35,15 +36,15 @@@ public class GridDhtColocatedCacheEntr * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridDhtColocatedCacheEntry(GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, - int hdrId) - { ++ int hdrId ++ ) { super(ctx, topVer, key, hash, val, next, ttl, hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 1e13dd1,08358d5..cb889f2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@@ -574,8 -563,8 +564,8 @@@ public final class GridDhtColocatedLock markInitialized(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - fut.listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { ++ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(); } }); @@@ -871,8 -859,8 +860,11 @@@ * @param topVer Topology version to lock on. * @param mappings Optional collection of mappings to proceed locking. */ - private void lockLocally(final Collection<K> keys, AffinityTopologyVersion topVer, - @Nullable final Deque<GridNearLockMapping<K, V>> mappings) { - private void lockLocally(final Collection<KeyCacheObject> keys, long topVer, - @Nullable final Deque<GridNearLockMapping> mappings) { ++ private void lockLocally( ++ final Collection<KeyCacheObject> keys, ++ AffinityTopologyVersion topVer, ++ @Nullable final Deque<GridNearLockMapping> mappings ++ ) { if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); @@@ -948,14 -934,14 +938,14 @@@ * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed. * @throws IgniteCheckedException If key cannot be added to mapping. */ - private boolean mapAsPrimary(Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException { - private boolean mapAsPrimary(Collection<KeyCacheObject> keys, long topVer) throws IgniteCheckedException { ++ private boolean mapAsPrimary(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException { // Assign keys to primary nodes. - Collection<K> distributedKeys = new ArrayList<>(keys.size()); + Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size()); - for (K key : keys) { + for (KeyCacheObject key : keys) { if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) { // Remove explicit locks added so far. - for (K k : keys) + for (KeyCacheObject k : keys) cctx.mvcc().removeExplicitLock(threadId, k, lockVer); return false; @@@ -993,8 -979,9 +983,12 @@@ * @return {@code True} if transaction accesses key that was explicitly locked before. * @throws IgniteCheckedException If lock is externally held and transaction is explicit. */ - private boolean addLocalKey(K key, AffinityTopologyVersion topVer, Collection<K> distributedKeys) throws IgniteCheckedException { - GridDistributedCacheEntry<K, V> entry = cctx.colocated().entryExx(key, topVer, false); - private boolean addLocalKey(KeyCacheObject key, long topVer, Collection<KeyCacheObject> distributedKeys) - throws IgniteCheckedException { ++ private boolean addLocalKey( ++ KeyCacheObject key, ++ AffinityTopologyVersion topVer, ++ Collection<KeyCacheObject> distributedKeys ++ ) throws IgniteCheckedException { + GridDistributedCacheEntry entry = cctx.colocated().entryExx(key, topVer, false); assert !entry.detached(); @@@ -1022,8 -1009,8 +1016,11 @@@ * @return Near lock mapping. * @throws IgniteCheckedException If mapping failed. */ - private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping, - AffinityTopologyVersion topVer) throws IgniteCheckedException { - private GridNearLockMapping map(KeyCacheObject key, @Nullable GridNearLockMapping mapping, - long topVer) throws IgniteCheckedException { ++ private GridNearLockMapping map( ++ KeyCacheObject key, ++ @Nullable GridNearLockMapping mapping, ++ AffinityTopologyVersion topVer ++ ) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; ClusterNode primary = cctx.affinity().primary(key, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index dbf6146,bec3bf0..d53f445 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@@ -92,11 -90,13 +91,13 @@@ public final class GridDhtForceKeysFutu * @param keys Keys. * @param preloader Preloader. */ - public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, @NotNull AffinityTopologyVersion topVer, - Collection<? extends K> keys, GridDhtPreloader<K, V> preloader) { - super(cctx.kernalContext()); - + public GridDhtForceKeysFuture( + GridCacheContext<K, V> cctx, - long topVer, ++ AffinityTopologyVersion topVer, + Collection<KeyCacheObject> keys, + GridDhtPreloader<K, V> preloader + ) { - assert topVer != 0 : topVer; + assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; this.cctx = cctx; @@@ -493,13 -476,13 +477,13 @@@ boolean replicate = cctx.isDrEnabled(); - for (GridCacheEntryInfo<K, V> info : res.forcedInfos()) { + for (GridCacheEntryInfo info : res.forcedInfos()) { int p = cctx.affinity().partition(info.key()); - GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); - GridDhtLocalPartition locPart = top.localPartition(p, -1, false); ++ GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); if (locPart != null && locPart.state() == MOVING && locPart.reserve()) { - GridCacheEntryEx<K, V> entry = cctx.dht().entryEx(info.key()); + GridCacheEntryEx entry = cctx.dht().entryEx(info.key()); try { if (entry.initialValue( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java index fa6cd29,925a05d..29823f9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java @@@ -46,17 -44,13 +46,13 @@@ public class GridDhtForceKeysRequest ex /** Mini-future ID. */ private IgniteUuid miniId; - /** Serialized keys. */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> keyBytes; - /** Keys to request. */ @GridToStringInclude - @GridDirectTransient - private Collection<K> keys; + @GridDirectCollection(KeyCacheObject.class) + private Collection<KeyCacheObject> keys; /** Topology version for which keys are requested. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * @param cacheId Cache ID. @@@ -69,8 -63,8 +65,8 @@@ int cacheId, IgniteUuid futId, IgniteUuid miniId, - Collection<K> keys, - @NotNull AffinityTopologyVersion topVer + Collection<KeyCacheObject> keys, - long topVer ++ AffinityTopologyVersion topVer ) { assert futId != null; assert miniId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 8f971a4,149929d..144ed7a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@@ -209,8 -208,8 +209,8 @@@ public class GridDhtPartitionDemandPool if (log.isDebugEnabled()) log.debug("Forcing preload event for future: " + exchFut); - exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - exchFut.listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { ++ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); @@@ -358,8 -357,8 +358,8 @@@ obj = new GridTimeoutObjectAdapter(delay) { @Override public void onTimeout() { - exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - exchFut.listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> f) { ++ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); @@@ -482,10 -481,10 +482,14 @@@ * @return {@code False} if partition has become invalid during preloading. * @throws IgniteInterruptedCheckedException If interrupted. */ - private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, AffinityTopologyVersion topVer) - private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo entry, long topVer) -- throws IgniteCheckedException { ++ private boolean preloadEntry( ++ ClusterNode pick, ++ int p, ++ GridCacheEntryInfo entry, ++ AffinityTopologyVersion topVer ++ ) throws IgniteCheckedException { try { - GridCacheEntryEx<K, V> cached = null; + GridCacheEntryEx cached = null; try { cached = cctx.dht().entryEx(entry.key()); @@@ -570,9 -568,9 +573,13 @@@ * @throws ClusterTopologyCheckedException If node left. * @throws IgniteCheckedException If failed to send message. */ - private Set<Integer> demandFromNode(ClusterNode node, final AffinityTopologyVersion topVer, GridDhtPartitionDemandMessage<K, V> d, - GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException { - GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); - private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage d, - GridDhtPartitionsExchangeFuture exchFut) throws InterruptedException, IgniteCheckedException { ++ private Set<Integer> demandFromNode( ++ ClusterNode node, ++ final AffinityTopologyVersion topVer, ++ GridDhtPartitionDemandMessage d, ++ GridDhtPartitionsExchangeFuture exchFut ++ ) throws InterruptedException, IgniteCheckedException { + GridDhtPartitionTopology top = cctx.dht().topology(); cntr++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 236ec6c,cdd153f..43f5566 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -48,8 -46,8 +47,8 @@@ import static org.apache.ignite.interna /** * Future for exchanging partition maps. */ - public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture<K, V>>, GridDhtTopologyFuture { -public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<Long> ++public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { /** */ private static final long serialVersionUID = 0L; @@@ -200,16 -193,8 +197,12 @@@ * @param busyLock Busy lock. * @param exchId Exchange ID. */ - public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, ReadWriteLock busyLock, - GridDhtPartitionExchangeId exchId) { + public GridDhtPartitionsExchangeFuture( - GridCacheSharedContext<K, V> cctx, ++ GridCacheSharedContext cctx, + ReadWriteLock busyLock, + GridDhtPartitionExchangeId exchId, + Collection<DynamicCacheChangeRequest> reqs + ) { - super(cctx.kernalContext()); - - syncNotify(true); - assert busyLock != null; assert exchId != null; @@@ -462,9 -423,9 +441,9 @@@ // If received any messages, process them. onReceive(m.getKey(), m.getValue()); - long topVer = exchId.topologyVersion(); + AffinityTopologyVersion topVer = exchId.topologyVersion(); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@@ -487,10 -448,7 +466,10 @@@ if (log.isDebugEnabled()) log.debug("After waiting for partition release future: " + this); + if (!F.isEmpty(reqs)) + stopCaches(); + - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@@ -504,9 -462,9 +483,9 @@@ // Process queued undeploys prior to sending/spreading map. cacheCtx.preloader().unwindUndeploys(); - GridDhtPartitionTopology<K, V> top = cacheCtx.topology(); + GridDhtPartitionTopology top = cacheCtx.topology(); - assert topVer == top.topologyVersion() : + assert topVer.equals(top.topologyVersion()) : "Topology version is updated only in this class instances inside single ExchangeWorker thread."; top.beforeExchange(exchId); @@@ -680,24 -612,11 +660,24 @@@ } /** {@inheritDoc} */ - @Override public boolean onDone(Long res, Throwable err) { - if (err == null) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { ++ for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (err == null) { if (!cacheCtx.isLocal()) - cacheCtx.affinity().cleanUpCache(res - 10); + cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10); + } + + if (!F.isEmpty(reqs)) { + for (DynamicCacheChangeRequest req : reqs) { + if (F.eq(cacheCtx.name(), req.cacheName())) { + if (req.isStart()) + cacheCtx.preloader().onInitialExchangeComplete(err); + else if (req.isClientStart()) { - if (req.clientNodeId().equals(ctx.localNodeId())) ++ if (req.clientNodeId().equals(cacheCtx.localNodeId())) + cacheCtx.preloader().onInitialExchangeComplete(err); + } + } + } } } @@@ -893,9 -807,9 +873,9 @@@ if (log.isDebugEnabled()) log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); - assert exchId.topologyVersion() == msg.topologyVersion(); + assert exchId.topologyVersion().equals(msg.topologyVersion()); - initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> t) { assert msg.lastVersion() != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0a74d0c,c171f97..c698f6f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@@ -71,7 -71,7 +72,7 @@@ public class GridDhtPreloader<K, V> ext private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); /** Pending affinity assignment futures. */ - private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts = - private ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = ++ private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); /** Discovery listener. */ @@@ -277,8 -277,8 +278,8 @@@ * @param topVer Requested topology version. * @param fut Future to add. */ - public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) { - GridDhtAssignmentFetchFuture<K, V> old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut); - public void addDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture fut) { ++ public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) { + GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut); assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer; } @@@ -287,7 -287,7 +288,7 @@@ * @param topVer Requested topology version. * @param fut Future to remove. */ - public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) { - public void removeDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture fut) { ++ public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) { boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut); assert rmv : "Failed to remove assignment fetch future: " + topVer; @@@ -346,10 -346,10 +347,10 @@@ msg.futureId(), msg.miniId()); - for (K k : msg.keys()) { + for (KeyCacheObject k : msg.keys()) { int p = cctx.affinity().partition(k); - GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); - GridDhtLocalPartition locPart = top.localPartition(p, -1, false); ++ GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); // If this node is no longer an owner. if (locPart == null && !top.owners(p).contains(loc)) @@@ -423,14 -423,14 +424,14 @@@ * @param req Request. */ private void processAffinityAssignmentRequest(final ClusterNode node, - final GridDhtAffinityAssignmentRequest<K, V> req) { + final GridDhtAffinityAssignmentRequest req) { - final long topVer = req.topologyVersion(); + final AffinityTopologyVersion topVer = req.topologyVersion(); if (log.isDebugEnabled()) log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']'); - cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> fut) { ++ cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { if (log.isDebugEnabled()) log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + ", node=" + node + ']'); @@@ -489,7 -489,7 +490,7 @@@ * @return Future for request. */ @SuppressWarnings( {"unchecked", "RedundantCast"}) - @Override public GridDhtFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) { - @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, long topVer) { ++ @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 76ea0a3,dda2115..369fc68 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@@ -34,18 -33,18 +34,18 @@@ public class GridDhtPreloaderAssignment /** Exchange future. */ @GridToStringExclude - private final GridDhtPartitionsExchangeFuture<K, V> exchFut; + private final GridDhtPartitionsExchangeFuture exchFut; /** Last join order. */ - private final long topVer; + private final AffinityTopologyVersion topVer; /** * @param exchFut Exchange future. * @param topVer Last join order. */ - public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, AffinityTopologyVersion topVer) { - public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, long topVer) { ++ public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) { assert exchFut != null; - assert topVer > 0; + assert topVer.topologyVersion() > 0; this.exchFut = exchFut; this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 7bf4cc5,c889cf1..adc9d7b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@@ -209,9 -202,9 +203,9 @@@ public class GridNearAtomicCache<K, V> ) throws IgniteCheckedException { try { while (true) { - GridCacheEntryEx<K, V> entry = null; + GridCacheEntryEx entry = null; - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); try { entry = entryEx(key, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 3ab2cf0,1687cb5..933fad4 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@@ -67,13 -67,19 +67,21 @@@ public abstract class GridNearCacheAdap /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - @Override public GridCacheMapEntry create(GridCacheContext ctx, - long topVer, KeyCacheObject key, ++ @Override public GridCacheMapEntry create( ++ GridCacheContext ctx, ++ AffinityTopologyVersion topVer, ++ KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, - int hdrId) - { ++ int hdrId ++ ) { // Can't hold any locks here - this method is invoked when // holding write-lock on the whole cache map. - return new GridNearCacheEntry<>(ctx, key, hash, val, next, ttl, hdrId); + return new GridNearCacheEntry(ctx, key, hash, val, next, ttl, hdrId); } }); } @@@ -113,8 -119,8 +121,8 @@@ } /** {@inheritDoc} */ - @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) { - GridNearCacheEntry<K, V> entry = null; - @Override public GridCacheEntryEx entryEx(KeyCacheObject key, long topVer) { ++ @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) { + GridNearCacheEntry entry = null; while (true) { try { @@@ -136,8 -142,8 +144,8 @@@ * @param topVer Topology version. * @return Entry. */ - public GridNearCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) { - return (GridNearCacheEntry<K, V>)entryEx(key, topVer); - public GridNearCacheEntry entryExx(KeyCacheObject key, long topVer) { ++ public GridNearCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) { + return (GridNearCacheEntry)entryEx(key, topVer); } /** @@@ -372,8 -334,8 +336,8 @@@ /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> primaryEntrySet( - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable final CacheEntryPredicate... filter) { - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); Collection<Cache.Entry<K, V>> entries = F.flatCollections(