http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.dht;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
import java.nio.*;
import java.util.*;

/**
 * DHT cache lock response.
 */
public class GridDhtLockResponse<K, V> extends GridDistributedLockResponse<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Evicted readers. */
    @GridToStringInclude
    @GridDirectTransient
    private Collection<IgniteTxKey<K>> nearEvicted;

    /** Evicted reader key bytes. */
    @GridDirectCollection(byte[].class)
    private Collection<byte[]> nearEvictedBytes;

    /** Mini ID. */
    private IgniteUuid miniId;

    /** Invalid partitions. */
    @GridToStringInclude
    @GridDirectCollection(int.class)
    private Set<Integer> invalidParts = new GridLeanSet<>();

    /** Preload entries returned from backup (not transferred directly; see bytes field). */
    @GridDirectTransient
    private List<GridCacheEntryInfo<K, V>> preloadEntries;

    /** Marshalled preload entries. */
    @GridDirectCollection(byte[].class)
    @GridDirectVersion(1)
    private List<byte[]> preloadEntriesBytes;

    /**
     * Empty constructor (required by {@link Externalizable}).
     */
    public GridDhtLockResponse() {
        // No-op.
    }

    /**
     * @param cacheId Cache ID.
     * @param lockVer Lock version.
     * @param futId Future ID.
     * @param miniId Mini future ID.
     * @param cnt Key count.
     */
    public GridDhtLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, IgniteUuid miniId, int cnt) {
        super(cacheId, lockVer, futId, cnt);

        assert miniId != null;

        this.miniId = miniId;
    }

    /**
     * @param cacheId Cache ID.
     * @param lockVer Lock ID.
     * @param futId Future ID.
     * @param miniId Mini future ID.
     * @param err Error.
     */
    public GridDhtLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, IgniteUuid miniId, Throwable err) {
        super(cacheId, lockVer, futId, err);

        assert miniId != null;

        this.miniId = miniId;
    }

    /**
     * @return Evicted readers.
     */
    public Collection<IgniteTxKey<K>> nearEvicted() {
        return nearEvicted;
    }

    /**
     * @param nearEvicted Evicted readers.
     */
    public void nearEvicted(Collection<IgniteTxKey<K>> nearEvicted) {
        this.nearEvicted = nearEvicted;
    }

    /**
     * @param nearEvictedBytes Key bytes.
     */
    public void nearEvictedBytes(Collection<byte[]> nearEvictedBytes) {
        this.nearEvictedBytes = nearEvictedBytes;
    }

    /**
     * @return Mini future ID.
     */
    public IgniteUuid miniId() {
        return miniId;
    }

    /**
     * @param part Invalid partition.
     */
    public void addInvalidPartition(int part) {
        invalidParts.add(part);
    }

    /**
     * @return Invalid partitions.
     */
    public Set<Integer> invalidPartitions() {
        return invalidParts;
    }

    /**
     * Adds preload entry to lock response.
     *
     * @param info Info to add.
     */
    public void addPreloadEntry(GridCacheEntryInfo<K, V> info) {
        if (preloadEntries == null)
            preloadEntries = new ArrayList<>();

        preloadEntries.add(info);
    }

    /**
     * Gets preload entries returned from backup.
     *
     * @return Collection of preload entries (never {@code null}).
     */
    public Collection<GridCacheEntryInfo<K, V>> preloadEntries() {
        return preloadEntries == null ? Collections.<GridCacheEntryInfo<K, V>>emptyList() : preloadEntries;
    }

    /** {@inheritDoc} */
    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        if (nearEvictedBytes == null && nearEvicted != null)
            nearEvictedBytes = marshalCollection(nearEvicted, ctx);

        // Entry infos must be prepared via marshalInfos() BEFORE the collection is
        // marshalled. The previous version assigned preloadEntriesBytes in a separate
        // identical guard first, which made this branch (and marshalInfos) dead code.
        if (preloadEntriesBytes == null && preloadEntries != null) {
            marshalInfos(preloadEntries, ctx);

            preloadEntriesBytes = marshalCollection(preloadEntries, ctx);
        }
    }

    /** {@inheritDoc} */
    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        if (nearEvicted == null && nearEvictedBytes != null)
            nearEvicted = unmarshalCollection(nearEvictedBytes, ctx, ldr);

        // Unmarshal the collection once, then finish unmarshalling of each entry info.
        // The previous version unmarshalled in a separate identical guard first, which
        // made this branch (and unmarshalInfos) dead code.
        if (preloadEntries == null && preloadEntriesBytes != null) {
            preloadEntries = unmarshalCollection(preloadEntriesBytes, ctx, ldr);

            unmarshalInfos(preloadEntries, ctx.cacheContext(cacheId), ldr);
        }
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDhtLockResponse _clone = new GridDhtLockResponse();

        clone0(_clone);

        return _clone;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        GridDhtLockResponse _clone = (GridDhtLockResponse)_msg;

        _clone.nearEvicted = nearEvicted;
        _clone.nearEvictedBytes = nearEvictedBytes;
        _clone.miniId = miniId;
        _clone.invalidParts = invalidParts;
        _clone.preloadEntries = preloadEntries;
        _clone.preloadEntriesBytes = preloadEntriesBytes;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        // Resumable state machine: fall-through cases continue from commState.idx
        // if a previous write attempt ran out of buffer space.
        switch (commState.idx) {
            case 11:
                if (invalidParts != null) {
                    if (commState.it == null) {
                        if (!commState.putInt(invalidParts.size()))
                            return false;

                        commState.it = invalidParts.iterator();
                    }

                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putInt((int)commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

            case 12:
                if (!commState.putGridUuid(miniId))
                    return false;

                commState.idx++;

            case 13:
                if (nearEvictedBytes != null) {
                    if (commState.it == null) {
                        if (!commState.putInt(nearEvictedBytes.size()))
                            return false;

                        commState.it = nearEvictedBytes.iterator();
                    }

                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putByteArray((byte[])commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

            case 14:
                if (preloadEntriesBytes != null) {
                    if (commState.it == null) {
                        if (!commState.putInt(preloadEntriesBytes.size()))
                            return false;

                        commState.it = preloadEntriesBytes.iterator();
                    }

                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putByteArray((byte[])commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        // Resumable state machine mirroring writeTo(): fall-through cases continue
        // from commState.idx if a previous read attempt ran out of buffered data.
        switch (commState.idx) {
            case 11:
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (invalidParts == null)
                        invalidParts = new HashSet<>(commState.readSize);

                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        if (buf.remaining() < 4)
                            return false;

                        int _val = commState.getInt();

                        invalidParts.add((Integer)_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

            case 12:
                IgniteUuid miniId0 = commState.getGridUuid();

                if (miniId0 == GRID_UUID_NOT_READ)
                    return false;

                miniId = miniId0;

                commState.idx++;

            case 13:
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (nearEvictedBytes == null)
                        nearEvictedBytes = new ArrayList<>(commState.readSize);

                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        byte[] _val = commState.getByteArray();

                        if (_val == BYTE_ARR_NOT_READ)
                            return false;

                        nearEvictedBytes.add((byte[])_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

            case 14:
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (preloadEntriesBytes == null)
                        preloadEntriesBytes = new ArrayList<>(commState.readSize);

                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        byte[] _val = commState.getByteArray();

                        if (_val == BYTE_ARR_NOT_READ)
                            return false;

                        preloadEntriesBytes.add((byte[])_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 30;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridDhtLockResponse.class, this, super.toString());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.dht;

import org.jetbrains.annotations.*;

/**
 * Partition states.
 */
public enum GridDhtPartitionState {
    /** Partition is being loaded from another node. */
    MOVING,

    /** This node is either a primary or backup owner. */
    OWNING,

    /** This node is neither a primary nor a backup owner. */
    RENTING,

    /** Partition has been evicted from cache. */
    EVICTED;

    /** Enum values (cached to avoid a defensive array copy on each {@link #fromOrdinal(int)} call). */
    private static final GridDhtPartitionState[] VALS = values();

    /**
     * @param ord Ordinal value.
     * @return Enum value, or {@code null} if ordinal is out of range.
     */
    @Nullable public static GridDhtPartitionState fromOrdinal(int ord) {
        return ord < 0 || ord >= VALS.length ? null : VALS[ord];
    }

    /**
     * @return {@code True} for any state except {@link #EVICTED} (i.e. MOVING, OWNING or RENTING).
     */
    public boolean active() {
        return this != EVICTED;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.dht;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.util.tostring.*;
import org.jetbrains.annotations.*;

import java.util.*;

/**
 * DHT partition topology.
 */
@GridToStringExclude
public interface GridDhtPartitionTopology<K, V> {
    /**
     * Locks the topology, usually during mapping on locks or transactions.
     */
    public void readLock();

    /**
     * Unlocks topology locked by {@link #readLock()} method.
     */
    public void readUnlock();

    /**
     * Updates topology version.
     *
     * @param exchId Exchange ID.
     * @param exchFut Exchange future.
     */
    public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture<K, V> exchFut);

    /**
     * Topology version.
     *
     * @return Topology version.
     */
    public long topologyVersion();

    /**
     * Gets a future that will be completed when partition exchange map for this
     * particular topology version is done.
     *
     * @return Topology version ready future.
     */
    public GridDhtTopologyFuture topologyVersionFuture();

    /**
     * Pre-initializes this topology.
     *
     * @param exchId Exchange ID for this pre-initialization.
     * @throws IgniteCheckedException If failed.
     */
    public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException;

    /**
     * Post-initializes this topology.
     *
     * @param exchId Exchange ID for this post-initialization.
     * @return {@code True} if mapping was changed.
     * @throws IgniteCheckedException If failed.
     */
    public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException;

    /**
     * @param p Partition ID.
     * @param topVer Topology version at the time of creation.
     * @param create If {@code true}, then partition will be created if it's not there.
     * @return Local partition.
     * @throws GridDhtInvalidPartitionException If partition is evicted or absent and
     *      does not belong to this node.
     */
    @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
        throws GridDhtInvalidPartitionException;

    /**
     * @param key Cache key.
     * @param create If {@code true}, then partition will be created if it's not there.
     * @return Local partition.
     * @throws GridDhtInvalidPartitionException If partition is evicted or absent and
     *      does not belong to this node.
     */
    @Nullable public GridDhtLocalPartition<K, V> localPartition(K key, boolean create)
        throws GridDhtInvalidPartitionException;

    /**
     * @return All local partitions by copying them into another list.
     */
    public List<GridDhtLocalPartition<K, V>> localPartitions();

    /**
     * @return All current local partitions (live view, not a copy).
     */
    public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions();

    /**
     * @return Local IDs.
     */
    public GridDhtPartitionMap localPartitionMap();

    /**
     * @return Current update sequence.
     */
    public long updateSequence();

    /**
     * @param p Partition ID.
     * @param topVer Topology version.
     * @return Collection of all nodes responsible for this partition with primary node being first.
     */
    public Collection<ClusterNode> nodes(int p, long topVer);

    /**
     * @param p Partition ID.
     * @return Collection of all nodes who {@code own} this partition.
     */
    public List<ClusterNode> owners(int p);

    /**
     * @param p Partition ID.
     * @param topVer Topology version.
     * @return Collection of all nodes who {@code own} this partition.
     */
    public List<ClusterNode> owners(int p, long topVer);

    /**
     * @param p Partition ID.
     * @return Collection of all nodes who {@code are preloading} this partition.
     */
    public List<ClusterNode> moving(int p);

    /**
     * @param onlyActive If {@code true}, then only {@code active} partitions will be returned.
     * @return Node IDs mapped to partitions.
     */
    public GridDhtPartitionFullMap partitionMap(boolean onlyActive);

    /**
     * @param topVer Topology version.
     * @param e Entry added to cache.
     * @return Local partition.
     */
    public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e);

    /**
     * @param e Entry removed from cache.
     */
    public void onRemoved(GridDhtCacheEntry<K, V> e);

    /**
     * @param exchId Exchange ID.
     * @param partMap Update partition map.
     * @return Local partition map if there were evictions or {@code null} otherwise.
     */
    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap);

    /**
     * @param exchId Exchange ID.
     * @param parts Partitions.
     * @return Local partition map if there were evictions or {@code null} otherwise.
     */
    @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
        GridDhtPartitionMap parts);

    /**
     * @param part Partition to own.
     * @return {@code True} if owned.
     */
    public boolean own(GridDhtLocalPartition<K, V> part);

    /**
     * @param part Evicted partition.
     * @param updateSeq Presumably, whether to increment the topology update sequence — TODO confirm
     *      against {@code GridDhtPartitionTopologyImpl}.
     */
    public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq);

    /**
     * @param nodeId Node to get partitions for.
     * @return Partitions for node.
     */
    @Nullable public GridDhtPartitionMap partitions(UUID nodeId);

    /**
     * Prints memory stats.
     *
     * @param threshold Threshold for number of entries.
     */
    public void printMemoryStats(int threshold);
}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Partition topology. + */ +@GridToStringExclude +class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, V> { + /** If true, then check consistency. */ + private static final boolean CONSISTENCY_CHECK = false; + + /** Flag to control amount of output for full map. */ + private static final boolean FULL_MAP_DEBUG = false; + + /** Context. */ + private final GridCacheContext<K, V> cctx; + + /** Logger. */ + private final IgniteLogger log; + + /** */ + private final ConcurrentMap<Integer, GridDhtLocalPartition<K, V>> locParts = + new ConcurrentHashMap8<>(); + + /** Node to partition map. */ + private GridDhtPartitionFullMap node2part; + + /** Partition to node map. */ + private Map<Integer, Set<UUID>> part2node = new HashMap<>(); + + /** */ + private GridDhtPartitionExchangeId lastExchangeId; + + /** */ + private long topVer = -1; + + /** A future that will be completed when topology with version topVer will be ready to use. */ + private GridDhtTopologyFuture topReadyFut; + + /** */ + private final GridAtomicLong updateSeq = new GridAtomicLong(1); + + /** Lock. */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * @param cctx Context. 
+ */ + GridDhtPartitionTopologyImpl(GridCacheContext<K, V> cctx) { + this.cctx = cctx; + + log = cctx.logger(getClass()); + } + + /** + * @return Full map string representation. + */ + @SuppressWarnings( {"ConstantConditions"}) + private String fullMapString() { + return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); + } + + /** + * @param map Map to get string for. + * @return Full map string representation. + */ + @SuppressWarnings( {"ConstantConditions"}) + private String mapString(GridDhtPartitionMap map) { + return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); + } + + /** + * Waits for renting partitions. + * + * @return {@code True} if mapping was changed. + * @throws IgniteCheckedException If failed. + */ + private boolean waitForRent() throws IgniteCheckedException { + boolean changed = false; + + // Synchronously wait for all renting partitions to complete. + for (Iterator<GridDhtLocalPartition<K, V>> it = locParts.values().iterator(); it.hasNext();) { + GridDhtLocalPartition<K, V> p = it.next(); + + GridDhtPartitionState state = p.state(); + + if (state == RENTING || state == EVICTED) { + if (log.isDebugEnabled()) + log.debug("Waiting for renting partition: " + p); + + // Wait for partition to empty out. + p.rent(true).get(); + + if (log.isDebugEnabled()) + log.debug("Finished waiting for renting partition: " + p); + + // Remove evicted partition. 
+ it.remove(); + + changed = true; + } + } + + return changed; + } + + /** {@inheritDoc} */ + @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) + @Override public void readLock() { + lock.readLock().lock(); + } + + /** {@inheritDoc} */ + @Override public void readUnlock() { + lock.readLock().unlock(); + } + + /** {@inheritDoc} */ + @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture<K, V> exchFut) { + lock.writeLock().lock(); + + try { + assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer + + ", exchId=" + exchId + ']'; + + topVer = exchId.topologyVersion(); + + topReadyFut = exchFut; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + lock.readLock().lock(); + + try { + assert topVer > 0; + + return topVer; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyVersionFuture() { + lock.readLock().lock(); + + try { + assert topReadyFut != null; + + return topReadyFut; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { + waitForRent(); + + ClusterNode loc = cctx.localNode(); + + int num = cctx.affinity().partitions(); + + lock.writeLock().lock(); + + try { + assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; + + if (!exchId.isJoined()) + removeNode(exchId.nodeId()); + + // In case if node joins, get topology at the time of joining node. + ClusterNode oldest = CU.oldest(cctx, topVer); + + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + + long updateSeq = this.updateSeq.incrementAndGet(); + + // If this is the oldest node. 
+ if (oldest.id().equals(loc.id())) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq); + + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + } + + if (cctx.preloadEnabled()) { + for (int p = 0; p < num; p++) { + // If this is the first node in grid. + if (oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) { + assert exchId.isJoined(); + + try { + GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false); + + assert locPart != null; + + boolean owned = locPart.own(); + + assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() + + ", part=" + locPart + ']'; + + if (log.isDebugEnabled()) + log.debug("Owned partition for oldest node: " + locPart); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + } + catch (GridDhtInvalidPartitionException e) { + if (log.isDebugEnabled()) + log.debug("Ignoring invalid partition on oldest node (no need to create a partition " + + "if it no longer belongs to local node: " + e.partition()); + } + } + // If this is not the first node in grid. 
+ else { + if (node2part != null && node2part.valid()) { + if (cctx.affinity().localNode(p, topVer)) { + try { + // This will make sure that all non-existing partitions + // will be created in MOVING state. + GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + } + catch (GridDhtInvalidPartitionException e) { + if (log.isDebugEnabled()) + log.debug("Ignoring invalid partition (no need to create a partition if it " + + "no longer belongs to local node: " + e.partition()); + } + } + } + // If this node's map is empty, we pre-create local partitions, + // so local map will be sent correctly during exchange. + else if (cctx.affinity().localNode(p, topVer)) { + try { + localPartition(p, topVer, true, false); + } + catch (GridDhtInvalidPartitionException e) { + if (log.isDebugEnabled()) + log.debug("Ignoring invalid partition (no need to pre-create a partition if it " + + "no longer belongs to local node: " + e.partition()); + } + } + } + } + } + else { + // If preloader is disabled, then we simply clear out + // the partitions this node is not responsible for. + for (int p = 0; p < num; p++) { + GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, false, false); + + boolean belongs = cctx.affinity().localNode(p, topVer); + + if (locPart != null) { + if (!belongs) { + GridDhtPartitionState state = locPart.state(); + + if (state.active()) { + locPart.rent(false); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + + if (log.isDebugEnabled()) + log.debug("Evicting partition with preloading disabled " + + "(it does not belong to affinity): " + locPart); + } + } + } + else if (belongs) { + try { + // Pre-create partitions. 
+ localPartition(p, topVer, true, false); + } + catch (GridDhtInvalidPartitionException e) { + if (log.isDebugEnabled()) + log.debug("Ignoring invalid partition with disabled preloader (no need to " + + "pre-create a partition if it no longer belongs to local node: " + e.partition()); + } + } + } + } + + if (node2part != null && node2part.valid()) + checkEvictions(updateSeq); + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + } + finally { + lock.writeLock().unlock(); + } + + // Wait for evictions. + waitForRent(); + } + + /** {@inheritDoc} */ + @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { + boolean changed = waitForRent(); + + ClusterNode loc = cctx.localNode(); + + int num = cctx.affinity().partitions(); + + long topVer = exchId.topologyVersion(); + + lock.writeLock().lock(); + + try { + assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; + + if (log.isDebugEnabled()) + log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + + long updateSeq = this.updateSeq.incrementAndGet(); + + for (int p = 0; p < num; p++) { + GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, false, false); + + if (cctx.affinity().localNode(p, topVer)) { + // This partition will be created during next topology event, + // which obviously has not happened at this point. + if (locPart == null) { + if (log.isDebugEnabled()) + log.debug("Skipping local partition afterExchange (will not create): " + p); + + continue; + } + + GridDhtPartitionState state = locPart.state(); + + if (state == MOVING) { + if (cctx.preloadEnabled()) { + Collection<ClusterNode> owners = owners(p); + + // If there are no other owners, then become an owner. 
+ if (F.isEmpty(owners)) { + boolean owned = locPart.own(); + + assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + + locPart + ']'; + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + + changed = true; + + if (log.isDebugEnabled()) + log.debug("Owned partition: " + locPart); + } + else if (log.isDebugEnabled()) + log.debug("Will not own partition (there are owners to preload from) [locPart=" + + locPart + ", owners = " + owners + ']'); + } + else + updateLocal(p, loc.id(), locPart.state(), updateSeq); + } + } + else { + if (locPart != null) { + GridDhtPartitionState state = locPart.state(); + + if (state == MOVING) { + locPart.rent(false); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + + changed = true; + + if (log.isDebugEnabled()) + log.debug("Evicting moving partition (it does not belong to affinity): " + locPart); + } + } + } + } + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + + return changed; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + throws GridDhtInvalidPartitionException { + return localPartition(p, topVer, create, true); + } + + /** + * @param p Partition number. + * @param topVer Topology version. + * @param create Create flag. + * @param updateSeq Update sequence. + * @return Local partition. 
+ */ + private GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create, boolean updateSeq) { + while (true) { + boolean belongs = cctx.affinity().localNode(p, topVer); + + GridDhtLocalPartition<K, V> loc = locParts.get(p); + + if (loc != null && loc.state() == EVICTED) { + locParts.remove(p, loc); + + if (!create) + return null; + + if (!belongs) + throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition [part=" + p + + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); + + continue; + } + + if (loc == null && create) { + if (!belongs) + throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong [part=" + + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); + + lock.writeLock().lock(); + + try { + GridDhtLocalPartition<K, V> old = locParts.putIfAbsent(p, + loc = new GridDhtLocalPartition<>(cctx, p)); + + if (old != null) + loc = old; + else { + if (updateSeq) + this.updateSeq.incrementAndGet(); + + if (log.isDebugEnabled()) + log.debug("Created local partition: " + loc); + } + } + finally { + lock.writeLock().unlock(); + } + } + + return loc; + } + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { + return localPartition(cctx.affinity().partition(key), -1, create); + } + + /** {@inheritDoc} */ + @Override public List<GridDhtLocalPartition<K, V>> localPartitions() { + return new LinkedList<>(locParts.values()); + } + + /** {@inheritDoc} */ + @Override public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions() { + return locParts.values(); + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) { + /* + * Make sure not to acquire any locks here as this method + * may be called from sensitive synchronization blocks. 
+ * =================================================== + */ + + int p = cctx.affinity().partition(e.key()); + + GridDhtLocalPartition<K, V> loc = localPartition(p, topVer, true); + + assert loc != null; + + loc.onAdded(e); + + return loc; + } + + /** {@inheritDoc} */ + @Override public void onRemoved(GridDhtCacheEntry<K, V> e) { + /* + * Make sure not to acquire any locks here as this method + * may be called from sensitive synchronization blocks. + * =================================================== + */ + + GridDhtLocalPartition<K, V> loc = localPartition(cctx.affinity().partition(e.key()), topologyVersion(), false); + + if (loc != null) + loc.onRemoved(e); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap localPartitionMap() { + lock.readLock().lock(); + + try { + return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(), + F.viewReadOnly(locParts, CU.<K, V>part2state()), true); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> nodes(int p, long topVer) { + Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", node2part=" + node2part + ']'; + + Collection<ClusterNode> nodes = null; + + Collection<UUID> nodeIds = part2node.get(p); + + if (!F.isEmpty(nodeIds)) { + Collection<UUID> affIds = new HashSet<>(F.viewReadOnly(affNodes, F.node2id())); + + for (UUID nodeId : nodeIds) { + if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n != null && (topVer < 0 || n.order() <= topVer)) { + if (nodes == null) { + nodes = new ArrayList<>(affNodes.size() + 2); + + nodes.addAll(affNodes); + } + + nodes.add(n); + } + } + } + } + + return nodes != null ? 
nodes : affNodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param p Partition. + * @param topVer Topology version ({@code -1} for all nodes). + * @param state Partition state. + * @param states Additional partition states. + * @return List of nodes for the partition. + */ + private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", allIds=" + allIds + ", node2part=" + node2part + ']'; + + Collection<UUID> nodeIds = part2node.get(p); + + // Node IDs can be null if both, primary and backup, nodes disappear. + int size = nodeIds == null ? 0 : nodeIds.size(); + + if (size == 0) + return Collections.emptyList(); + + List<ClusterNode> nodes = new ArrayList<>(size); + + for (UUID id : nodeIds) { + if (topVer > 0 && !allIds.contains(id)) + continue; + + if (hasState(p, id, state, states)) { + ClusterNode n = cctx.discovery().node(id); + + if (n != null && (topVer < 0 || n.order() <= topVer)) + nodes.add(n); + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p, long topVer) { + if (!cctx.preloadEnabled()) + return ownersAndMoving(p, topVer); + + return nodes(p, topVer, OWNING); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p) { + return owners(p, -1); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> moving(int p) { + if (!cctx.preloadEnabled()) + return ownersAndMoving(p, -1); + + return nodes(p, -1, MOVING); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return List of nodes in state OWNING or MOVING. 
+ */ + private List<ClusterNode> ownersAndMoving(int p, long topVer) { + return nodes(p, topVer, OWNING, MOVING); + } + + /** {@inheritDoc} */ + @Override public long updateSequence() { + return updateSeq.get(); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + + ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']'; + + GridDhtPartitionFullMap m = node2part; + + return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionFullMap partMap) { + if (log.isDebugEnabled()) + log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + + lock.writeLock().lock(); + + try { + if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { + if (log.isDebugEnabled()) + log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + + lastExchangeId + ", exchId=" + exchId + ']'); + + return null; + } + + if (node2part != null && node2part.compareTo(partMap) >= 0) { + if (log.isDebugEnabled()) + log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + + return null; + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + if (exchId != null) + lastExchangeId = exchId; + + if (node2part != null) { + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); + + // If for some nodes current partition 
has a newer map, + // then we keep the newer value. + if (newPart != null && newPart.updateSequence() < part.updateSequence()) { + if (log.isDebugEnabled()) + log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + + mapString(part) + ", newPart=" + mapString(newPart) + ']'); + + partMap.put(part.nodeId(), part); + } + } + + for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext();) { + UUID nodeId = it.next(); + + if (!cctx.discovery().alive(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + + partMap + ']'); + + it.remove(); + } + } + } + + node2part = partMap; + + Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f); + + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + for (Integer p : e.getValue().keySet()) { + Set<UUID> ids = p2n.get(p); + + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partitions. + p2n.put(p, ids = U.newHashSet(3)); + + ids.add(e.getKey()); + } + } + + part2node = p2n; + + boolean changed = checkEvictions(updateSeq); + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after full update: " + fullMapString()); + + return changed ? 
localPartitionMap() : null; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionMap parts) { + if (log.isDebugEnabled()) + log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); + + if (!cctx.discovery().alive(parts.nodeId())) { + if (log.isDebugEnabled()) + log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + + ", parts=" + parts + ']'); + + return null; + } + + lock.writeLock().lock(); + + try { + if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { + if (log.isDebugEnabled()) + log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + + lastExchangeId + ", exchId=" + exchId + ']'); + + return null; + } + + if (exchId != null) + lastExchangeId = exchId; + + if (node2part == null) + // Create invalid partition map. + node2part = new GridDhtPartitionFullMap(); + + GridDhtPartitionMap cur = node2part.get(parts.nodeId()); + + if (cur != null && cur.updateSequence() >= parts.updateSequence()) { + if (log.isDebugEnabled()) + log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); + + return null; + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part = new GridDhtPartitionFullMap(node2part, updateSeq); + + boolean changed = false; + + if (cur == null || !cur.equals(parts)) + changed = true; + + node2part.put(parts.nodeId(), parts); + + part2node = new HashMap<>(part2node); + + // Add new mappings. 
+ for (Integer p : parts.keySet()) { + Set<UUID> ids = part2node.get(p); + + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); + + changed |= ids.add(parts.nodeId()); + } + + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { + Set<UUID> ids = part2node.get(p); + + if (ids != null) + changed |= ids.remove(parts.nodeId()); + } + } + + changed |= checkEvictions(updateSeq); + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after single update: " + fullMapString()); + + return changed ? localPartitionMap() : null; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param updateSeq Update sequence. + * @return Checks if any of the local partitions need to be evicted. + */ + private boolean checkEvictions(long updateSeq) { + assert lock.isWriteLockedByCurrentThread(); + + boolean changed = false; + + UUID locId = cctx.nodeId(); + + for (GridDhtLocalPartition<K, V> part : locParts.values()) { + GridDhtPartitionState state = part.state(); + + if (state.active()) { + int p = part.id(); + + List<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + + if (!affNodes.contains(cctx.localNode())) { + Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING)); + + // If all affinity nodes are owners, then evict partition from local node. + if (nodeIds.containsAll(F.nodeIds(affNodes))) { + part.rent(false); + + updateLocal(part.id(), locId, part.state(), updateSeq); + + changed = true; + + if (log.isDebugEnabled()) + log.debug("Evicted local partition (all affinity nodes are owners): " + part); + } + else { + int ownerCnt = nodeIds.size(); + int affCnt = affNodes.size(); + + if (ownerCnt > affCnt) { + List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds)); + + // Sort by node orders in ascending order. 
+ Collections.sort(sorted, CU.nodeComparator(true)); + + int diff = sorted.size() - affCnt; + + for (int i = 0; i < diff; i++) { + ClusterNode n = sorted.get(i); + + if (locId.equals(n.id())) { + part.rent(false); + + updateLocal(part.id(), locId, part.state(), updateSeq); + + changed = true; + + if (log.isDebugEnabled()) + log.debug("Evicted local partition (this node is oldest non-affinity node): " + + part); + + break; + } + } + } + } + } + } + } + + return changed; + } + + /** + * Updates value for single partition. + * + * @param p Partition. + * @param nodeId Node ID. + * @param state State. + * @param updateSeq Update sequence. + */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { + assert lock.isWriteLockedByCurrentThread(); + assert nodeId.equals(cctx.nodeId()); + + // In case if node joins, get topology at the time of joining node. + ClusterNode oldest = CU.oldest(cctx, topVer); + + // If this node became the oldest node. + if (oldest.id().equals(cctx.nodeId())) { + long seq = node2part.updateSequence(); + + if (seq != updateSeq) { + if (seq > updateSeq) { + if (this.updateSeq.get() < seq) { + // Update global counter if necessary. 
+ boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1); + + assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq + + ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']'; + + updateSeq = seq + 1; + } + else + updateSeq = seq; + } + + node2part.updateSequence(updateSeq); + } + } + + GridDhtPartitionMap map = node2part.get(nodeId); + + if (map == null) + node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, + Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); + + map.updateSequence(updateSeq); + + map.put(p, state); + + Set<UUID> ids = part2node.get(p); + + if (ids == null) + part2node.put(p, ids = U.newHashSet(3)); + + ids.add(nodeId); + } + + /** + * @param nodeId Node to remove. + */ + private void removeNode(UUID nodeId) { + assert nodeId != null; + assert lock.writeLock().isHeldByCurrentThread(); + + ClusterNode oldest = CU.oldest(cctx, topVer); + + ClusterNode loc = cctx.localNode(); + + if (node2part != null) { + if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) { + updateSeq.setIfGreater(node2part.updateSequence()); + + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(), + node2part, false); + } + else + node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); + + part2node = new HashMap<>(part2node); + + GridDhtPartitionMap parts = node2part.remove(nodeId); + + if (parts != null) { + for (Integer p : parts.keySet()) { + Set<UUID> nodeIds = part2node.get(p); + + if (nodeIds != null) { + nodeIds.remove(nodeId); + + if (nodeIds.isEmpty()) + part2node.remove(p); + } + } + } + + consistencyCheck(); + } + } + + /** {@inheritDoc} */ + @Override public boolean own(GridDhtLocalPartition<K, V> part) { + ClusterNode loc = cctx.localNode(); + + lock.writeLock().lock(); + + try { + if (part.own()) { + updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); + + 
consistencyCheck(); + + return true; + } + + consistencyCheck(); + + return false; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) { + assert updateSeq || lock.isWriteLockedByCurrentThread(); + + lock.writeLock().lock(); + + try { + assert part.state() == EVICTED; + + long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); + + updateLocal(part.id(), cctx.localNodeId(), part.state(), seq); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) { + lock.readLock().lock(); + + try { + return node2part.get(nodeId); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats(int threshold) { + X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); + + for (GridDhtLocalPartition part : locParts.values()) { + int size = part.size(); + + if (size >= threshold) + X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); + } + } + + /** + * @param p Partition. + * @param nodeId Node ID. + * @param match State to match. + * @param matches Additional states. + * @return Filter for owners of this partition. + */ + private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match, + final GridDhtPartitionState... matches) { + if (nodeId == null) + return false; + + GridDhtPartitionMap parts = node2part.get(nodeId); + + // Set can be null if node has been removed. + if (parts != null) { + GridDhtPartitionState state = parts.get(p); + + if (state == match) + return true; + + if (matches != null && matches.length > 0) + for (GridDhtPartitionState s : matches) + if (state == s) + return true; + } + + return false; + } + + /** + * Checks consistency after all operations. 
+ */ + private void consistencyCheck() { + if (CONSISTENCY_CHECK) { + assert lock.writeLock().isHeldByCurrentThread(); + + if (node2part == null) + return; + + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + for (Integer p : e.getValue().keySet()) { + Set<UUID> nodeIds = part2node.get(p); + + assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']'; + assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" + + e.getKey() + ", nodeIds=" + nodeIds + ']'; + } + } + + for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) { + for (UUID nodeId : e.getValue()) { + GridDhtPartitionMap map = node2part.get(nodeId); + + assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']'; + assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() + + ", nodeId=" + nodeId + ']'; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java new file mode 100644 index 0000000..45eda34 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.managers.discovery.*;
+
+/**
+ * Future that implements a barrier after which dht topology is safe to use. Topology is considered to be
+ * safe to use when all transactions that involve moving primary partitions are completed and partition map
+ * exchange is also completed.
+ * <p/>
+ * When a new transaction is started, it will wait for this future before acquiring new locks on particular
+ * topology version.
+ */
+public interface GridDhtTopologyFuture extends IgniteFuture<Long> {
+    /**
+     * Gets a topology snapshot for the topology version represented by the future. Note that by the time
+     * partition exchange completes some nodes from the snapshot may leave the grid. One should use discovery
+     * service to check if the node is valid.
+     * <p/>
+     * This method will block until the topology future is ready.
+     *
+     * @return Topology snapshot for particular topology version.
+     * @throws IgniteCheckedException If topology future failed.
+     */
+    public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException;
+}