http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index acf00eb..4af7534 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.*; @@ -65,7 +66,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, private GridDhtPartitionExchangeId lastExchangeId; /** */ - private long topVer = -1; + private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** A future that will be completed when topology with version topVer will be ready to use. */ private GridDhtTopologyFuture topReadyFut; @@ -154,7 +155,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, lock.writeLock().lock(); try { - assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer + + assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; topVer = exchId.topologyVersion(); @@ -167,11 +168,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { lock.readLock().lock(); try { - assert topVer > 0; + assert topVer.topologyVersion() > 0; return topVer; } @@ -205,14 +206,14 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, lock.writeLock().lock(); try { - assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; if (!exchId.isJoined()) removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion()); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -366,12 +367,12 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, int num = cctx.affinity().partitions(); - long topVer = exchId.topologyVersion(); + AffinityTopologyVersion topVer = exchId.topologyVersion(); lock.writeLock().lock(); try { - assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; if (log.isDebugEnabled()) @@ -449,7 +450,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException { return localPartition(p, topVer, create, true); } @@ -461,7 +462,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, * @param updateSeq Update sequence. * @return Local partition. */ - private GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create, boolean updateSeq) { + private GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) { while (true) { boolean belongs = cctx.affinity().localNode(p, topVer); @@ -512,7 +513,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, /** {@inheritDoc} */ @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { - return localPartition(cctx.affinity().partition(key), -1, create); + return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create); } /** {@inheritDoc} */ @@ -526,7 +527,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) { + @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) { /* * Make sure not to acquire any locks here as this method * may be called from sensitive synchronization blocks. @@ -572,7 +573,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public Collection<ClusterNode> nodes(int p, long topVer) { + @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); lock.readLock().lock(); @@ -592,7 +593,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { ClusterNode n = cctx.discovery().node(nodeId); - if (n != null && (topVer < 0 || n.order() <= topVer)) { + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) { nodes = new ArrayList<>(affNodes.size() + 2); @@ -619,8 +620,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, * @param states Additional partition states. * @return List of nodes for the partition. */ - private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer.topologyVersion())) : null; lock.readLock().lock(); @@ -639,13 +640,13 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, List<ClusterNode> nodes = new ArrayList<>(size); for (UUID id : nodeIds) { - if (topVer > 0 && !allIds.contains(id)) + if (topVer.topologyVersion() > 0 && !allIds.contains(id)) continue; if (hasState(p, id, state, states)) { ClusterNode n = cctx.discovery().node(id); - if (n != null && (topVer < 0 || n.order() <= topVer)) + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); } } @@ -658,7 +659,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p, long topVer) { + @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { if (!cctx.preloadEnabled()) return ownersAndMoving(p, topVer); @@ -667,15 +668,15 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, /** {@inheritDoc} */ @Override public List<ClusterNode> owners(int p) { - return owners(p, -1); + return owners(p, AffinityTopologyVersion.NONE); } /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { if (!cctx.preloadEnabled()) - return ownersAndMoving(p, -1); + return ownersAndMoving(p, AffinityTopologyVersion.NONE); - return nodes(p, -1, MOVING); + return nodes(p, AffinityTopologyVersion.NONE, MOVING); } /** @@ -683,7 +684,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, * @param topVer Topology version. * @return List of nodes in state OWNING or MOVING. */ - private List<ClusterNode> ownersAndMoving(int p, long topVer) { + private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) { return nodes(p, topVer, OWNING, MOVING); } @@ -978,7 +979,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, assert nodeId.equals(cctx.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion()); // If this node became the oldest node. if (oldest.id().equals(cctx.nodeId())) { @@ -1028,7 +1029,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion()); ClusterNode loc = cctx.localNode();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index 2d2f431..704fadb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; /** * Future that implements a barrier after which dht topology is safe to use. Topology is considered to be @@ -29,7 +30,7 @@ import org.apache.ignite.internal.managers.discovery.*; * When new new transaction is started, it will wait for this future before acquiring new locks on particular * topology version. */ -public interface GridDhtTopologyFuture extends IgniteInternalFuture<Long> { +public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopologyVersion> { /** * Gets a topology snapshot for the topology version represented by the future. Note that by the time * partition exchange completes some nodes from the snapshot may leave the grid. One should use discovery http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 753f7e9..75de04d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; @@ -1229,7 +1230,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @throws IgniteCheckedException If failed. */ private void map(UUID nodeId, - long topVer, + AffinityTopologyVersion topVer, GridCacheEntryEx<K,V> cached, Collection<UUID> readers, Map<ClusterNode, List<T2<K, byte[]>>> dhtMap, @@ -1355,7 +1356,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (cand == null) cand = entry.candidate(dhtVer); - long topVer = cand == null ? -1 : cand.topologyVersion(); + AffinityTopologyVersion topVer = cand == null + ? AffinityTopologyVersion.NONE + : cand.topologyVersion(); // Note that we obtain readers before lock is removed. // Even in case if entry would be removed just after lock is removed, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 2835844..3daace5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -52,7 +53,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest private boolean sysInvalidate; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Pending versions with order less than one for this message (needed for commit ordering). */ @GridToStringInclude @@ -103,7 +104,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest UUID nearNodeId, IgniteUuid futId, IgniteUuid miniId, - long topVer, + @NotNull AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, long threadId, @@ -204,7 +205,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -281,7 +282,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest writer.incrementState(); case 26: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -369,7 +370,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest reader.incrementState(); case 26: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6aa159c..9f2ca20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -213,7 +214,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { + @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) { return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId()); } @@ -247,7 +248,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements /** {@inheritDoc} */ @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, long topVer) { + IgniteTxEntry<K, V> entry, AffinityTopologyVersion topVer) { // Don't add local node as reader. if (!cctx.localNodeId().equals(nearNodeId)) { GridCacheContext<K, V> cacheCtx = cached.context(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 1c71f12..e856aa8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -142,7 +143,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, IgniteTxEntry<K, V> entry, - long topVer); + AffinityTopologyVersion topVer); /** * @param commit Commit flag. @@ -541,7 +542,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K try { Set<K> skipped = null; - long topVer = topologyVersion(); + AffinityTopologyVersion topVer = topologyVersion(); GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e3cdc6e..8ad578b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -1247,7 +1248,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } } - long topVer = tx.topologyVersion(); + AffinityTopologyVersion topVer = tx.topologyVersion(); boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 0216774..8d894ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -51,7 +52,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque private IgniteUuid miniId; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Invalidate near entries flags. */ private BitSet invalidateNearEntries; @@ -112,7 +113,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque public GridDhtTxPrepareRequest( IgniteUuid futId, IgniteUuid miniId, - long topVer, + @NotNull AffinityTopologyVersion topVer, GridDhtTxLocalAdapter<K, V> tx, Collection<IgniteTxEntry<K, V>> dhtWrites, Collection<IgniteTxEntry<K, V>> nearWrites, @@ -243,7 +244,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -395,7 +396,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque writer.incrementState(); case 34: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -505,7 +506,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque reader.incrementState(); case 34: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 506888b..b2c2300 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -85,7 +86,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> IgniteUuid rmtFutId, UUID nodeId, long rmtThreadId, - long topVer, + AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, @@ -145,7 +146,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> UUID nodeId, GridCacheVersion nearXidVer, long rmtThreadId, - long topVer, + AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, @@ -219,7 +220,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> } /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { + @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) { if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId)) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java index 8f78025..92d750a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -70,12 +71,12 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> { } /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) { + @Override public GridCacheMapEntry<K, V> putEntry(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl) { throw new AssertionError(); } /** {@inheritDoc} */ - @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val, + @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl, boolean create) { if (create) { GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 980389c..fb51c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -63,7 +64,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private Collection<? extends K> keys; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Reload flag. */ private boolean reload; @@ -128,7 +129,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, Collection<? extends K> keys, - long topVer, + @NotNull AffinityTopologyVersion topVer, boolean readThrough, boolean reload, boolean forcePrimary, @@ -165,7 +166,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * Initializes future. */ public void init() { - long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer); @@ -274,7 +275,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param mapped Mappings to check for duplicates. * @param topVer Topology version on which keys should be mapped. */ - private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, long topVer) { + private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, AffinityTopologyVersion topVer) { if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid).")); @@ -347,9 +348,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M remapKeys.add(key); } - long updTopVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - assert updTopVer > topVer : "Got invalid partitions for local node but topology version did " + + assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + ", invalidParts=" + invalidParts + ']'; @@ -416,7 +417,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M */ @SuppressWarnings("ConstantConditions") private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals, - long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) { + AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) { GridDhtCacheAdapter<K, V> colocated = cache(); boolean remote = false; @@ -589,7 +590,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private LinkedHashMap<K, Boolean> keys; /** Topology version on which this future was mapped. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Empty constructor required for {@link Externalizable}. @@ -603,7 +604,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param keys Keys. * @param topVer Topology version. */ - MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, long topVer) { + MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, @NotNull AffinityTopologyVersion topVer) { super(cctx.kernalContext()); this.node = node; @@ -651,9 +652,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - long updTopVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - assert updTopVer > topVer : "Got topology exception but topology version did " + + assert updTopVer.compareTo(topVer) > 0 : "Got topology exception but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + ", nodeId=" + node.id() + ']'; @@ -679,11 +680,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M // Remap invalid partitions. if (!F.isEmpty(invalidParts)) { - long rmtTopVer = res.topologyVersion(); + AffinityTopologyVersion rmtTopVer = res.topologyVersion(); - assert rmtTopVer != 0; + assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO); - if (rmtTopVer <= topVer) { + if (rmtTopVer.compareTo(topVer) <= 0) { // Fail the whole get future. onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " + "invalid partitions but remote topology version does not differ from local) " + @@ -697,12 +698,12 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); // Need to wait for next topology version to remap. - IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer); + IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() { @SuppressWarnings("unchecked") @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { - long topVer = fut.get(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get()); // This will append new futures to compound list. map(F.view(keys.keySet(), new P1<K>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9f9af31..78dce91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; @@ -119,7 +120,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); } @@ -906,7 +907,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(keys); - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); @@ -1086,7 +1087,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { // Do not check topology version for CLOCK versioning since // partition exchange will wait for near update future. - if (topology().topologyVersion() == req.topologyVersion() || + if (topology().topologyVersion().equals(req.topologyVersion()) || ctx.config().getAtomicWriteOrderMode() == CLOCK) { ClusterNode node = ctx.discovery().node(nodeId); @@ -1640,9 +1641,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { List<K> keys = req.keys(); - long topVer = req.topologyVersion(); + AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer.topologyVersion()); boolean readersOnly = false; @@ -1871,9 +1872,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch."; - long topVer = req.topologyVersion(); + AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer.topologyVersion()); CacheStorePartialUpdateException storeErr = null; @@ -2091,7 +2092,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * locks are released. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, long topVer) + private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { if (keys.size() == 1) { K key = keys.get(0); @@ -2174,7 +2175,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param locked Locked entries. * @param topVer Topology version. */ - private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, long topVer) { + private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, AffinityTopologyVersion topVer) { // Process deleted entries before locks release. assert ctx.deferredDelete(); @@ -2350,9 +2351,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updateReq.fastMap()) return null; - long topVer = updateReq.topologyVersion(); + AffinityTopologyVersion topVer = updateReq.topologyVersion(); - Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer); + Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer.topologyVersion()); // We are on primary node for some key. assert !nodes.isEmpty(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java index 2fa9922..ec63130 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -35,7 +36,7 @@ public class GridDhtAtomicCacheEntry<K, V> extends GridDhtCacheEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, + public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { super(ctx, topVer, key, hash, val, next, ttl, hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 92fe74b..5f8240d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -195,7 +196,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> } /** {@inheritDoc} */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return updateReq.topologyVersion(); } @@ -220,7 +221,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> long ttl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer) { - long topVer = updateReq.topologyVersion(); + AffinityTopologyVersion topVer = updateReq.topologyVersion(); Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -285,7 +286,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> keys.add(entry.key()); - long topVer = updateReq.topologyVersion(); + AffinityTopologyVersion topVer = updateReq.topologyVersion(); for (UUID nodeId : readers) { GridDhtAtomicUpdateRequest<K, V> updateReq = mappings.get(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 91e83d2..685c466 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -53,7 +54,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp private GridCacheVersion writeVer; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Keys to update. */ @GridToStringInclude @@ -173,7 +174,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp GridCacheVersion futVer, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, - long topVer, + @NotNull AffinityTopologyVersion topVer, boolean forceTransformBackups, UUID subjId, int taskNameHash, @@ -415,7 +416,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -778,7 +779,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp writer.incrementState(); case 19: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -950,7 +951,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp reader.incrementState(); case 19: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index c3cc50a..8a3ca8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -113,7 +114,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> private final ExpiryPolicy expiryPlc; /** Future map topology version. */ - private long topVer; + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; /** Optional filter. */ private final IgnitePredicate<Cache.Entry<K, V>>[] filter; @@ -261,7 +262,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } /** {@inheritDoc} */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -444,8 +445,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> snapshot = fut.topologySnapshot(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { + fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(keys, remap, oldNodeId); } }); @@ -453,7 +454,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> return; } - topVer = snapshot.topologyVersion(); + topVer = new AffinityTopologyVersion(snapshot.topologyVersion()); mapTime = U.currentTimeMillis(); @@ -498,7 +499,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> @Nullable UUID oldNodeId) { assert oldNodeId == null || remap; - long topVer = topSnapshot.topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(topSnapshot.topologyVersion()); Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); @@ -592,7 +593,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> futVer, fastMap, updVer, - topSnapshot.topologyVersion(), + topVer, syncMode, op, retval, @@ -712,7 +713,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> futVer, fastMap, updVer, - topSnapshot.topologyVersion(), + topVer, syncMode, op, retval, @@ -762,7 +763,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. * @return Collection of nodes to which key is mapped. */ - private Collection<ClusterNode> mapKey(K key, long topVer, boolean fastMap) { + private Collection<ClusterNode> mapKey(K key, AffinityTopologyVersion topVer, boolean fastMap) { GridCacheAffinityManager<K, V> affMgr = cctx.affinity(); // If we can send updates in parallel - do it. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index b41e3a8..7457b0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -63,7 +64,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im private GridCacheVersion updateVer; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Write synchronization mode. */ private CacheWriteSynchronizationMode syncMode; @@ -166,7 +167,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im GridCacheVersion futVer, boolean fastMap, @Nullable GridCacheVersion updateVer, - long topVer, + @NotNull AffinityTopologyVersion topVer, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, boolean retval, @@ -255,7 +256,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -642,7 +643,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im writer.incrementState(); case 19: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -812,7 +813,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im reader.incrementState(); case 19: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index a59b6aa..5ab75d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -82,7 +83,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); } @@ -118,7 +119,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary * for given key. */ - public GridDistributedCacheEntry<K, V> entryExx(K key, long topVer, boolean allowDetached) { + public GridDistributedCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached) { return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ? new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer); } @@ -179,7 +180,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte }); } - long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); @@ -199,7 +200,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, long topVer) { + @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, AffinityTopologyVersion topVer) { try { return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null; } @@ -224,7 +225,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean readThrough, boolean reload, boolean forcePrimary, - long topVer, + AffinityTopologyVersion topVer, @Nullable UUID subjId, String taskName, boolean deserializePortable, @@ -424,12 +425,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null); if (lock != null) { - final long topVer = lock.topologyVersion(); + final AffinityTopologyVersion topVer = lock.topologyVersion(); - assert topVer > 0; + assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); + Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion()); keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); @@ -518,10 +519,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheMvccCandidate<K> lock = ctx.mvcc().removeExplicitLock(threadId, key, ver); if (lock != null) { - long topVer = lock.topologyVersion(); + AffinityTopologyVersion topVer = lock.topologyVersion(); if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); + Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion()); keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); @@ -596,7 +597,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable final GridNearTxLocal<K, V> tx, final long threadId, final GridCacheVersion ver, - final long topVer, + final AffinityTopologyVersion topVer, final Collection<K> keys, final boolean txRead, final long timeout, @@ -669,7 +670,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable final GridNearTxLocal<K, V> tx, long threadId, final GridCacheVersion ver, - final long topVer, + final AffinityTopologyVersion topVer, final Collection<K> keys, final boolean txRead, final long timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java index b19a2da..f2de9d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -35,7 +36,7 @@ public class GridDhtColocatedCacheEntry<K, V> extends GridDhtCacheEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, + public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { super(ctx, topVer, key, hash, val, next, ttl, hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 21f6364..1e13dd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -295,7 +296,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity false, false); - cand.topologyVersion(topSnapshot.get().topologyVersion()); + cand.topologyVersion(new AffinityTopologyVersion(topSnapshot.get().topologyVersion())); } } else { @@ -314,7 +315,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity false, false); - cand.topologyVersion(topSnapshot.get().topologyVersion()); + cand.topologyVersion(new AffinityTopologyVersion(topSnapshot.get().topologyVersion())); } else cand = cand.reenter(); @@ -562,7 +563,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity GridDiscoveryTopologySnapshot snapshot = fut.topologySnapshot(); if (tx != null) { - tx.topologyVersion(snapshot.topologyVersion()); + tx.topologyVersion(new AffinityTopologyVersion(snapshot.topologyVersion())); tx.topologySnapshot(snapshot); } @@ -573,8 +574,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity markInitialized(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { + fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(); } }); @@ -602,9 +603,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity assert snapshot != null; - final long topVer = snapshot.topologyVersion(); + final AffinityTopologyVersion topVer = new AffinityTopologyVersion(snapshot.topologyVersion()); - assert topVer > 0; + assert topVer.topologyVersion() > 0; if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid).")); @@ -870,7 +871,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param topVer Topology version to lock on. * @param mappings Optional collection of mappings to proceed locking. */ - private void lockLocally(final Collection<K> keys, long topVer, + private void lockLocally(final Collection<K> keys, AffinityTopologyVersion topVer, @Nullable final Deque<GridNearLockMapping<K, V>> mappings) { if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); @@ -947,7 +948,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed. * @throws IgniteCheckedException If key cannot be added to mapping. */ - private boolean mapAsPrimary(Collection<? extends K> keys, long topVer) throws IgniteCheckedException { + private boolean mapAsPrimary(Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException { // Assign keys to primary nodes. Collection<K> distributedKeys = new ArrayList<>(keys.size()); @@ -992,7 +993,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return {@code True} if transaction accesses key that was explicitly locked before. * @throws IgniteCheckedException If lock is externally held and transaction is explicit. */ - private boolean addLocalKey(K key, long topVer, Collection<K> distributedKeys) throws IgniteCheckedException { + private boolean addLocalKey(K key, AffinityTopologyVersion topVer, Collection<K> distributedKeys) throws IgniteCheckedException { GridDistributedCacheEntry<K, V> entry = cctx.colocated().entryExx(key, topVer, false); assert !entry.detached(); @@ -1022,7 +1023,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @throws IgniteCheckedException If mapping failed. */ private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping, - long topVer) throws IgniteCheckedException { + AffinityTopologyVersion topVer) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; ClusterNode primary = cctx.affinity().primary(key, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 170a0c0..dbf6146 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.*; @@ -74,7 +75,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec private AtomicInteger topCntr = new AtomicInteger(1); /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Future ID. */ private IgniteUuid futId = IgniteUuid.randomUuid(); @@ -91,11 +92,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param keys Keys. * @param preloader Preloader. */ - public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, long topVer, Collection<? extends K> keys, - GridDhtPreloader<K, V> preloader) { + public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, @NotNull AffinityTopologyVersion topVer, + Collection<? extends K> keys, GridDhtPreloader<K, V> preloader) { super(cctx.kernalContext()); - assert topVer != 0 : topVer; + assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; this.cctx = cctx; @@ -495,7 +496,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec for (GridCacheEntryInfo<K, V> info : res.forcedInfos()) { int p = cctx.affinity().partition(info.key()); - GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false); + GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); if (locPart != null && locPart.state() == MOVING && locPart.reserve()) { GridCacheEntryEx<K, V> entry = cctx.dht().entryEx(info.key()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java index 2a28062..a17c5b74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java @@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -54,7 +56,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem private Collection<K> keys; /** Topology version for which keys are requested. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * @param cacheId Cache ID. @@ -68,7 +70,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem IgniteUuid futId, IgniteUuid miniId, Collection<K> keys, - long topVer + @NotNull AffinityTopologyVersion topVer ) { assert futId != null; assert miniId != null; @@ -133,7 +135,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem /** * @return Topology version for which keys are requested. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -195,7 +197,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem writer.incrementState(); case 6: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -241,7 +243,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem reader.incrementState(); case 6: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 54a47d5..84b376a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -19,10 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -57,13 +59,13 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V> private int workerId = -1; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * @param updateSeq Update sequence for this node. * @param topVer Topology version. */ - GridDhtPartitionDemandMessage(long updateSeq, long topVer, int cacheId) { + GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) { assert updateSeq > 0; this.cacheId = cacheId; @@ -168,7 +170,7 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V> /** * @return Topology version for which demand message is sent. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -217,7 +219,7 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V> writer.incrementState(); case 5: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -273,7 +275,7 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V> reader.incrementState(); case 5: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 6a1f7a1..8f971a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.timeout.*; @@ -208,8 +209,8 @@ public class GridDhtPartitionDemandPool<K, V> { if (log.isDebugEnabled()) log.debug("Forcing preload event for future: " + exchFut); - exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { + exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); @@ -292,7 +293,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param topVer Topology version. * @return Picked owners. */ - private Collection<ClusterNode> pickedOwners(int p, long topVer) { + private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); int affCnt = affNodes.size(); @@ -318,7 +319,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param topVer Topology version. * @return Nodes owning this partition. */ - private Collection<ClusterNode> remoteOwners(int p, long topVer) { + private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); } @@ -357,8 +358,8 @@ public class GridDhtPartitionDemandPool<K, V> { obj = new GridTimeoutObjectAdapter(delay) { @Override public void onTimeout() { - exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> f) { + exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); @@ -481,7 +482,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @return {@code False} if partition has become invalid during preloading. * @throws IgniteInterruptedCheckedException If interrupted. */ - private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, long topVer) + private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, AffinityTopologyVersion topVer) throws IgniteCheckedException { try { GridCacheEntryEx<K, V> cached = null; @@ -569,7 +570,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @throws ClusterTopologyCheckedException If node left. * @throws IgniteCheckedException If failed to send message. */ - private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage<K, V> d, + private Set<Integer> demandFromNode(ClusterNode node, final AffinityTopologyVersion topVer, GridDhtPartitionDemandMessage<K, V> d, GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException { GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); @@ -978,13 +979,13 @@ public class GridDhtPartitionDemandPool<K, V> { int partCnt = cctx.affinity().partitions(); assert exchFut.forcePreload() || exchFut.dummyReassign() || - exchFut.exchangeId().topologyVersion() == top.topologyVersion() : + exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); - long topVer = assigns.topologyVersion(); + AffinityTopologyVersion topVer = assigns.topologyVersion(); for (int p = 0; p < partCnt; p++) { if (cctx.shared().exchange().hasPendingExchange()) {