http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java index 7bb513e..a96bce5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; @@ -70,7 +71,7 @@ public class GridCacheMvccCandidate<K> implements Externalizable, /** Topology version. */ @SuppressWarnings( {"TransientFieldNotInitialized"}) @GridToStringInclude - private transient volatile long topVer = -1; + private transient volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** Linked reentry. */ private GridCacheMvccCandidate<K> reentry; @@ -185,14 +186,14 @@ public class GridCacheMvccCandidate<K> implements Externalizable, /** * @return Topology for which this lock was acquired. */ - public long topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return topVer; } /** * @param topVer Topology version. */ - public void topologyVersion(long topVer) { + public void topologyVersion(AffinityTopologyVersion topVer) { this.topVer = topVer; }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index d125c02..c947592 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -558,9 +559,8 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, public Collection<GridCacheMvccCandidate<K>> remoteCandidates() { Collection<GridCacheMvccCandidate<K>> rmtCands = new LinkedList<>(); - for (GridDistributedCacheEntry<K, V> entry : locked()) { + for (GridDistributedCacheEntry<K, V> entry : locked()) rmtCands.addAll(entry.remoteMvccSnapshot()); - } return rmtCands; } @@ -919,8 +919,8 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, * @return Future that signals when all locks for given partitions are released. */ @SuppressWarnings({"unchecked"}) - public IgniteInternalFuture<?> finishLocks(long topVer) { - assert topVer > 0; + public IgniteInternalFuture<?> finishLocks(AffinityTopologyVersion topVer) { + assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; return finishLocks(null, topVer); } @@ -931,13 +931,13 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, * @param topVer Topology version to wait for. * @return Explicit locks release future. */ - public IgniteInternalFuture<?> finishExplicitLocks(long topVer) { + public IgniteInternalFuture<?> finishExplicitLocks(AffinityTopologyVersion topVer) { GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(cctx.kernalContext()); for (GridCacheExplicitLockSpan<K> span : pendingExplicit.values()) { GridDiscoveryTopologySnapshot snapshot = span.topologySnapshot(); - if (snapshot != null && snapshot.topologyVersion() < topVer) + if (snapshot != null && snapshot.topologyVersion() < topVer.topologyVersion()) res.add(span.releaseFuture()); } @@ -951,13 +951,13 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, * * @return Finish update future. */ - public IgniteInternalFuture<?> finishAtomicUpdates(long topVer) { + public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) { GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(cctx.kernalContext()); res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); for (GridCacheAtomicFuture<K, ?> fut : atomicFuts.values()) { - if (fut.waitForPartitionExchange() && fut.topologyVersion() < topVer) + if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0) res.add((IgniteInternalFuture<Object>)fut); } @@ -972,7 +972,7 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, * @return Future that signals when all locks for given keys are released. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> finishKeys(Collection<K> keys, long topVer) { + public IgniteInternalFuture<?> finishKeys(Collection<K> keys, AffinityTopologyVersion topVer) { if (!(keys instanceof Set)) keys = new HashSet<>(keys); @@ -990,10 +990,10 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, * @param topVer Topology version. * @return Future that signals when all locks for given partitions will be released. */ - private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<K> keyFilter, long topVer) { - assert topVer != 0; + private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<K> keyFilter, AffinityTopologyVersion topVer) { + assert topVer.topologyVersion() != 0; - if (topVer < 0) + if (topVer.equals(AffinityTopologyVersion.NONE)) return new GridFinishedFuture(context().kernalContext()); final FinishLockFuture finishFut = new FinishLockFuture( @@ -1045,7 +1045,7 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, /** Topology version. Instance field for toString method only. */ @GridToStringInclude - private final long topVer; + private final AffinityTopologyVersion topVer; /** */ @GridToStringInclude @@ -1058,17 +1058,17 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, public FinishLockFuture() { assert false; - topVer = 0; + topVer = AffinityTopologyVersion.ZERO; } /** * @param topVer Topology version. * @param entries Entries. */ - FinishLockFuture(Iterable<GridDistributedCacheEntry<K, V>> entries, long topVer) { + FinishLockFuture(Iterable<GridDistributedCacheEntry<K, V>> entries, AffinityTopologyVersion topVer) { super(cctx.kernalContext(), true); - assert topVer > 0; + assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; this.topVer = topVer; @@ -1078,11 +1078,9 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, Collection<GridCacheMvccCandidate<K>> locs = entry.localCandidates(); if (!F.isEmpty(locs)) { - Collection<GridCacheMvccCandidate<K>> cands = - new ConcurrentLinkedQueue<>(); + Collection<GridCacheMvccCandidate<K>> cands = new ConcurrentLinkedQueue<>(); - if (locs != null) - cands.addAll(F.view(locs, versionFilter())); + cands.addAll(F.view(locs, versionFilter())); if (!F.isEmpty(cands)) pendingLocks.put(entry.txKey(), cands); @@ -1102,14 +1100,14 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, * @return Filter. */ private IgnitePredicate<GridCacheMvccCandidate<K>> versionFilter() { - assert topVer > 0; + assert topVer.topologyVersion() > 0; return new P1<GridCacheMvccCandidate<K>>() { @Override public boolean apply(GridCacheMvccCandidate<K> c) { assert c.nearLocal() || c.dhtLocal(); // Wait for explicit locks. - return c.topologyVersion() == 0 || c.topologyVersion() < topVer; + return c.topologyVersion().equals(AffinityTopologyVersion.ZERO) || c.topologyVersion().compareTo(topVer) < 0; } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d7b1914..246ff37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.timeout.*; @@ -120,7 +121,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - GridDhtPartitionExchangeId exchId = exchangeId(n.id(), e.topologyVersion(), e.type()); + GridDhtPartitionExchangeId exchId = exchangeId(n.id(), new AffinityTopologyVersion(e.topologyVersion()), + e.type()); GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e); @@ -198,7 +200,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert startTime > 0; - final long startTopVer = loc.order(); + final AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(loc.order()); GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, EVT_NODE_JOINED); @@ -207,7 +209,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert discoEvt != null; - assert discoEvt.topologyVersion() == startTopVer; + assert discoEvt.topologyVersion() == startTopVer.topologyVersion(); GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt); @@ -320,7 +322,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * * @return Topology version. */ - public long topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return lastInitializedFuture.exchangeId().topologyVersion(); } @@ -487,7 +489,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(null, null, -1); + GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(null, null, AffinityTopologyVersion.NONE); for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@ -543,7 +545,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param evt Event type. * @return Activity future ID. */ - private GridDhtPartitionExchangeId exchangeId(UUID nodeId, long topVer, int evt) { + private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, int evt) { return new GridDhtPartitionExchangeId(nodeId, evt, topVer); } @@ -576,7 +578,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (exchFuts0 != null) { for (GridDhtPartitionsExchangeFuture<K, V> fut : exchFuts0.values()) { - if (fut.exchangeId().topologyVersion() < exchFut.exchangeId().topologyVersion() - 10) + if (fut.exchangeId().topologyVersion().topologyVersion() < exchFut.exchangeId().topologyVersion().topologyVersion() - 10) fut.cleanUp(); } } @@ -959,14 +961,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override public int compare( GridDhtPartitionsExchangeFuture<K, V> f1, GridDhtPartitionsExchangeFuture<K, V> f2) { - long t1 = f1.exchangeId().topologyVersion(); - long t2 = f2.exchangeId().topologyVersion(); + AffinityTopologyVersion t1 = f1.exchangeId().topologyVersion(); + AffinityTopologyVersion t2 = f2.exchangeId().topologyVersion(); - assert t1 > 0; - assert t2 > 0; + assert t1.topologyVersion() > 0; + assert t2.topologyVersion() > 0; // Reverse order. - return t1 < t2 ? 1 : t1 == t2 ? 0 : -1; + int cmp = t1.compareTo(t2); + + return cmp < 0 ? 1 : cmp == 0 ? 0 : -1; } }, /*not strict*/false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index e800137..c2b2b88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -115,7 +116,7 @@ public interface GridCachePreloader<K, V> { * @param topVer Topology version, {@code -1} if not required. * @return Future to complete when all keys are preloaded. */ - public IgniteInternalFuture<Object> request(Collection<? extends K> keys, long topVer); + public IgniteInternalFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer); /** * Force preload process. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 40e090b..fd334f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.lang.*; @@ -111,7 +112,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(Collection<? extends K> keys, long topVer) { + @Override public IgniteInternalFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) { return new GridFinishedFuture<>(cctx.kernalContext()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 6b17038..de9ec0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.timeout.*; @@ -382,7 +383,7 @@ public class GridCacheSharedContext<K, V> { * @return {@code true} if waiting was successful. */ @SuppressWarnings({"unchecked"}) - public IgniteInternalFuture<?> partitionReleaseFuture(long topVer) { + public IgniteInternalFuture<?> partitionReleaseFuture(AffinityTopologyVersion topVer) { GridCompoundFuture f = new GridCompoundFuture(kernalCtx); f.add(mvcc().finishExplicitLocks(topVer)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 0a7b768..b92cc2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.managers.swapspace.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.offheap.*; @@ -173,7 +174,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Number of swap entries. * @throws IgniteCheckedException If failed. */ - public int swapEntriesCount(boolean primary, boolean backup, long topVer) throws IgniteCheckedException { + public int swapEntriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException { assert primary || backup; if (!swapEnabled) @@ -196,7 +197,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Number of offheap entries. * @throws IgniteCheckedException If failed. */ - public int offheapEntriesCount(boolean primary, boolean backup, long topVer) throws IgniteCheckedException { + public int offheapEntriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException { assert primary || backup; if (!offheapEnabled) @@ -1515,7 +1516,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Swap entries iterator. * @throws IgniteCheckedException If failed. */ - public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, long topVer) + public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException { assert primary || backup; @@ -1545,7 +1546,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Offheap entries iterator. * @throws IgniteCheckedException If failed. */ - public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, long topVer) + public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException { assert primary || backup; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 7cd13df..b0f3170 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -23,6 +23,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -596,6 +597,17 @@ public class GridCacheUtils { } /** + * Gets DHT affinity nodes. + * + * @param ctx Cache context. + * @param topOrder Maximum allowed node order. + * @return Affinity nodes. + */ + public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { + return affinityNodes(ctx, topOrder.topologyVersion()); + } + + /** * Checks if given node has specified cache started and the local DHT storage is enabled. * * @param ctx Cache context. @@ -1089,7 +1101,7 @@ public class GridCacheUtils { if (ctx.config().getCacheMode() == LOCAL) return F.asMap(ctx.localNode(), (Collection<K>)keys); - long topVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); if (CU.affinityNodes(ctx, topVer).isEmpty()) return Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 3b2d073..1e2bfe9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.affinity; import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -83,7 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> { @Override public int[] primaryPartitions(ClusterNode n) { A.notNull(n, "n"); - long topVer = cctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer); @@ -94,7 +95,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> { @Override public int[] backupPartitions(ClusterNode n) { A.notNull(n, "n"); - long topVer = cctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer); @@ -107,7 +108,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> { Collection<Integer> parts = new HashSet<>(); - long topVer = cctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { @@ -170,9 +171,9 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> { @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) { A.notNull(keys, "keys"); - long topVer = topologyVersion(); + AffinityTopologyVersion topVer = topologyVersion(); - int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size(); + int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer.topologyVersion()).size(); // Must return empty map if no alive nodes present or keys is empty. Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f); @@ -215,7 +216,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> { * * @return Topology version. */ - private long topologyVersion() { + private AffinityTopologyVersion topologyVersion() { return cctx.affinity().affinityTopologyVersion(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 25abfb0..3daadcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.processors.task.*; @@ -357,7 +358,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - private void removeSetData(IgniteUuid setId, long topVer) throws IgniteCheckedException { + private void removeSetData(IgniteUuid setId, AffinityTopologyVersion topVer) throws IgniteCheckedException { boolean loc = cctx.isLocal(); GridCacheAffinityManager aff = cctx.affinity(); @@ -408,7 +409,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, if (!cctx.isLocal()) { while (true) { - long topVer = cctx.topologyVersionFuture().get(); + AffinityTopologyVersion topVer = cctx.topologyVersionFuture().get(); Collection<ClusterNode> nodes = CU.affinityNodes(cctx, topVer); @@ -446,14 +447,14 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, throw e; } - if (cctx.topologyVersionFuture().get() == topVer) + if (topVer.equals(cctx.topologyVersionFuture().get())) break; } } else { blockSet(id); - cctx.dataStructures().removeSetData(id, 0); + cctx.dataStructures().removeSetData(id, AffinityTopologyVersion.ZERO); } } @@ -641,7 +642,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, private IgniteUuid setId; /** */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Required by {@link Externalizable}. @@ -655,7 +656,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, * @param setId Set ID. * @param topVer Topology version. */ - private RemoveSetDataCallable(String cacheName, IgniteUuid setId, long topVer) { + private RemoveSetDataCallable(String cacheName, IgniteUuid setId, @NotNull AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.setId = setId; this.topVer = topVer; @@ -687,14 +688,14 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); U.writeGridUuid(out, setId); - out.writeLong(topVer); + out.writeObject(topVer); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cacheName = U.readString(in); setId = U.readGridUuid(in); - topVer = in.readLong(); + topVer = (AffinityTopologyVersion)in.readObject(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index b2f6abc..1e8ba38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.nio.*; import java.util.*; @@ -65,7 +67,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { private long ttl; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Required empty constructor. @@ -78,7 +80,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { * @param topVer Topology version. * @param ttl TTL. */ - public GridCacheTtlUpdateRequest(long topVer, long ttl) { + public GridCacheTtlUpdateRequest(@NotNull AffinityTopologyVersion topVer, long ttl) { assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; this.topVer = topVer; @@ -88,7 +90,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { /** * @return Topology version. */ - public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -215,7 +217,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { writer.incrementState(); case 6: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -273,7 +275,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { reader.incrementState(); case 6: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index dc82e83..09b7143 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -137,7 +138,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public void removeAll() throws IgniteCheckedException { try { - long topVer; + AffinityTopologyVersion topVer; do { topVer = ctx.affinity().affinityTopologyVersion(); @@ -150,7 +151,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get(); } } - while (ctx.affinity().affinityTopologyVersion() > topVer); + while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -162,7 +163,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter @Override public IgniteInternalFuture<?> removeAllAsync() { GridFutureAdapter<Void> opFut = new GridFutureAdapter<>(ctx.kernalContext()); - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); removeAllAsync(opFut, topVer); @@ -173,7 +174,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param opFut Future. * @param topVer Topology version. */ - private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) { + private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) { Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { @@ -185,9 +186,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { fut.get(); - long topVer0 = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion(); - if (topVer0 == topVer) + if (topVer0.equals(topVer)) opFut.onDone(); else removeAllAsync(opFut, topVer0); @@ -231,7 +232,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter private String cacheName; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Injected grid instance. */ @IgniteInstanceResource @@ -248,7 +249,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param cacheName Cache name. * @param topVer Topology version. */ - private GlobalRemoveAllCallable(String cacheName, long topVer) { + private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } @@ -266,7 +267,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter ctx.gate().enter(); try { - if (ctx.affinity().affinityTopologyVersion() != topVer) + if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) return null; // Ignore this remove request because remove request will be sent again. GridDhtCacheAdapter<K, V> dht; @@ -309,13 +310,13 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); - out.writeLong(topVer); + out.writeObject(topVer); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cacheName = U.readString(in); - topVer = in.readLong(); + topVer = (AffinityTopologyVersion)in.readObject(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index dbf82dd..d72c9b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -467,7 +468,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> // ensure proper lock ordering for removed entries. cctx.tm().addCommittedTx(this); - long topVer = topologyVersion(); + AffinityTopologyVersion topVer = topologyVersion(); // Node that for near transactions we grab all entries. for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 239efc3..bfda92d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.*; @@ -62,7 +63,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo private GridDhtPartitionExchangeId lastExchangeId; /** */ - private long topVer = -1; + private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** A future that will be completed when topology with version topVer will be ready to use. */ private GridDhtTopologyFuture topReadyFut; @@ -131,7 +132,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo lock.writeLock().lock(); try { - assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer + + assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; topVer = exchId.topologyVersion(); @@ -144,11 +145,11 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo } /** {@inheritDoc} */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { lock.readLock().lock(); try { - assert topVer > 0; + assert topVer.topologyVersion() > 0; return topVer; } @@ -178,14 +179,14 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo lock.writeLock().lock(); try { - assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; if (!exchId.isJoined()) removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion()); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -230,12 +231,12 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { - long topVer = exchId.topologyVersion(); + AffinityTopologyVersion topVer = exchId.topologyVersion(); lock.writeLock().lock(); try { - assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; if (log.isDebugEnabled()) @@ -254,7 +255,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo } /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException { if (!create) return null; @@ -265,7 +266,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo /** {@inheritDoc} */ @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { - return localPartition(1, -1, create); + return localPartition(1, AffinityTopologyVersion.NONE, create); } /** {@inheritDoc} */ @@ -279,7 +280,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) { + @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) { assert false : "Entry should not be added to client topology: " + e; return null; @@ -304,7 +305,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo } /** {@inheritDoc} */ - @Override public Collection<ClusterNode> nodes(int p, long topVer) { + @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { lock.readLock().lock(); try { @@ -319,7 +320,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo for (UUID nodeId : nodeIds) { ClusterNode n = cctx.discovery().node(nodeId); - if (n != null && (topVer < 0 || n.order() <= topVer)) { + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) nodes = new ArrayList<>(); @@ -342,8 +343,8 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo * @param states Additional partition states. * @return List of nodes for the partition. */ - private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer.topologyVersion())) : null; lock.readLock().lock(); @@ -362,13 +363,13 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo List<ClusterNode> nodes = new ArrayList<>(size); for (UUID id : nodeIds) { - if (topVer > 0 && !allIds.contains(id)) + if (topVer.topologyVersion() > 0 && !allIds.contains(id)) continue; if (hasState(p, id, state, states)) { ClusterNode n = cctx.discovery().node(id); - if (n != null && (topVer < 0 || n.order() <= topVer)) + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); } } @@ -381,18 +382,18 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo } /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p, long topVer) { + @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { return nodes(p, topVer, OWNING); } /** {@inheritDoc} */ @Override public List<ClusterNode> owners(int p) { - return owners(p, -1); + return owners(p, AffinityTopologyVersion.NONE); } /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { - return nodes(p, -1, MOVING); + return nodes(p, AffinityTopologyVersion.NONE, MOVING); } /** @@ -400,7 +401,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo * @param topVer Topology version. * @return List of nodes in state OWNING or MOVING. */ - private List<ClusterNode> ownersAndMoving(int p, long topVer) { + private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) { return nodes(p, topVer, OWNING, MOVING); } @@ -623,7 +624,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion()); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -673,7 +674,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion()); ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index 547d414..52fb062 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.nio.*; @@ -31,7 +33,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, private static final long serialVersionUID = 0L; /** Topology version being queried. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Empty constructor. @@ -44,7 +46,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, * @param cacheId Cache ID. * @param topVer Topology version. */ - public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) { + public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull AffinityTopologyVersion topVer) { this.cacheId = cacheId; this.topVer = topVer; } @@ -57,7 +59,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, /** * @return Requested topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -87,7 +89,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, switch (writer.state()) { case 3: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -109,7 +111,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, switch (reader.state()) { case 3: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 9b86e55..2e32632 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.nio.*; import java.util.*; @@ -36,7 +38,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, private static final long serialVersionUID = 0L; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Affinity assignment. */ @GridDirectTransient @@ -58,7 +60,8 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, * @param topVer Topology version. * @param affAssignment Affinity assignment. */ - public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, List<List<ClusterNode>> affAssignment) { + public GridDhtAffinityAssignmentResponse(int cacheId, @NotNull AffinityTopologyVersion topVer, + List<List<ClusterNode>> affAssignment) { this.cacheId = cacheId; this.topVer = topVer; this.affAssignment = affAssignment; @@ -72,7 +75,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -133,7 +136,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, writer.incrementState(); case 4: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -163,7 +166,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, reader.incrementState(); case 4: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 101d657..d74a62e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.future.*; @@ -48,7 +49,7 @@ public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<L private Queue<ClusterNode> availableNodes; /** Topology version. */ - private final long topVer; + private final AffinityTopologyVersion topVer; /** Pending node from which response is being awaited. */ private ClusterNode pendingNode; @@ -57,7 +58,7 @@ public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<L * @param ctx Cache context. * @param availableNodes Available nodes. */ - public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, long topVer, Collection<ClusterNode> availableNodes) { + public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, Collection<ClusterNode> availableNodes) { super(ctx.kernalContext()); this.ctx = ctx; @@ -85,7 +86,7 @@ public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<L * @param res Reponse. */ public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) { - if (res.topologyVersion() != topVer) { + if (!res.topologyVersion().equals(topVer)) { if (log.isDebugEnabled()) log.debug("Received affinity assignment for wrong topolgy version (will ignore) " + "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 2ef157c..338cd3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; @@ -96,8 +97,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, + K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); @@ -189,7 +190,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return Topology version. * @throws IgniteCheckedException If failed. */ - public long beginMultiUpdate() throws IgniteCheckedException { + public AffinityTopologyVersion beginMultiUpdate() throws IgniteCheckedException { IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); if (tup != null) @@ -199,7 +200,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridDhtTopologyFuture topFut; - long topVer; + AffinityTopologyVersion topVer; try { // While we are holding read lock, register lock future for partition release future. @@ -260,11 +261,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param topVer Topology version. * @return Finish future. */ - @Nullable public IgniteInternalFuture<?> multiUpdateFinishFuture(long topVer) { + @Nullable public IgniteInternalFuture<?> multiUpdateFinishFuture(AffinityTopologyVersion topVer) { GridCompoundFuture<IgniteUuid, Object> fut = null; for (MultiUpdateFuture multiFut : multiTxFuts.values()) { - if (multiFut.topologyVersion() <= topVer) { + if (multiFut.topologyVersion().compareTo(topVer) <= 0) { if (fut == null) fut = new GridCompoundFuture<>(ctx.kernalContext()); @@ -309,7 +310,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) throws GridDhtInvalidPartitionException { + @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { return super.entryEx(key, topVer); } @@ -328,7 +329,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return DHT entry. * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - public GridDhtCacheEntry<K, V> entryExx(K key, long topVer) throws GridDhtInvalidPartitionException { + public GridDhtCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { return (GridDhtCacheEntry<K, V>)entryEx(key, topVer); } @@ -344,7 +345,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @throws GridDhtInvalidPartitionException if entry does not belong to this node and * {@code allowDetached} is {@code false}. */ - public GridCacheEntryEx<K, V> entryExx(K key, long topVer, boolean allowDetached, boolean touch) { + public GridCacheEntryEx<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) { try { return allowDetached && !ctx.affinity().localNode(key, topVer) ? new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : @@ -372,7 +373,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final boolean replicate = ctx.isDrEnabled(); - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); @@ -396,7 +397,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final boolean replicate = ctx.isDrEnabled(); - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); @@ -426,14 +427,15 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap V val, GridCacheVersion ver, @Nullable IgniteBiPredicate<K, V> p, - long topVer, + AffinityTopologyVersion topVer, boolean replicate, @Nullable ExpiryPolicy plc) { if (p != null && !p.apply(key, val)) return; try { - GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), -1, true); + GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), + AffinityTopologyVersion.NONE, true); // Reserve to make sure that partition does not get unloaded. if (part.reserve()) { @@ -483,7 +485,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public int primarySize() { int sum = 0; - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); for (GridDhtLocalPartition<K, V> p : topology().currentLocalPartitions()) { if (p.primary(topVer)) @@ -495,7 +497,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgniteCacheExpiryPolicy, boolean)} + * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, AffinityTopologyVersion, UUID, int, boolean, IgniteCacheExpiryPolicy, boolean)} * method instead to retrieve DHT value. * * @param keys {@inheritDoc} @@ -583,7 +585,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap LinkedHashMap<? extends K, Boolean> keys, boolean readThrough, boolean reload, - long topVer, + AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, @@ -653,7 +655,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap res.error(e); } - res.invalidPartitions(fut.invalidPartitions(), ctx.discovery().topologyVersion()); + res.invalidPartitions(fut.invalidPartitions(), + new AffinityTopologyVersion(ctx.discovery().topologyVersion())); try { ctx.io().send(nodeId, res, ctx.ioPolicy()); @@ -682,7 +685,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>(); - long topVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) { List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer); @@ -793,7 +796,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } finally { if (entry != null) - cache.context().evicts().touch(entry, -1L); + cache.context().evicts().touch(entry, AffinityTopologyVersion.NONE); } } catch (IgniteCheckedException e) { @@ -835,7 +838,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() { final GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, - ctx.discovery().topologyVersion(), false); + new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false); Iterator<GridDhtCacheEntry<K, V>> partIt = part == null ? null : part.entries().iterator(); @@ -888,7 +891,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public int size() { GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, - ctx.discovery().topologyVersion(), false); + new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false); return part != null ? part.publicSize() : 0; } @@ -911,7 +914,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) { assert entry.isDht(); - GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), -1, false); + GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, + false); // Do not remove entry on replica topology. Instead, add entry to removal queue. // It will be cleared eventually. @@ -936,7 +940,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (primary && backup) return iterator(map.entries0().iterator(), !ctx.keepPortable()); else { - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); final Iterator<GridDhtLocalPartition<K, V>> partIt = topology().currentLocalPartitions().iterator(); @@ -1081,7 +1085,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private static final long serialVersionUID = 0L; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Empty constructor required by {@link Externalizable}. @@ -1094,7 +1098,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param ctx Kernal context. * @param topVer Topology version. */ - private MultiUpdateFuture(GridKernalContext ctx, long topVer) { + private MultiUpdateFuture(GridKernalContext ctx, @NotNull AffinityTopologyVersion topVer) { super(ctx); this.topVer = topVer; @@ -1103,7 +1107,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @return Topology version. */ - private long topologyVersion() { + private AffinityTopologyVersion topologyVersion() { return topVer; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index cf4f5df..3ceb180 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -65,7 +66,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. */ - public GridDhtCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, + public GridDhtCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { super(ctx, key, hash, val, next, ttl, hdrId); @@ -153,7 +154,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { @Nullable public GridCacheMvccCandidate<K> addDhtLocal( UUID nearNodeId, GridCacheVersion nearVer, - long topVer, + AffinityTopologyVersion topVer, long threadId, GridCacheVersion ver, long timeout, @@ -299,9 +300,9 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @throws GridCacheEntryRemovedException If entry has been removed. */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"}) - @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(long topVer) + @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { - if (isNew() || !valid(-1) || deletedUnlocked()) + if (isNew() || !valid(AffinityTopologyVersion.NONE) || deletedUnlocked()) return null; else { V val0 = null; @@ -353,7 +354,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @throws GridCacheEntryRemovedException If entry was removed. */ @SuppressWarnings("unchecked") - @Nullable public IgniteInternalFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer) + @Nullable public IgniteInternalFuture<Boolean> addReader(UUID nodeId, long msgId, AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { // Don't add local node as reader. if (cctx.nodeId().equals(nodeId)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 0be5b97..90dde96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -73,7 +74,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private GridCacheVersion ver; /** Topology version .*/ - private long topVer; + private AffinityTopologyVersion topVer; /** Transaction. */ private IgniteTxLocalEx<K, V> tx; @@ -129,7 +130,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col boolean readThrough, boolean reload, @Nullable IgniteTxLocalEx<K, V> tx, - long topVer, + @NotNull AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, @@ -261,7 +262,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @return {@code True} if mapped. */ private boolean map(K key, Collection<GridDhtLocalPartition> parts) { - GridDhtLocalPartition part = topVer > 0 ? + GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : cache().topology().localPartition(key, false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index af63307..7457de5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -517,7 +518,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param topVer Topology version. * @return {@code True} if local node is primary for this partition. */ - public boolean primary(long topVer) { + public boolean primary(AffinityTopologyVersion topVer) { return cctx.affinity().primary(cctx.localNode(), id, topVer); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index dba5ed2..10146f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -66,7 +67,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo private GridCacheVersion nearLockVer; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Thread. */ private long threadId; @@ -154,7 +155,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo GridCacheContext<K, V> cctx, UUID nearNodeId, GridCacheVersion nearLockVer, - long topVer, + @NotNull AffinityTopologyVersion topVer, int cnt, boolean read, long timeout, @@ -166,7 +167,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo assert nearNodeId != null; assert nearLockVer != null; - assert topVer > 0; + assert topVer.topologyVersion() > 0; this.cctx = cctx; this.nearNodeId = nearNodeId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index b4337f8..c7f5d85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -68,7 +69,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { private byte[] ownedBytes; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Subject ID. */ private UUID subjId; @@ -120,7 +121,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { IgniteUuid futId, IgniteUuid miniId, GridCacheVersion lockVer, - long topVer, + @NotNull AffinityTopologyVersion topVer, boolean isInTx, boolean isRead, TransactionIsolation isolation, @@ -194,7 +195,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -400,7 +401,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { writer.incrementState(); case 30: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -486,7 +487,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { reader.incrementState(); case 30: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d9a20ae..4f28334 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.tostring.*; import org.jetbrains.annotations.*; @@ -53,7 +54,7 @@ public interface GridDhtPartitionTopology<K, V> { * * @return Topology version. */ - public long topologyVersion(); + public AffinityTopologyVersion topologyVersion(); /** * Gets a future that will be completed when partition exchange map for this @@ -88,7 +89,7 @@ public interface GridDhtPartitionTopology<K, V> { * @throws GridDhtInvalidPartitionException If partition is evicted or absent and * does not belong to this node. */ - @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException; /** @@ -127,7 +128,7 @@ public interface GridDhtPartitionTopology<K, V> { * @param topVer Topology version. * @return Collection of all nodes responsible for this partition with primary node being first. */ - public Collection<ClusterNode> nodes(int p, long topVer); + public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer); /** * @param p Partition ID. @@ -140,7 +141,7 @@ public interface GridDhtPartitionTopology<K, V> { * @param topVer Topology version. * @return Collection of all nodes who {@code own} this partition. */ - public List<ClusterNode> owners(int p, long topVer); + public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer); /** * @param p Partition ID. @@ -159,7 +160,7 @@ public interface GridDhtPartitionTopology<K, V> { * @param e Entry added to cache. * @return Local partition. */ - public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e); + public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e); /** * @param e Entry removed from cache.