IGNITE-313 Need to change affinity topology version from long to custom object
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4362085a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4362085a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4362085a Branch: refs/heads/ignite-45 Commit: 4362085aac52105ad8106019f9b59660500675e4 Parents: 9a69903 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed Feb 25 19:45:29 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed Feb 25 19:45:29 2015 +0300 ---------------------------------------------------------------------- .../impl/ClientPartitionAffinitySelfTest.java | 2 +- .../affinity/CacheAffinityFunctionContext.java | 3 +- .../affinity/AffinityTopologyVersion.java | 113 +++++++++++++++++++ .../affinity/GridAffinityAssignment.java | 12 +- .../affinity/GridAffinityAssignmentCache.java | 70 ++++++------ .../affinity/GridAffinityProcessor.java | 27 ++--- .../processors/affinity/GridAffinityUtils.java | 10 +- .../GridCacheAffinityFunctionContextImpl.java | 6 +- .../processors/cache/GridCacheAdapter.java | 32 +++--- .../cache/GridCacheAffinityManager.java | 68 ++++++----- .../processors/cache/GridCacheAtomicFuture.java | 4 +- .../cache/GridCacheConcurrentMap.java | 18 ++- .../processors/cache/GridCacheContext.java | 11 +- .../processors/cache/GridCacheEntryEx.java | 12 +- .../cache/GridCacheEvictionManager.java | 37 +++--- .../cache/GridCacheEvictionRequest.java | 14 ++- .../processors/cache/GridCacheIoManager.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 19 ++-- .../cache/GridCacheMapEntryFactory.java | 5 +- .../processors/cache/GridCacheMessage.java | 5 +- .../cache/GridCacheMvccCandidate.java | 7 +- .../processors/cache/GridCacheMvccManager.java | 42 ++++--- .../GridCachePartitionExchangeManager.java | 28 +++-- .../processors/cache/GridCachePreloader.java | 3 +- .../cache/GridCachePreloaderAdapter.java | 3 +- .../cache/GridCacheSharedContext.java | 3 +- .../processors/cache/GridCacheSwapManager.java | 9 +- .../processors/cache/GridCacheUtils.java | 14 ++- .../cache/affinity/GridCacheAffinityImpl.java | 13 ++- .../CacheDataStructuresManager.java | 17 +-- .../distributed/GridCacheTtlUpdateRequest.java | 12 +- .../GridDistributedCacheAdapter.java | 23 ++-- .../GridDistributedTxRemoteAdapter.java | 3 +- .../dht/GridClientPartitionTopology.java | 47 ++++---- .../dht/GridDhtAffinityAssignmentRequest.java | 12 +- .../dht/GridDhtAffinityAssignmentResponse.java | 13 ++- .../dht/GridDhtAssignmentFetchFuture.java | 7 +- .../distributed/dht/GridDhtCacheAdapter.java | 56 ++++----- .../distributed/dht/GridDhtCacheEntry.java | 11 +- .../cache/distributed/dht/GridDhtGetFuture.java | 7 +- .../distributed/dht/GridDhtLocalPartition.java | 3 +- .../distributed/dht/GridDhtLockFuture.java | 7 +- .../distributed/dht/GridDhtLockRequest.java | 11 +- .../dht/GridDhtPartitionTopology.java | 11 +- .../dht/GridDhtPartitionTopologyImpl.java | 51 +++++---- .../distributed/dht/GridDhtTopologyFuture.java | 3 +- .../dht/GridDhtTransactionalCacheAdapter.java | 7 +- .../distributed/dht/GridDhtTxFinishRequest.java | 11 +- .../cache/distributed/dht/GridDhtTxLocal.java | 5 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 5 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 3 +- .../dht/GridDhtTxPrepareRequest.java | 11 +- .../cache/distributed/dht/GridDhtTxRemote.java | 7 +- .../distributed/dht/GridNoStorageCacheMap.java | 5 +- .../dht/GridPartitionedGetFuture.java | 33 +++--- .../dht/atomic/GridDhtAtomicCache.java | 23 ++-- .../dht/atomic/GridDhtAtomicCacheEntry.java | 3 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 7 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 11 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 19 ++-- .../dht/atomic/GridNearAtomicUpdateRequest.java | 11 +- .../dht/colocated/GridDhtColocatedCache.java | 25 ++-- .../colocated/GridDhtColocatedCacheEntry.java | 3 +- .../colocated/GridDhtColocatedLockFuture.java | 23 ++-- .../dht/preloader/GridDhtForceKeysFuture.java | 11 +- .../dht/preloader/GridDhtForceKeysRequest.java | 12 +- .../GridDhtPartitionDemandMessage.java | 12 +- .../preloader/GridDhtPartitionDemandPool.java | 21 ++-- .../preloader/GridDhtPartitionExchangeId.java | 26 +++-- .../GridDhtPartitionsExchangeFuture.java | 19 ++-- .../preloader/GridDhtPartitionsFullMessage.java | 15 +-- .../dht/preloader/GridDhtPreloader.java | 17 +-- .../preloader/GridDhtPreloaderAssignments.java | 9 +- .../distributed/near/GridNearAtomicCache.java | 3 +- .../distributed/near/GridNearCacheAdapter.java | 9 +- .../distributed/near/GridNearCacheEntry.java | 11 +- .../distributed/near/GridNearGetFuture.java | 41 +++---- .../distributed/near/GridNearGetRequest.java | 12 +- .../distributed/near/GridNearGetResponse.java | 12 +- .../distributed/near/GridNearLockFuture.java | 17 +-- .../distributed/near/GridNearLockRequest.java | 13 ++- .../near/GridNearTransactionalCache.java | 17 +-- .../near/GridNearTxFinishFuture.java | 3 +- .../near/GridNearTxFinishRequest.java | 11 +- .../cache/distributed/near/GridNearTxLocal.java | 5 +- .../near/GridNearTxPrepareFuture.java | 15 +-- .../near/GridNearTxPrepareRequest.java | 11 +- .../processors/cache/dr/GridCacheDrManager.java | 3 +- .../cache/dr/os/GridOsCacheDrManager.java | 3 +- .../processors/cache/local/GridLocalCache.java | 7 +- .../local/atomic/GridLocalAtomicCache.java | 5 +- .../cache/query/GridCacheQueryManager.java | 5 +- .../continuous/CacheContinuousQueryManager.java | 7 +- .../cache/transactions/IgniteInternalTx.java | 5 +- .../cache/transactions/IgniteTxAdapter.java | 19 ++-- .../transactions/IgniteTxLocalAdapter.java | 7 +- .../cache/transactions/IgniteTxManager.java | 10 +- .../cache/version/GridCacheVersionManager.java | 9 +- .../dataload/IgniteDataLoaderImpl.java | 2 +- .../datastructures/GridCacheSetImpl.java | 3 +- .../service/GridServiceProcessor.java | 3 +- .../ignite/internal/visor/cache/VisorCache.java | 3 +- .../GridCachePartitionFairAffinitySelfTest.java | 9 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 7 +- .../cache/GridCacheAffinityApiSelfTest.java | 24 ++-- .../GridCacheFinishPartitionsSelfTest.java | 16 ++- ...GridCacheMixedPartitionExchangeSelfTest.java | 11 +- .../cache/GridCacheMultiUpdateLockSelfTest.java | 5 +- .../processors/cache/GridCacheTestEntryEx.java | 22 ++-- ...actQueueFailoverDataConsistencySelfTest.java | 4 +- ...dCachePartitionedQueueEntryMoveSelfTest.java | 3 +- .../GridCacheDhtPreloadDisabledSelfTest.java | 3 +- .../distributed/dht/GridCacheDhtTestUtils.java | 5 +- .../near/GridCacheNearMultiNodeSelfTest.java | 3 +- .../near/GridCacheNearReadersSelfTest.java | 5 +- .../GridCachePartitionedTxSalvageSelfTest.java | 3 +- .../ignite/testframework/GridTestUtils.java | 3 +- .../junits/common/GridCommonAbstractTest.java | 3 +- .../processors/query/h2/IgniteH2Indexing.java | 4 +- .../query/h2/twostep/GridMapQueryExecutor.java | 4 +- 120 files changed, 968 insertions(+), 677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java index 3a45615..fabb4f4 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java @@ -345,7 +345,7 @@ public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { int part = srvAff.partition(key); CacheAffinityFunctionContext ctx = new GridCacheAffinityFunctionContextImpl(new ArrayList<>(srvNodes), - null, null, 1, 0); + null, null, new AffinityTopologyVersion(1), 0); ClusterNode srvNode = F.first(srvAff.assignPartitions(ctx).get(part)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java index ea5a0ec..fd1be95 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java @@ -19,6 +19,7 @@ package org.apache.ignite.cache.affinity; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.affinity.*; import org.jetbrains.annotations.*; import java.util.*; @@ -59,7 +60,7 @@ public interface CacheAffinityFunctionContext { * * @return Current topology version number. */ - public long currentTopologyVersion(); + public AffinityTopologyVersion currentTopologyVersion(); /** * Gets discovery event caused topology change. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java new file mode 100644 index 0000000..fc5f193 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.plugin.extensions.communication.*; + +import java.io.*; + +/** + * + */ +public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersion>, Externalizable { + /** */ + public static final AffinityTopologyVersion NONE = new AffinityTopologyVersion(-1); + + /** */ + public static final AffinityTopologyVersion ZERO = new AffinityTopologyVersion(0); + + /** */ + private long topVer; + + /** + * @param ver Version. + */ + public AffinityTopologyVersion(long ver) { + topVer = ver; + } + + /** + * @return Topology version. + */ + public long topologyVersion() { + return topVer; + } + + /** + * @param topVer New topology version. + */ + public void topologyVersion(long topVer) { + this.topVer = topVer; + } + + /** + * + */ + public AffinityTopologyVersion previous() { + return new AffinityTopologyVersion(topVer - 1); + } + + /** {@inheritDoc} */ + @Override public int compareTo(AffinityTopologyVersion o) { + return Long.compare(topVer, o.topVer); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (!(o instanceof AffinityTopologyVersion)) + return false; + + return topVer == ((AffinityTopologyVersion)o).topVer; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)topVer; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(topVer); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + topVer = in.readLong(); + } + + /** + * @param msgWriter Message writer. + */ + public boolean writeTo(MessageWriter msgWriter) { + return msgWriter.writeLong("topVer.idx", topVer); + } + + /** + * @param msgReader Message reader. + */ + public static AffinityTopologyVersion readFrom(MessageReader msgReader) { + long topVer = msgReader.readLong("topVer.idx"); + + return new AffinityTopologyVersion(topVer); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(topVer); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 673db6d..e9df8b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -31,7 +31,7 @@ class GridAffinityAssignment implements Serializable { private static final long serialVersionUID = 0L; /** Topology version. */ - private final long topVer; + private final AffinityTopologyVersion topVer; /** Collection of calculated affinity nodes. */ private List<List<ClusterNode>> assignment; @@ -47,7 +47,7 @@ class GridAffinityAssignment implements Serializable { * * @param topVer Topology version. */ - GridAffinityAssignment(long topVer) { + GridAffinityAssignment(AffinityTopologyVersion topVer) { this.topVer = topVer; primary = new HashMap<>(); backup = new HashMap<>(); @@ -57,7 +57,7 @@ class GridAffinityAssignment implements Serializable { * @param topVer Topology version. * @param assignment Assignment. */ - GridAffinityAssignment(long topVer, List<List<ClusterNode>> assignment) { + GridAffinityAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment) { this.topVer = topVer; this.assignment = assignment; @@ -77,7 +77,7 @@ class GridAffinityAssignment implements Serializable { /** * @return Topology version. */ - public long topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -152,7 +152,7 @@ class GridAffinityAssignment implements Serializable { /** {@inheritDoc} */ @Override public int hashCode() { - return (int)(topVer ^ (topVer >>> 32)); + return topVer.hashCode(); } /** {@inheritDoc} */ @@ -164,7 +164,7 @@ class GridAffinityAssignment implements Serializable { if (o == null || getClass() != o.getClass()) return false; - return topVer == ((GridAffinityAssignment)o).topVer; + return topVer.equals(((GridAffinityAssignment)o).topVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 9c12a17..ee6ee2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -60,7 +60,7 @@ public class GridAffinityAssignmentCache { private final CacheAffinityKeyMapper affMapper; /** Affinity calculation results cache: topology version => partition => nodes. */ - private final ConcurrentMap<Long, GridAffinityAssignment> affCache; + private final ConcurrentMap<AffinityTopologyVersion, GridAffinityAssignment> affCache; /** Cache item corresponding to the head topology version. */ private final AtomicReference<GridAffinityAssignment> head; @@ -69,7 +69,7 @@ public class GridAffinityAssignmentCache { private final GridCacheContext ctx; /** Ready futures. */ - private final ConcurrentMap<Long, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); + private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); /** Log. */ private IgniteLogger log; @@ -103,7 +103,7 @@ public class GridAffinityAssignmentCache { partsCnt = aff.partitions(); affCache = new ConcurrentLinkedHashMap<>(); - head = new AtomicReference<>(new GridAffinityAssignment(-1)); + head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); } /** @@ -113,14 +113,14 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @param affAssignment Affinity assignment for topology version. */ - public void initialize(long topVer, List<List<ClusterNode>> affAssignment) { + public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment); affCache.put(topVer, assignment); head.set(assignment); - for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) { - if (entry.getKey() >= topVer) + for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) >= 0) entry.getValue().onDone(topVer); } } @@ -146,12 +146,12 @@ public class GridAffinityAssignmentCache { * @return Affinity assignments. */ @SuppressWarnings("IfMayBeConditional") - public List<List<ClusterNode>> calculate(long topVer, DiscoveryEvent discoEvt) { + public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); - GridAffinityAssignment prev = affCache.get(topVer - 1); + GridAffinityAssignment prev = affCache.get(topVer.previous()); List<ClusterNode> sorted; @@ -160,7 +160,7 @@ public class GridAffinityAssignmentCache { sorted = Collections.singletonList(ctx.localNode()); else { // Resolve nodes snapshot for specified topology version. - Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); + Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion()); sorted = sort(nodes); } @@ -193,15 +193,15 @@ public class GridAffinityAssignmentCache { while (true) { GridAffinityAssignment headItem = head.get(); - if (headItem.topologyVersion() >= topVer) + if (headItem.topologyVersion().compareTo(topVer) >= 0) break; if (head.compareAndSet(headItem, updated)) break; } - for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) { - if (entry.getKey() <= topVer) { + for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { if (log.isDebugEnabled()) log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); @@ -216,7 +216,7 @@ public class GridAffinityAssignmentCache { /** * @return Last calculated affinity version. */ - public long lastVersion() { + public AffinityTopologyVersion lastVersion() { return head.get().topologyVersion(); } @@ -230,8 +230,8 @@ public class GridAffinityAssignmentCache { log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); - for (Iterator<Long> it = affCache.keySet().iterator(); it.hasNext(); ) - if (it.next() < topVer) + for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); ) + if (it.next().topologyVersion() < topVer) it.remove(); } @@ -239,7 +239,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Affinity assignment. */ - public List<List<ClusterNode>> assignments(long topVer) { + public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) { GridAffinityAssignment aff = cachedAffinity(topVer); return aff.assignment(); @@ -251,10 +251,10 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version to await for. * @return Future that will be completed after affinity for topology version {@code topVer} is calculated. */ - @Nullable public IgniteInternalFuture<Long> readyFuture(long topVer) { + @Nullable public IgniteInternalFuture<AffinityTopologyVersion> readyFuture(AffinityTopologyVersion topVer) { GridAffinityAssignment aff = head.get(); - if (aff.topologyVersion() >= topVer) { + if (aff.topologyVersion().compareTo(topVer) >= 0) { if (log.isDebugEnabled()) log.debug("Returning finished future for readyFuture [head=" + aff.topologyVersion() + ", topVer=" + topVer + ']'); @@ -262,12 +262,12 @@ public class GridAffinityAssignmentCache { return null; } - GridFutureAdapter<Long> fut = F.addIfAbsent(readyFuts, topVer, + GridFutureAdapter<AffinityTopologyVersion> fut = F.addIfAbsent(readyFuts, topVer, new AffinityReadyFuture(ctx.kernalContext(), topVer)); aff = head.get(); - if (aff.topologyVersion() >= topVer) { + if (aff.topologyVersion().compareTo(topVer) >= 0) { if (log.isDebugEnabled()) log.debug("Completing topology ready future right away [head=" + aff.topologyVersion() + ", topVer=" + topVer + ']'); @@ -315,7 +315,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Affinity nodes. */ - public List<ClusterNode> nodes(int part, long topVer) { + public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) { // Resolve cached affinity nodes. return cachedAffinity(topVer).get(part); } @@ -327,7 +327,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Primary partitions for specified node ID. */ - public Set<Integer> primaryPartitions(UUID nodeId, long topVer) { + public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) { return cachedAffinity(topVer).primaryPartitions(nodeId); } @@ -338,7 +338,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Backup partitions for specified node ID. */ - public Set<Integer> backupPartitions(UUID nodeId, long topVer) { + public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) { return cachedAffinity(topVer).backupPartitions(nodeId); } @@ -348,17 +348,17 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Cached affinity. */ - private GridAffinityAssignment cachedAffinity(long topVer) { - if (topVer == -1) + private GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) { + if (topVer.equals(AffinityTopologyVersion.NONE)) topVer = lastVersion(); else awaitTopologyVersion(topVer); - assert topVer >= 0 : topVer; + assert topVer.topologyVersion() >= 0 : topVer; GridAffinityAssignment cache = head.get(); - if (cache.topologyVersion() != topVer) { + if (!cache.topologyVersion().equals(topVer)) { cache = affCache.get(topVer); if (cache == null) { @@ -368,7 +368,7 @@ public class GridAffinityAssignmentCache { } } - assert cache.topologyVersion() == topVer : "Invalid cached affinity: " + cache; + assert cache.topologyVersion().equals(topVer) : "Invalid cached affinity: " + cache; return cache; } @@ -376,10 +376,10 @@ public class GridAffinityAssignmentCache { /** * @param topVer Topology version to wait. */ - private void awaitTopologyVersion(long topVer) { + private void awaitTopologyVersion(AffinityTopologyVersion topVer) { GridAffinityAssignment aff = head.get(); - if (aff.topologyVersion() >= topVer) + if (aff.topologyVersion().compareTo(topVer) >= 0) return; try { @@ -387,7 +387,7 @@ public class GridAffinityAssignmentCache { log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); - IgniteInternalFuture<Long> fut = readyFuture(topVer); + IgniteInternalFuture<AffinityTopologyVersion> fut = readyFuture(topVer); if (fut != null) fut.get(); @@ -417,12 +417,12 @@ public class GridAffinityAssignmentCache { /** * Affinity ready future. Will remove itself from ready futures map. */ - private class AffinityReadyFuture extends GridFutureAdapter<Long> { + private class AffinityReadyFuture extends GridFutureAdapter<AffinityTopologyVersion> { /** */ private static final long serialVersionUID = 0L; /** */ - private long reqTopVer; + private AffinityTopologyVersion reqTopVer; /** * Empty constructor required by {@link Externalizable}. @@ -434,14 +434,14 @@ public class GridAffinityAssignmentCache { /** * @param ctx Kernal context. */ - private AffinityReadyFuture(GridKernalContext ctx, long reqTopVer) { + private AffinityReadyFuture(GridKernalContext ctx, AffinityTopologyVersion reqTopVer) { super(ctx); this.reqTopVer = reqTopVer; } /** {@inheritDoc} */ - @Override public boolean onDone(Long res, @Nullable Throwable err) { + @Override public boolean onDone(AffinityTopologyVersion res, @Nullable Throwable err) { assert res != null || err != null; boolean done = super.onDone(res, err); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index d7d0391..58aad82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -85,7 +85,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { final Collection<AffinityAssignmentKey> rmv = new HashSet<>(); for (AffinityAssignmentKey key : affMap.keySet()) { - if (!caches.contains(key.cacheName) || key.topVer < discoEvt.topologyVersion() - 10) + if (!caches.contains(key.cacheName) || key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10) rmv.add(key); } @@ -167,7 +167,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Picked node. * @throws IgniteCheckedException If failed. */ - @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, long topVer) throws IgniteCheckedException { + @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, AffinityTopologyVersion topVer) throws IgniteCheckedException { Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key), topVer); return map != null ? F.first(map.keySet()) : null; @@ -189,7 +189,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) return Collections.singletonList(loc); - long topVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); AffinityInfo affInfo = affinityCache(cacheName, topVer); @@ -220,7 +220,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (key == null) return null; - AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersion()); + AffinityInfo affInfo = affinityCache(cacheName, new AffinityTopologyVersion(ctx.discovery().topologyVersion())); if (affInfo == null || affInfo.mapper == null) return null; @@ -255,7 +255,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { */ private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, Collection<? extends K> keys) throws IgniteCheckedException { - return keysToNodes(cacheName, keys, ctx.discovery().topologyVersion()); + return keysToNodes(cacheName, keys, new AffinityTopologyVersion(ctx.discovery().topologyVersion())); } /** @@ -266,7 +266,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, - Collection<? extends K> keys, long topVer) throws IgniteCheckedException { + Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException { if (F.isEmpty(keys)) return Collections.emptyMap(); @@ -286,7 +286,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("ErrorNotRethrown") - private AffinityInfo affinityCache(@Nullable final String cacheName, long topVer) throws IgniteCheckedException { + private AffinityInfo affinityCache(@Nullable final String cacheName, AffinityTopologyVersion topVer) + throws IgniteCheckedException { AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); IgniteInternalFuture<AffinityInfo> fut = affMap.get(key); @@ -406,7 +407,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Affinity cached function. * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects. */ - private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, long topVer, ClusterNode n) + private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, AffinityTopologyVersion topVer, ClusterNode n) throws IgniteCheckedException { GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() .callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get(); @@ -561,13 +562,13 @@ public class GridAffinityProcessor extends GridProcessorAdapter { private String cacheName; /** */ - private long topVer; + private AffinityTopologyVersion topVer; /** * @param cacheName Cache name. * @param topVer Topology version. */ - private AffinityAssignmentKey(String cacheName, long topVer) { + private AffinityAssignmentKey(String cacheName, @NotNull AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } @@ -582,14 +583,14 @@ public class GridAffinityProcessor extends GridProcessorAdapter { AffinityAssignmentKey that = (AffinityAssignmentKey)o; - return topVer == that.topVer && F.eq(cacheName, that.cacheName); + return topVer.equals(that.topVer) && F.eq(cacheName, that.cacheName); } /** {@inheritDoc} */ @Override public int hashCode() { int res = cacheName != null ? cacheName.hashCode() : 0; - res = 31 * res + (int)(topVer ^ (topVer >>> 32)); + res = 31 * res + topVer.hashCode(); return res; } @@ -853,7 +854,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Affinity info for current topology version. */ private AffinityInfo cache() throws IgniteCheckedException { - return affinityCache(cacheName, topologyVersion()); + return affinityCache(cacheName, new AffinityTopologyVersion(topologyVersion())); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java index a5e8349..33bc851 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -48,7 +48,7 @@ class GridAffinityUtils { * @return Affinity job. */ static Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> affinityJob( - String cacheName, long topVer) { + String cacheName, AffinityTopologyVersion topVer) { return new AffinityJob(cacheName, topVer); } @@ -135,12 +135,12 @@ class GridAffinityUtils { private String cacheName; /** */ - private long topVer; + private AffinityTopologyVersion topVer; /** * @param cacheName Cache name. */ - private AffinityJob(@Nullable String cacheName, long topVer) { + private AffinityJob(@Nullable String cacheName, @NotNull AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } @@ -175,13 +175,13 @@ class GridAffinityUtils { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); - out.writeLong(topVer); + out.writeObject(topVer); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cacheName = U.readString(in); - topVer = in.readLong(); + topVer = (AffinityTopologyVersion)in.readObject(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java index c56355b..1d4a5cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java @@ -38,7 +38,7 @@ public class GridCacheAffinityFunctionContextImpl implements CacheAffinityFuncti private DiscoveryEvent discoEvt; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** Number of backups to assign. */ private int backups; @@ -48,7 +48,7 @@ public class GridCacheAffinityFunctionContextImpl implements CacheAffinityFuncti * @param topVer Topology version. */ public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment, - DiscoveryEvent discoEvt, long topVer, int backups) { + DiscoveryEvent discoEvt, @NotNull AffinityTopologyVersion topVer, int backups) { this.topSnapshot = topSnapshot; this.prevAssignment = prevAssignment; this.discoEvt = discoEvt; @@ -67,7 +67,7 @@ public class GridCacheAffinityFunctionContextImpl implements CacheAffinityFuncti } /** {@inheritDoc} */ - @Override public long currentTopologyVersion() { + @Override public AffinityTopologyVersion currentTopologyVersion() { return topVer; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 12ea535..21f6137 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -750,7 +751,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, // Swap and offheap are disabled for near cache. if (modes.primary || modes.backup) { - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); @@ -797,7 +798,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, V val = null; if (!ctx.isLocal()) { - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); int part = ctx.affinity().partition(key); @@ -905,7 +906,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, GridCacheEntryEx<K, V> e = peekEx(key); if (e != null) - return e.peek(heap, offheap, swap, -1, plc); + return e.peek(heap, offheap, swap, AffinityTopologyVersion.NONE, plc); } if (offheap || swap) { @@ -1277,7 +1278,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param key Entry key. * @return Entry (never {@code null}). */ - public GridCacheEntryEx<K, V> entryEx(K key, long topVer) { + public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) { GridCacheEntryEx<K, V> e = entry0(key, topVer, true, false); assert e != null; @@ -1292,7 +1293,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param touch Flag to touch created entry (only if entry was actually created). * @return Entry or <tt>null</tt>. */ - @Nullable private GridCacheEntryEx<K, V> entry0(K key, long topVer, boolean create, boolean touch) { + @Nullable private GridCacheEntryEx<K, V> entry0(K key, AffinityTopologyVersion topVer, boolean create, boolean touch) { GridTriple<GridCacheMapEntry<K, V>> t = map.putEntryIfObsoleteOrAbsent(topVer, key, null, ctx.config().getDefaultTimeToLive(), create); @@ -1767,7 +1768,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Nullable UUID subjId, String taskName) { ctx.denyOnFlag(READ); - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); if (!F.isEmpty(keys)) { final String uid = CU.uuid(); // Get meta UUID for this thread. @@ -1915,7 +1916,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param key Key. * @return Entry. */ - @Nullable protected GridCacheEntryEx<K, V> entryExSafe(K key, long topVer) { + @Nullable protected GridCacheEntryEx<K, V> entryExSafe(K key, AffinityTopologyVersion topVer) { return entryEx(key); } @@ -2129,7 +2130,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, try { assert keys != null; - final long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + final AffinityTopologyVersion topVer = tx == null + ? ctx.affinity().affinityTopologyVersion() + : tx.topologyVersion(); final Map<K, V> map = new GridLeanMap<>(keys.size()); @@ -3772,7 +3775,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, validateCacheKey(key); try { - GridCacheEntryEx<K, V> e = entry0(key, ctx.discovery().topologyVersion(), false, false); + GridCacheEntryEx<K, V> e = entry0(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion()), + false, false); if (e == null) return false; @@ -3868,7 +3872,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException { final boolean replicate = ctx.isDrEnabled(); - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); @@ -3925,7 +3929,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, V val, GridCacheVersion ver, @Nullable IgniteBiPredicate<K, V> p, - long topVer, + AffinityTopologyVersion topVer, boolean replicate, long ttl) { if (p != null && !p.apply(key, val)) @@ -4081,7 +4085,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, throws IgniteCheckedException { final boolean replicate = ctx.isDrEnabled(); - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); @@ -4231,7 +4235,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, // Swap and offheap are disabled for near cache. if (modes.primary || modes.backup) { - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); @@ -5159,7 +5163,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (keyCheck) validateCacheKey(key); - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); if (ctx.portableEnabled()) key = (K)ctx.marshalToPortable(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index d3510e4..6cdfc6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -36,6 +36,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V /** Factor for maximum number of attempts to calculate all partition affinity keys. */ private static final int MAX_PARTITION_KEY_ATTEMPT_RATIO = 10; + /** */ + private static final AffinityTopologyVersion TOP_FIRST = new AffinityTopologyVersion(1); + /** Affinity cached function. */ private GridAffinityAssignmentCache aff; @@ -83,7 +86,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V @Override protected void onKernalStart0() throws IgniteCheckedException { if (cctx.isLocal()) // No discovery event needed for local affinity. - aff.calculate(1, null); + aff.calculate(TOP_FIRST, null); } /** {@inheritDoc} */ @@ -103,10 +106,21 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version to wait. * @return Affinity ready future. */ - public IgniteInternalFuture<Long> affinityReadyFuture(long topVer) { + public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(long topVer) { + return affinityReadyFuture(new AffinityTopologyVersion(topVer)); + } + + /** + * Gets affinity ready future, a future that will be completed after affinity with given + * topology version is calculated. + * + * @param topVer Topology version to wait. + * @return Affinity ready future. + */ + public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion topVer) { assert !cctx.isLocal(); - IgniteInternalFuture<Long> fut = aff.readyFuture(topVer); + IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(topVer); return fut != null ? fut : new GridFinishedFutureEx<>(topVer); } @@ -118,7 +132,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version to wait. * @return Affinity ready future or {@code null}. */ - @Nullable public IgniteInternalFuture<Long> affinityReadyFuturex(long topVer) { + @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuturex(AffinityTopologyVersion topVer) { assert !cctx.isLocal(); return aff.readyFuture(topVer); @@ -141,7 +155,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @param affAssignment Affinity assignment for this topology version. */ - public void initializeAffinity(long topVer, List<List<ClusterNode>> affAssignment) { + public void initializeAffinity(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { assert !cctx.isLocal(); aff.initialize(topVer, affAssignment); @@ -151,9 +165,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Affinity assignments. */ - public List<List<ClusterNode>> assignments(long topVer) { + public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = 1; + topVer = new AffinityTopologyVersion(1); return aff.assignments(topVer); } @@ -164,7 +178,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version to calculate affinity for. * @param discoEvt Discovery event that causes this topology change. */ - public List<List<ClusterNode>> calculateAffinity(long topVer, DiscoveryEvent discoEvt) { + public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { assert !cctx.isLocal(); return aff.calculate(topVer, discoEvt); @@ -207,7 +221,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Affinity nodes. */ - public List<ClusterNode> nodes(K key, long topVer) { + public List<ClusterNode> nodes(K key, AffinityTopologyVersion topVer) { return nodes(partition(key), topVer); } @@ -216,9 +230,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Affinity nodes. */ - public List<ClusterNode> nodes(int part, long topVer) { + public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = 1; + topVer = new AffinityTopologyVersion(1); return aff.nodes(part, topVer); } @@ -228,7 +242,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Primary node for given key. */ - @Nullable public ClusterNode primary(K key, long topVer) { + @Nullable public ClusterNode primary(K key, AffinityTopologyVersion topVer) { return primary(partition(key), topVer); } @@ -237,7 +251,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Primary node for given key. */ - @Nullable public ClusterNode primary(int part, long topVer) { + @Nullable public ClusterNode primary(int part, AffinityTopologyVersion topVer) { List<ClusterNode> nodes = nodes(part, topVer); if (nodes.isEmpty()) @@ -252,7 +266,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return {@code True} if checked node is primary for given key. */ - public boolean primary(ClusterNode n, K key, long topVer) { + public boolean primary(ClusterNode n, K key, AffinityTopologyVersion topVer) { return F.eq(primary(key, topVer), n); } @@ -262,7 +276,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return {@code True} if checked node is primary for given key. */ - public boolean primary(ClusterNode n, int part, long topVer) { + public boolean primary(ClusterNode n, int part, AffinityTopologyVersion topVer) { return F.eq(primary(part, topVer), n); } @@ -271,7 +285,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Backup nodes. */ - public Collection<ClusterNode> backups(K key, long topVer) { + public Collection<ClusterNode> backups(K key, AffinityTopologyVersion topVer) { return backups(partition(key), topVer); } @@ -280,7 +294,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Backup nodes. */ - public Collection<ClusterNode> backups(int part, long topVer) { + public Collection<ClusterNode> backups(int part, AffinityTopologyVersion topVer) { List<ClusterNode> nodes = nodes(part, topVer); assert !F.isEmpty(nodes); @@ -296,7 +310,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return Nodes for the keys. */ - public Collection<ClusterNode> remoteNodes(Iterable<? extends K> keys, long topVer) { + public Collection<ClusterNode> remoteNodes(Iterable<? extends K> keys, AffinityTopologyVersion topVer) { Collection<Collection<ClusterNode>> colcol = new GridLeanSet<>(); for (K key : keys) @@ -310,7 +324,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return {@code true} if given key belongs to local node. */ - public boolean localNode(K key, long topVer) { + public boolean localNode(K key, AffinityTopologyVersion topVer) { return localNode(partition(key), topVer); } @@ -319,7 +333,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return {@code true} if given partition belongs to local node. */ - public boolean localNode(int part, long topVer) { + public boolean localNode(int part, AffinityTopologyVersion topVer) { assert part >= 0 : "Invalid partition: " + part; return nodes(part, topVer).contains(cctx.localNode()); @@ -331,7 +345,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return {@code true} if given partition belongs to specified node. */ - public boolean belongs(ClusterNode node, int part, long topVer) { + public boolean belongs(ClusterNode node, int part, AffinityTopologyVersion topVer) { assert node != null; assert part >= 0 : "Invalid partition: " + part; @@ -344,7 +358,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version. * @return {@code true} if given key belongs to specified node. */ - public boolean belongs(ClusterNode node, K key, long topVer) { + public boolean belongs(ClusterNode node, K key, AffinityTopologyVersion topVer) { assert node != null; return belongs(node, partition(key), topVer); @@ -355,9 +369,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version to calculate affinity. * @return Partitions for which given node is primary. */ - public Set<Integer> primaryPartitions(UUID nodeId, long topVer) { + public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = 1; + topVer = new AffinityTopologyVersion(1); return aff.primaryPartitions(nodeId, topVer); } @@ -367,9 +381,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V * @param topVer Topology version to calculate affinity. * @return Partitions for which given node is backup. */ - public Set<Integer> backupPartitions(UUID nodeId, long topVer) { + public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = 1; + topVer = new AffinityTopologyVersion(1); return aff.backupPartitions(nodeId, topVer); } @@ -377,7 +391,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V /** * @return Affinity-ready topology version. */ - public long affinityTopologyVersion() { + public AffinityTopologyVersion affinityTopologyVersion() { return aff.lastVersion(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index ec0999a..0c42eba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.processors.affinity.*; + import java.util.*; /** @@ -31,7 +33,7 @@ public interface GridCacheAtomicFuture<K, R> extends GridCacheFuture<R> { /** * @return Future topology version. */ - public long topologyVersion(); + public AffinityTopologyVersion topologyVersion(); /** * @return Future keys. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index a169706..c1634d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@ -463,7 +464,7 @@ public class GridCacheConcurrentMap<K, V> { * @param ttl Time to live. * @return Cache entry for corresponding key-value pair. */ - public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) { + public GridCacheMapEntry<K, V> putEntry(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl) { assert key != null; checkWeakQueue(); @@ -482,7 +483,7 @@ public class GridCacheConcurrentMap<K, V> { * @return Triple where the first element is current entry associated with the key, * the second is created entry and the third is doomed (all may be null). */ - public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val, + public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl, boolean create) { assert key != null; @@ -504,7 +505,7 @@ public class GridCacheConcurrentMap<K, V> { */ public void putAll(Map<? extends K, ? extends V> m, long ttl) { for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) - putEntry(-1, e.getKey(), e.getValue(), ttl); + putEntry(AffinityTopologyVersion.NONE, e.getKey(), e.getValue(), ttl); } /** @@ -894,7 +895,7 @@ public class GridCacheConcurrentMap<K, V> { * @return Associated value. */ @SuppressWarnings({"unchecked"}) - GridCacheMapEntry<K, V> put(K key, int hash, @Nullable V val, long topVer, long ttl) { + GridCacheMapEntry<K, V> put(K key, int hash, @Nullable V val, AffinityTopologyVersion topVer, long ttl) { lock(); try { @@ -914,7 +915,7 @@ public class GridCacheConcurrentMap<K, V> { * @return Associated value. */ @SuppressWarnings({"unchecked", "SynchronizationOnLocalVariableOrMethodParameter"}) - private GridCacheMapEntry<K, V> put0(K key, int hash, V val, long topVer, long ttl) { + private GridCacheMapEntry<K, V> put0(K key, int hash, V val, AffinityTopologyVersion topVer, long ttl) { try { SegmentHeader<K, V> hdr = this.hdr; @@ -989,8 +990,8 @@ public class GridCacheConcurrentMap<K, V> { * the second is created entry and the third is doomed (all may be null). */ @SuppressWarnings( {"unchecked"}) - GridTriple<GridCacheMapEntry<K, V>> putIfObsolete(K key, int hash, @Nullable V val, long topVer, long ttl, - boolean create) { + GridTriple<GridCacheMapEntry<K, V>> putIfObsolete(K key, int hash, @Nullable V val, + AffinityTopologyVersion topVer, long ttl, boolean create) { lock(); try { @@ -1872,9 +1873,6 @@ public class GridCacheConcurrentMap<K, V> { boolean containsValue(V v) { A.notNull(v, "value"); - if (v == null) - return false; - for (Iterator<V> it = valueIterator(); it.hasNext(); ) { V v0 = it.next(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3ec013c..c4367b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.managers.swapspace.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; @@ -479,7 +480,8 @@ public class GridCacheContext<K, V> implements Externalizable { cache.map().incrementSize(e); if (isDht() || isColocated() || isDhtAtomic()) { - GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), -1, false); + GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), AffinityTopologyVersion.NONE, + false); if (part != null) part.incrementPublicSize(); @@ -497,7 +499,8 @@ public class GridCacheContext<K, V> implements Externalizable { cache.map().decrementSize(e); if (isDht() || isColocated() || isDhtAtomic()) { - GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), -1, false); + GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), AffinityTopologyVersion.NONE, + false); if (part != null) part.decrementPublicSize(); @@ -1506,10 +1509,10 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if mapped. * @throws GridCacheEntryRemovedException If reader for entry is removed. */ - public boolean dhtMap(UUID nearNodeId, long topVer, GridDhtCacheEntry<K, V> entry, IgniteLogger log, + public boolean dhtMap(UUID nearNodeId, AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> entry, IgniteLogger log, Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap, @Nullable Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap) throws GridCacheEntryRemovedException { - assert topVer != -1; + assert !topVer.equals(AffinityTopologyVersion.NONE); Collection<ClusterNode> dhtNodes = dht().topology().nodes(entry.partition(), topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 1ffed64..4bc1e63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -274,7 +275,7 @@ public interface GridCacheEntryEx<K, V> { * * @return Checks if value is valid. */ - public boolean valid(long topVer); + public boolean valid(AffinityTopologyVersion topVer); /** * @return {@code True} if partition is in valid. @@ -361,7 +362,7 @@ public interface GridCacheEntryEx<K, V> { long ttl, boolean evt, boolean metrics, - long topVer, + AffinityTopologyVersion topVer, IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, long drExpireTime, @@ -397,7 +398,7 @@ public interface GridCacheEntryEx<K, V> { boolean retval, boolean evt, boolean metrics, - long topVer, + AffinityTopologyVersion topVer, IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, @Nullable GridCacheVersion explicitVer, @@ -612,7 +613,7 @@ public interface GridCacheEntryEx<K, V> { @Nullable public V peek(boolean heap, boolean offheap, boolean swap, - long topVer, + AffinityTopologyVersion topVer, @Nullable IgniteCacheExpiryPolicy plc) throws GridCacheEntryRemovedException, IgniteCheckedException; @@ -684,7 +685,8 @@ public interface GridCacheEntryEx<K, V> { * @throws GridCacheEntryRemovedException If entry was removed. */ public boolean initialValue(V val, @Nullable byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime, - boolean preload, long topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean preload, AffinityTopologyVersion topVer, GridDrType drType) + throws IgniteCheckedException, GridCacheEntryRemovedException; /** * Sets new value if current version is <tt>0</tt> using swap entry data. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 382eb61..1ade451 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -26,6 +26,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -347,10 +348,10 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V return; } - long topVer = lockTopology(); + AffinityTopologyVersion topVer = lockTopology(); try { - if (topVer != req.topologyVersion()) { + if (!topVer.equals(req.topologyVersion())) { if (log.isDebugEnabled()) log.debug("Topology version is different [locTopVer=" + topVer + ", rmtTopVer=" + req.topologyVersion() + ']'); @@ -498,7 +499,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V if (!cctx.isNear()) { try { - GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, -1, false); + GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, + AffinityTopologyVersion.NONE, false); assert part != null; @@ -525,7 +527,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V if (!cctx.isNear()) { try { - GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, -1, false); + GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, + false); if (part != null && part.reserve()) { part.lock(); @@ -561,7 +564,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V if (!cctx.isNear()) { try { - GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, -1, false); + GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, + false); if (part != null) { part.unlock(); @@ -582,14 +586,14 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * * @return Topology version after lock. */ - private long lockTopology() { + private AffinityTopologyVersion lockTopology() { if (!cctx.isNear()) { cctx.dht().topology().readLock(); return cctx.dht().topology().topologyVersion(); } - return 0; + return AffinityTopologyVersion.ZERO; } /** @@ -738,7 +742,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * @param e Entry for eviction policy notification. * @param topVer Topology version. */ - public void touch(GridCacheEntryEx<K, V> e, long topVer) { + public void touch(GridCacheEntryEx<K, V> e, AffinityTopologyVersion topVer) { if (e.detached() || e.isInternal()) return; @@ -1092,7 +1096,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V } try { - long topVer = lockTopology(); + AffinityTopologyVersion topVer = lockTopology(); try { onFutureCompleted((EvictionFuture)f, topVer); @@ -1124,7 +1128,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * @param fut Completed eviction future. * @param topVer Topology version on future complete. */ - private void onFutureCompleted(EvictionFuture fut, long topVer) { + private void onFutureCompleted(EvictionFuture fut, AffinityTopologyVersion topVer) { if (!busyLock.enterBusy()) return; @@ -1151,7 +1155,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V } // Check if topology version is different. - if (fut.topologyVersion() != topVer) { + if (!fut.topologyVersion().equals(topVer)) { if (log.isDebugEnabled()) log.debug("Topology has changed, all entries will be touched: " + fut); @@ -1264,7 +1268,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V */ @SuppressWarnings( {"IfMayBeConditional"}) private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> remoteNodes(GridCacheEntryEx<K, V> entry, - long topVer) + AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { assert entry != null; @@ -1422,7 +1426,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V if (!evts.isEmpty()) break; - if (!cctx.affinity().primary(loc, it.next(), evt.topologyVersion())) + if (!cctx.affinity().primary(loc, it.next(), new AffinityTopologyVersion(evt.topologyVersion()))) it.remove(); } @@ -1434,7 +1438,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V if (!evts.isEmpty()) break; - if (part.primary(evt.topologyVersion()) && primaryParts.add(part.id())) { + if (part.primary(new AffinityTopologyVersion(evt.topologyVersion())) + && primaryParts.add(part.id())) { if (log.isDebugEnabled()) log.debug("Touching partition entries: " + part); @@ -1562,7 +1567,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V private GridTimeoutObject timeoutObj; /** Topology version future is processed on. */ - private long topVer; + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; /** * @param ctx Context. @@ -1793,7 +1798,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V /** * @return Topology version. */ - long topologyVersion() { + AffinityTopologyVersion topologyVersion() { return topVer; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java index 676144d..8f32c33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java @@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -50,7 +52,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple private byte[] entriesBytes; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Required by {@link Externalizable}. @@ -65,10 +67,10 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple * @param size Size. * @param topVer Topology version. */ - GridCacheEvictionRequest(int cacheId, long futId, int size, long topVer) { + GridCacheEvictionRequest(int cacheId, long futId, int size, @NotNull AffinityTopologyVersion topVer) { assert futId > 0; assert size > 0; - assert topVer > 0; + assert topVer.topologyVersion() > 0; this.cacheId = cacheId; this.futId = futId; @@ -116,7 +118,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple /** * @return Topology version. */ - @Override public long topologyVersion() { + @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -167,7 +169,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple writer.incrementState(); case 5: - if (!writer.writeLong("topVer", topVer)) + if (!topVer.writeTo(writer)) return false; writer.incrementState(); @@ -205,7 +207,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple reader.incrementState(); case 5: - topVer = reader.readLong("topVer"); + topVer = AffinityTopologyVersion.readFrom(reader); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 8cd7c4b..b504e21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -106,7 +106,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V } long locTopVer = cctx.discovery().topologyVersion(); - long rmtTopVer = cacheMsg.topologyVersion(); + long rmtTopVer = cacheMsg.topologyVersion().topologyVersion(); if (locTopVer < rmtTopVer) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index db7272b..ff337fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.extras.*; import org.apache.ignite.internal.processors.cache.query.*; @@ -398,7 +399,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** {@inheritDoc} */ - @Override public boolean valid(long topVer) { + @Override public boolean valid(AffinityTopologyVersion topVer) { return true; } @@ -976,7 +977,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> long ttl, boolean evt, boolean metrics, - long topVer, + AffinityTopologyVersion topVer, IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, long drExpireTime, @@ -1114,7 +1115,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean retval, boolean evt, boolean metrics, - long topVer, + AffinityTopologyVersion topVer, IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, @Nullable GridCacheVersion explicitVer, @@ -2728,7 +2729,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable @Override public V peek(boolean heap, boolean offheap, boolean swap, - long topVer, + AffinityTopologyVersion topVer, @Nullable IgniteCacheExpiryPolicy expiryPlc) throws GridCacheEntryRemovedException, IgniteCheckedException { @@ -2812,7 +2813,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException { assert tx == null || tx.local(); - long topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion(); switch (mode) { case TX: @@ -2864,7 +2865,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> synchronized (this) { checkObsolete(); - if (isNew() || !valid(-1)) + if (isNew() || !valid(AffinityTopologyVersion.NONE)) unswap(true, true); if (deletedUnlocked()) @@ -2928,7 +2929,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (peek != null) return peek; - long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); return peekGlobal(failFast, topVer, filter, null); } @@ -2958,7 +2959,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> */ @SuppressWarnings({"RedundantTypeArguments"}) @Nullable private GridTuple<V> peekGlobal(boolean failFast, - long topVer, + AffinityTopologyVersion topVer, IgnitePredicate<Cache.Entry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expiryPlc ) @@ -3147,7 +3148,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> long ttl, long expireTime, boolean preload, - long topVer, + AffinityTopologyVersion topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException { if (cctx.isUnmarshalValues() && valBytes != null && val == null && isNewLocked()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java index ad7cf9e..73a0996 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.processors.affinity.*; import org.jetbrains.annotations.*; /** @@ -34,6 +35,6 @@ public interface GridCacheMapEntryFactory<K, V> { * @param hdrId Header id. * @return New cache entry. */ - public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, - @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId); + public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, + V val, @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 6b8689c..8165cc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -163,8 +164,8 @@ public abstract class GridCacheMessage<K, V> implements Message { * * @return Topology version. */ - public long topologyVersion() { - return -1; + public AffinityTopologyVersion topologyVersion() { + return AffinityTopologyVersion.NONE; } /**