http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java index 1823c49..71add90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -43,17 +45,17 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa private int evt; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * @param nodeId Node ID. * @param evt Event. * @param topVer Topology version. */ - public GridDhtPartitionExchangeId(UUID nodeId, int evt, long topVer) { + public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) { assert nodeId != null; assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED; - assert topVer > 0; + assert topVer.topologyVersion() > 0; this.nodeId = nodeId; this.evt = evt; @@ -84,7 +86,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa /** * @return Order. */ - public long topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -105,14 +107,14 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); - out.writeLong(topVer); + out.writeObject(topVer); out.writeInt(evt); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { nodeId = U.readUuid(in); - topVer = in.readLong(); + topVer = (AffinityTopologyVersion)in.readObject(); evt = in.readInt(); } @@ -121,7 +123,9 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa if (o == this) return 0; - return topVer < o.topVer ? -1 : topVer == o.topVer ? 0 : 1; + + + return topVer.compareTo(o.topVer); } /** {@inheritDoc} */ @@ -129,7 +133,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa int res = nodeId.hashCode(); res = 31 * res + evt; - res = 31 * res + (int)(topVer ^ (topVer >>> 32)); + res = 31 * res + topVer.hashCode(); return res; } @@ -141,7 +145,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa GridDhtPartitionExchangeId id = (GridDhtPartitionExchangeId)o; - return evt == id.evt && topVer == id.topVer && nodeId.equals(id.nodeId); + return evt == id.evt && topVer.equals(id.topVer) && nodeId.equals(id.nodeId); } /** {@inheritDoc} */ @@ -169,7 +173,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa writer.incrementState(); case 2: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -204,7 +208,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa reader.incrementState(); case 2: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index db9bd08..81ab4bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -24,6 +24,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -47,7 +48,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** * Future for exchanging partition maps. */ -public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Long> +public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<AffinityTopologyVersion> implements Comparable<GridDhtPartitionsExchangeFuture<K, V>>, GridDhtTopologyFuture { /** */ private static final long serialVersionUID = 0L; @@ -216,7 +217,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon log = cctx.logger(getClass()); // Grab all nodes with order of equal or less than last joined node. - oldestNode.set(CU.oldest(cctx, exchId.topologyVersion())); + oldestNode.set(CU.oldest(cctx, exchId.topologyVersion().topologyVersion())); assert oldestNode.get() != null; @@ -432,7 +433,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon } // Grab all alive remote nodes with order of equal or less than last joined node. - rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion())); + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion().topologyVersion())); rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); @@ -444,7 +445,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon // If received any messages, process them. onReceive(m.getKey(), m.getValue()); - long topVer = exchId.topologyVersion(); + AffinityTopologyVersion topVer = exchId.topologyVersion(); for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) @@ -485,7 +486,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon GridDhtPartitionTopology<K, V> top = cacheCtx.topology(); - assert topVer == top.topologyVersion() : + assert topVer.equals(top.topologyVersion()) : "Topology version is updated only in this class instances inside single ExchangeWorker thread."; top.beforeExchange(exchId); @@ -632,11 +633,11 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon } /** {@inheritDoc} */ - @Override public boolean onDone(Long res, Throwable err) { + @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { if (err == null) { for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) - cacheCtx.affinity().cleanUpCache(res - 10); + cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10); } } @@ -827,7 +828,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon if (log.isDebugEnabled()) log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); - assert exchId.topologyVersion() == msg.topologyVersion(); + assert exchId.topologyVersion().equals(msg.topologyVersion()); initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> t) { @@ -915,7 +916,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon boolean set = false; - ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion()); + ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion().topologyVersion()); // If local node is now oldest. if (newOldest.id().equals(cctx.localNodeId())) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 70fe7fd..b7cbf5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; @@ -46,7 +47,7 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac private byte[] partsBytes; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Required by {@link Externalizable}. @@ -60,11 +61,11 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac * @param lastVer Last version. */ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, - long topVer) { + @NotNull AffinityTopologyVersion topVer) { super(id, lastVer); assert parts != null; - assert id == null || topVer == id.topologyVersion(); + assert id == null || topVer.equals(id.topologyVersion()); this.topVer = topVer; } @@ -96,14 +97,14 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } /** * @param topVer Topology version. */ - public void topologyVersion(long topVer) { + public void topologyVersion(AffinityTopologyVersion topVer) { this.topVer = topVer; } @@ -137,7 +138,7 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac writer.incrementState(); case 6: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -167,7 +168,7 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac reader.incrementState(); case 6: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0821431..0a74d0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.*; @@ -70,7 +71,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); /** Pending affinity assignment futures. */ - private ConcurrentMap<Long, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts = + private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); /** Discovery listener. */ @@ -276,7 +277,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param topVer Requested topology version. * @param fut Future to add. */ - public void addDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture<K, V> fut) { + public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) { GridDhtAssignmentFetchFuture<K, V> old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut); assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer; @@ -286,7 +287,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param topVer Requested topology version. * @param fut Future to remove. */ - public void removeDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture<K, V> fut) { + public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) { boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut); assert rmv : "Failed to remove assignment fetch future: " + topVer; @@ -348,7 +349,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { for (K k : msg.keys()) { int p = cctx.affinity().partition(k); - GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false); + GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); // If this node is no longer an owner. if (locPart == null && !top.owners(p).contains(loc)) @@ -423,13 +424,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { */ private void processAffinityAssignmentRequest(final ClusterNode node, final GridDhtAffinityAssignmentRequest<K, V> req) { - final long topVer = req.topologyVersion(); + final AffinityTopologyVersion topVer = req.topologyVersion(); if (log.isDebugEnabled()) log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']'); - cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> fut) { + cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { if (log.isDebugEnabled()) log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + ", node=" + node + ']'); @@ -488,7 +489,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @return Future for request. */ @SuppressWarnings( {"unchecked", "RedundantCast"}) - @Override public GridDhtFuture<Object> request(Collection<? extends K> keys, long topVer) { + @Override public GridDhtFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) { final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index fafec8d..76ea0a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -36,15 +37,15 @@ public class GridDhtPreloaderAssignments<K, V> extends private final GridDhtPartitionsExchangeFuture<K, V> exchFut; /** Last join order. */ - private final long topVer; + private final AffinityTopologyVersion topVer; /** * @param exchFut Exchange future. * @param topVer Last join order. */ - public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, long topVer) { + public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, AffinityTopologyVersion topVer) { assert exchFut != null; - assert topVer > 0; + assert topVer.topologyVersion() > 0; this.exchFut = exchFut; this.topVer = topVer; @@ -60,7 +61,7 @@ public class GridDhtPreloaderAssignments<K, V> extends /** * @return Topology version. */ - long topologyVersion() { + AffinityTopologyVersion topologyVersion() { return topVer; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 22403ef..7bf4cc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; @@ -210,7 +211,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { while (true) { GridCacheEntryEx<K, V> entry = null; - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); try { entry = entryEx(key, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 94d2e96..79f740c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -69,7 +70,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { // Can't hold any locks here - this method is invoked when // holding write-lock on the whole cache map. @@ -113,7 +114,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) { + @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) { GridNearCacheEntry<K, V> entry = null; while (true) { @@ -136,7 +137,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param topVer Topology version. * @return Entry. */ - public GridNearCacheEntry<K, V> entryExx(K key, long topVer) { + public GridNearCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) { return (GridNearCacheEntry<K, V>)entryEx(key, topVer); } @@ -373,7 +374,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> primaryEntrySet( @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); Collection<Cache.Entry<K, V>> entries = F.flatCollections( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 4d86a85..fc548fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -27,10 +28,8 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; -import javax.cache.*; import java.util.*; import static org.apache.ignite.events.EventType.*; @@ -85,8 +84,8 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { } /** {@inheritDoc} */ - @Override public boolean valid(long topVer) { - assert topVer > 0 : "Topology version is invalid: " + topVer; + @Override public boolean valid(AffinityTopologyVersion topVer) { + assert topVer.topologyVersion() > 0 : "Topology version is invalid: " + topVer; UUID primaryNodeId = this.primaryNodeId; @@ -116,7 +115,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @return {@code True} if this entry was initialized by this call. * @throws GridCacheEntryRemovedException If this entry is obsolete. */ - public boolean initializeFromDht(long topVer) throws GridCacheEntryRemovedException { + public boolean initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { while (true) { GridDhtCacheEntry<K, V> entry = cctx.near().dht().peekExx(key); @@ -345,7 +344,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { @SuppressWarnings({"RedundantTypeArguments"}) public boolean loadedValue(@Nullable IgniteInternalTx tx, UUID primaryNodeId, V val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer, @Nullable GridCacheVersion expVer, long ttl, long expireTime, - boolean evt, long topVer, UUID subjId) + boolean evt, AffinityTopologyVersion topVer, UUID subjId) throws IgniteCheckedException, GridCacheEntryRemovedException { boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 9628f3b..1a7a568 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -168,7 +169,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * Initializes future. */ public void init() { - long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer); @@ -278,7 +279,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma */ private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, - final long topVer) { + final AffinityTopologyVersion topVer) { Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer); if (affNodes.isEmpty()) { @@ -347,9 +348,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma remapKeys.add(key); } - long updTopVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - assert updTopVer > topVer : "Got invalid partitions for local node but topology version did " + + assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + ", invalidParts=" + invalidParts + ']'; @@ -421,7 +422,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @return Map. */ private Map<K, GridCacheVersion> map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, - long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, Map<K, GridCacheVersion> savedVers) { + AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, Map<K, GridCacheVersion> savedVers) { final GridNearCacheAdapter<K, V> near = cache(); // Allow to get cached value from the local node. @@ -603,7 +604,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma Collection<K> keys, Collection<GridCacheEntryInfo<K, V>> infos, Map<K, GridCacheVersion> savedVers, - long topVer) { + AffinityTopologyVersion topVer) { boolean empty = F.isEmpty(keys); Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size()); @@ -688,7 +689,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private Map<K, GridCacheVersion> savedVers; /** Topology version on which this future was mapped. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Empty constructor required for {@link Externalizable}. @@ -703,7 +704,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @param savedVers Saved entry versions. * @param topVer Topology version. */ - MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, Map<K, GridCacheVersion> savedVers, long topVer) { + MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, Map<K, GridCacheVersion> savedVers, + @NotNull AffinityTopologyVersion topVer) { super(cctx.kernalContext()); this.node = node; @@ -751,9 +753,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - long updTopVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - if (updTopVer > topVer) { + if (updTopVer.compareTo(topVer) > 0) { // Remap. map(keys.keySet(), F.t(node, keys), updTopVer); @@ -762,7 +764,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma else { final RemapTimeoutObject timeout = new RemapTimeoutObject(ctx.config().getNetworkTimeout(), topVer, e); - ctx.discovery().topologyFuture(topVer + 1).listenAsync(new CI1<IgniteInternalFuture<Long>>() { + ctx.discovery().topologyFuture(topVer.topologyVersion() + 1).listenAsync(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> longIgniteFuture) { if (timeout.finish()) { ctx.timeout().removeTimeoutObject(timeout); @@ -794,11 +796,11 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // Remap invalid partitions. if (!F.isEmpty(invalidParts)) { - long rmtTopVer = res.topologyVersion(); + AffinityTopologyVersion rmtTopVer = res.topologyVersion(); - assert rmtTopVer != 0; + assert rmtTopVer.topologyVersion() != 0; - if (rmtTopVer <= topVer) { + if (rmtTopVer.compareTo(topVer) <= 0) { // Fail the whole get future. onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " + "invalid partitions but remote topology version does not differ from local) " + @@ -812,7 +814,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); // Need to wait for next topology version to remap. - IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer); + IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() { @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { @@ -823,7 +825,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma @Override public boolean apply(K key) { return invalidParts.contains(cctx.affinity().partition(key)); } - }), F.t(node, keys), readyTopVer); + }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer)); // It is critical to call onDone after adding futures to compound list. onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer)); @@ -847,7 +849,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private AtomicBoolean finished = new AtomicBoolean(); /** Topology version to wait. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Exception cause. */ private IgniteCheckedException e; @@ -856,7 +858,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @param timeout Timeout. * @param topVer Topology version timeout was created on. */ - private RemapTimeoutObject(long timeout, long topVer, IgniteCheckedException e) { + private RemapTimeoutObject(long timeout, @NotNull AffinityTopologyVersion topVer, IgniteCheckedException e) { super(timeout); this.topVer = topVer; @@ -867,7 +869,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma @Override public void onTimeout() { if (finish()) // Fail the whole get future. - onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + (topVer + 1), e)); + onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + + (topVer.topologyVersion() + 1), e)); // else remap happened concurrently. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 06b3a61..30126f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -68,7 +70,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements private LinkedHashMap<byte[], Boolean> keyBytes; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Subject ID. */ private UUID subjId; @@ -109,7 +111,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements LinkedHashMap<K, Boolean> keys, boolean readThrough, boolean reload, - long topVer, + @NotNull AffinityTopologyVersion topVer, UUID subjId, int taskNameHash, long accessTtl, @@ -201,7 +203,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -308,7 +310,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements writer.incrementState(); case 12: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -408,7 +410,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements reader.incrementState(); case 12: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index 6bbbc0f..10b7291 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -62,7 +64,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements private Collection<Integer> invalidParts = new GridLeanSet<>(); /** Topology version if invalid partitions is not empty. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Error. */ @GridDirectTransient @@ -144,7 +146,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements * @param invalidParts Partitions to retry due to ownership shift. * @param topVer Topology version. */ - public void invalidPartitions(Collection<Integer> invalidParts, long topVer) { + public void invalidPartitions(Collection<Integer> invalidParts, @NotNull AffinityTopologyVersion topVer) { this.invalidParts = invalidParts; this.topVer = topVer; } @@ -152,7 +154,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements /** * @return Topology version if this response has invalid partitions. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -245,7 +247,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements writer.incrementState(); case 8: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -313,7 +315,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements reader.incrementState(); case 8: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 210772d..bab1691 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -296,7 +297,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @return Lock candidate. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable private GridCacheMvccCandidate<K> addEntry(long topVer, GridNearCacheEntry<K, V> entry, UUID dhtNodeId) + @Nullable private GridCacheMvccCandidate<K> addEntry(AffinityTopologyVersion topVer, GridNearCacheEntry<K, V> entry, UUID dhtNodeId) throws GridCacheEntryRemovedException { // Check if lock acquisition is timed out. if (timedOut) @@ -690,7 +691,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B GridDiscoveryTopologySnapshot snapshot = fut.topologySnapshot(); if (tx != null) { - tx.topologyVersion(snapshot.topologyVersion()); + tx.topologyVersion(new AffinityTopologyVersion(snapshot.topologyVersion())); tx.topologySnapshot(snapshot); } @@ -701,8 +702,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B markInitialized(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { + fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(); } }); @@ -730,9 +731,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B assert snapshot != null; - long topVer = snapshot.topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(snapshot.topologyVersion()); - assert topVer > 0; + assert topVer.topologyVersion() > 0; if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all " + @@ -1151,7 +1152,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @throws IgniteCheckedException If mapping for key failed. */ private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping, - long topVer) throws IgniteCheckedException { + AffinityTopologyVersion topVer) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; ClusterNode primary = cctx.affinity().primary(key, topVer); @@ -1345,7 +1346,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B int i = 0; - long topVer = topSnapshot.get().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(topSnapshot.get().topologyVersion()); for (K k : keys) { while (true) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index d1ae174..e8c071b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -43,7 +44,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> private static final long serialVersionUID = 0L; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Mini future ID. */ private IgniteUuid miniId; @@ -115,7 +116,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> */ public GridNearLockRequest( int cacheId, - long topVer, + @NotNull AffinityTopologyVersion topVer, UUID nodeId, long threadId, IgniteUuid futId, @@ -153,7 +154,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> grpLockKey, partLock); - assert topVer > 0; + assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; this.topVer = topVer; this.implicitTx = implicitTx; @@ -169,7 +170,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -403,7 +404,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> writer.incrementState(); case 33: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -513,7 +514,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> reader.incrementState(); case 33: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index d6ec9dd..e772114 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -183,7 +184,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> List<K> keys = req.nearKeys(); if (keys != null) { - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); for (K key : keys) { while (true) { @@ -440,7 +441,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param topVer Topology version. * @return {@code True} if entry is locally mapped as a primary or back up node. */ - protected boolean isNearLocallyMapped(GridCacheEntryEx<K, V> e, long topVer) { + protected boolean isNearLocallyMapped(GridCacheEntryEx<K, V> e, AffinityTopologyVersion topVer) { return ctx.affinity().belongs(ctx.localNode(), e.key(), topVer); } @@ -451,7 +452,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param topVer Topology version. * @return {@code True} if attempt was made to evict the entry. */ - protected boolean evictNearEntry(GridCacheEntryEx<K, V> e, GridCacheVersion obsoleteVer, long topVer) { + protected boolean evictNearEntry(GridCacheEntryEx<K, V> e, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer) { assert e != null; assert obsoleteVer != null; @@ -490,7 +491,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> try { GridCacheMvccCandidate<K> cand = entry.candidate(ctx.nodeId(), Thread.currentThread().getId()); - long topVer = -1; + AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; if (cand != null) { assert cand.nearLocal() : "Got non-near-local candidate in near cache: " + cand; @@ -498,7 +499,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> ver = cand.version(); if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion()); + Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion().topologyVersion()); if (F.isEmpty(affNodes)) return; @@ -550,9 +551,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> } } - assert topVer != -1 || cand == null; + assert !topVer.equals(AffinityTopologyVersion.NONE) || cand == null; - if (topVer == -1) + if (topVer.equals(AffinityTopologyVersion.NONE)) topVer = ctx.affinity().affinityTopologyVersion(); ctx.evicts().touch(entry, topVer); @@ -616,7 +617,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (cand != null) { if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion()); + Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion().topologyVersion()); if (F.isEmpty(affNodes)) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index f3811c6..85a40e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -222,7 +223,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (super.onDone(tx, th != null ? th : err)) { if (error() instanceof IgniteTxHeuristicCheckedException) { - long topVer = this.tx.topologyVersion(); + AffinityTopologyVersion topVer = this.tx.topologyVersion(); for (IgniteTxEntry<K, V> e : this.tx.writeMap().values()) { GridCacheContext<K, V> cacheCtx = e.context(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index f29cfea..fc843df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; @@ -45,7 +46,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques private boolean storeEnabled; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Subject ID. */ private UUID subjId; @@ -86,7 +87,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques boolean syncRollback, boolean explicitLock, boolean storeEnabled, - long topVer, + @NotNull AffinityTopologyVersion topVer, GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, @@ -148,7 +149,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -198,7 +199,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques writer.incrementState(); case 24: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -260,7 +261,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques reader.incrementState(); case 24: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 06e4767..1ee19de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -183,7 +184,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, long topVer) { + IgniteTxEntry<K, V> entry, AffinityTopologyVersion topVer) { // We are in near transaction, do not add local node as reader. return null; } @@ -1140,7 +1141,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, long topVer) { + @Override protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, AffinityTopologyVersion topVer) { if (cacheCtx.isColocated()) { IgniteTxEntry<K, V> txEntry = entry(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 18fda47..05a53ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; @@ -338,7 +339,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot(); - tx.topologyVersion(snapshot.topologyVersion()); + tx.topologyVersion(new AffinityTopologyVersion(snapshot.topologyVersion())); tx.topologySnapshot(snapshot); // Make sure to add future before calling prepare. @@ -364,8 +365,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut else { topFut.syncNotify(false); - topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { + topFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { prepare(); } }); @@ -463,9 +464,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut assert snapshot != null; - long topVer = snapshot.topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(snapshot.topologyVersion()); - assert topVer > 0; + assert topVer.topologyVersion() > 0; txMapping = new GridDhtTxMapping<>(); @@ -548,7 +549,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut private void preparePessimistic() { Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping<K, V>> mappings = new HashMap<>(); - long topVer = tx.topologyVersion(); + AffinityTopologyVersion topVer = tx.topologyVersion(); txMapping = new GridDhtTxMapping<>(); @@ -752,7 +753,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut */ private GridDistributedTxMapping<K, V> map( IgniteTxEntry<K, V> entry, - long topVer, + AffinityTopologyVersion topVer, GridDistributedTxMapping<K, V> cur, boolean waitLock ) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index ca82996..4df0bb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -49,7 +50,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ private boolean near; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** {@code True} if this last prepare request for node. */ private boolean last; @@ -95,7 +96,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ */ public GridNearTxPrepareRequest( IgniteUuid futId, - long topVer, + @NotNull AffinityTopologyVersion topVer, IgniteInternalTx<K, V> tx, Collection<IgniteTxEntry<K, V>> reads, Collection<IgniteTxEntry<K, V>> writes, @@ -199,7 +200,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -312,7 +313,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ writer.incrementState(); case 32: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -406,7 +407,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ reader.incrementState(); case 32: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java index 85831a8..58d3b0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.dr; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; @@ -66,7 +67,7 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> { * @param left {@code True} if exchange has been caused by node leave. * @throws IgniteCheckedException If failed. */ - public void beforeExchange(long topVer, boolean left) throws IgniteCheckedException; + public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException; /** * @return {@code True} is DR is enabled. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java index 49f617b..516aa17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.dr.os; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -81,7 +82,7 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> { } /** {@inheritDoc} */ - @Override public void beforeExchange(long topVer, boolean left) throws IgniteCheckedException { + @Override public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 53b216c..49e893b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.local; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -73,8 +74,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, + K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { return new GridLocalCacheEntry<>(ctx, key, hash, val, next, ttl, hdrId); } }); @@ -178,7 +179,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void unlockAll(Collection<? extends K> keys, IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); for (K key : keys) { GridLocalCacheEntry<K, V> entry = peekExx(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index c3da493..cac086c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.local.atomic; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.local.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -75,7 +76,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId) { return new GridLocalCacheEntry<K, V>(ctx, key, hash, val, next, ttl, hdrId); } @@ -1531,7 +1532,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { for (GridCacheEntryEx<K, V> entry : locked) UNSAFE.monitorExit(entry); - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); for (GridCacheEntryEx<K, V> entry : locked) ctx.evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 362077f..ba27368 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -24,6 +24,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -1263,7 +1264,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Collection<Object> data = new ArrayList<>(pageSize); - long topVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); final boolean statsEnabled = cctx.config().isStatisticsEnabled(); @@ -1869,7 +1870,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1); + return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE); } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 0643c0e..239b223 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; @@ -152,7 +153,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K boolean initialized = false; - boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); + boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE); boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { @@ -203,8 +204,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K if (F.isEmpty(lsnrCol)) return; - if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { - boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); + if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE)) { + boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE); boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); boolean initialized = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index a06a558..40df361 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.timeout.*; @@ -246,7 +247,7 @@ public interface IgniteInternalTx<K, V> extends AutoCloseable, GridTimeoutObject /** * @return Last recorded topology version. */ - public long topologyVersion(); + public AffinityTopologyVersion topologyVersion(); /** * @return Flag indicating whether transaction is implicit with only one key. @@ -266,7 +267,7 @@ public interface IgniteInternalTx<K, V> extends AutoCloseable, GridTimeoutObject * @param topVer Topology version. * @return Recorded topology version. */ - public long topologyVersion(long topVer); + public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer); /** * @return {@code True} if transaction is empty. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index abdb99c..0fa7806 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -184,7 +185,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>(); /** Topology version. */ - private AtomicLong topVer = new AtomicLong(-1); + private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE); /** Mutex. */ private final Lock lock = new ReentrantLock(); @@ -361,7 +362,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter * * @return Flag indicating whether near cache should be updated. */ - protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { + protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) { return false; } @@ -488,18 +489,18 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public long topologyVersion() { - long res = topVer.get(); + @Override public AffinityTopologyVersion topologyVersion() { + AffinityTopologyVersion res = topVer.get(); - if (res == -1) + if (res.equals(AffinityTopologyVersion.NONE)) return cctx.exchange().topologyVersion(); return res; } /** {@inheritDoc} */ - @Override public long topologyVersion(long topVer) { - this.topVer.compareAndSet(-1, topVer); + @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) { + this.topVer.compareAndSet(AffinityTopologyVersion.NONE, topVer); return this.topVer.get(); } @@ -1707,7 +1708,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -1717,7 +1718,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public long topologyVersion(long topVer) { + @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); }