http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java deleted file mode 100644 index 1fee0da..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ /dev/null @@ -1,816 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.locks.*; - -import static org.gridgain.grid.kernal.processors.cache.distributed.dht.GridDhtPartitionState.*; - -/** - * Partition topology for node which does not have any local partitions. - */ -@GridToStringExclude -public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopology<K, V> { - /** If true, then check consistency. */ - private static final boolean CONSISTENCY_CHECK = false; - - /** Flag to control amount of output for full map. */ - private static final boolean FULL_MAP_DEBUG = false; - - /** Cache shared context. */ - private GridCacheSharedContext<K, V> cctx; - - /** Cache ID. */ - private int cacheId; - - /** Logger. */ - private final IgniteLogger log; - - /** Node to partition map. */ - private GridDhtPartitionFullMap node2part; - - /** Partition to node map. */ - private Map<Integer, Set<UUID>> part2node = new HashMap<>(); - - /** */ - private GridDhtPartitionExchangeId lastExchangeId; - - /** */ - private long topVer = -1; - - /** A future that will be completed when topology with version topVer will be ready to use. */ - private GridDhtTopologyFuture topReadyFut; - - /** */ - private final GridAtomicLong updateSeq = new GridAtomicLong(1); - - /** Lock. */ - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - /** - * @param cctx Context. - * @param cacheId Cache ID. - * @param exchId Exchange ID. 
- */ - public GridClientPartitionTopology(GridCacheSharedContext<K, V> cctx, int cacheId, - GridDhtPartitionExchangeId exchId) { - this.cctx = cctx; - this.cacheId = cacheId; - - topVer = exchId.topologyVersion(); - - log = cctx.logger(getClass()); - - beforeExchange(exchId); - } - - /** - * @return Full map string representation. - */ - @SuppressWarnings( {"ConstantConditions"}) - private String fullMapString() { - return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); - } - - /** - * @param map Map to get string for. - * @return Full map string representation. - */ - @SuppressWarnings( {"ConstantConditions"}) - private String mapString(GridDhtPartitionMap map) { - return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); - } - - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - - /** {@inheritDoc} */ - @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) - @Override public void readLock() { - lock.readLock().lock(); - } - - /** {@inheritDoc} */ - @Override public void readUnlock() { - lock.readLock().unlock(); - } - - /** {@inheritDoc} */ - @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture<K, V> exchFut) { - lock.writeLock().lock(); - - try { - assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + ']'; - - topVer = exchId.topologyVersion(); - - topReadyFut = exchFut; - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public long topologyVersion() { - lock.readLock().lock(); - - try { - assert topVer > 0; - - return topVer; - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public GridDhtTopologyFuture topologyVersionFuture() { - lock.readLock().lock(); - - try { - assert topReadyFut != null; - - return topReadyFut; - } - finally { - lock.readLock().unlock(); - 
} - } - - /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) { - ClusterNode loc = cctx.localNode(); - - lock.writeLock().lock(); - - try { - assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; - - if (!exchId.isJoined()) - removeNode(exchId.nodeId()); - - // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); - - if (log.isDebugEnabled()) - log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); - - long updateSeq = this.updateSeq.incrementAndGet(); - - // If this is the oldest node. - if (oldest.id().equals(loc.id())) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq); - - if (log.isDebugEnabled()) - log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); - - if (log.isDebugEnabled()) - log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + - node2part + ']'); - } - else if (!node2part.nodeId().equals(loc.id())) { - node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); - - if (log.isDebugEnabled()) - log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - } - - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + - fullMapString() + ']'); - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { - long topVer = 
exchId.topologyVersion(); - - lock.writeLock().lock(); - - try { - assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; - - if (log.isDebugEnabled()) - log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" + - fullMapString() + ']'); - - updateSeq.incrementAndGet(); - - consistencyCheck(); - } - finally { - lock.writeLock().unlock(); - } - - return false; - } - - /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) - throws GridDhtInvalidPartitionException { - if (!create) - return null; - - throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition [part=" + p + - ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - } - - /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { - return localPartition(1, -1, create); - } - - /** {@inheritDoc} */ - @Override public List<GridDhtLocalPartition<K, V>> localPartitions() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) { - assert false : "Entry should not be added to client topology: " + e; - - return null; - } - - /** {@inheritDoc} */ - @Override public void onRemoved(GridDhtCacheEntry<K, V> e) { - assert false : "Entry should not be removed from client topology: " + e; - } - - /** {@inheritDoc} */ - @Override public GridDhtPartitionMap localPartitionMap() { - lock.readLock().lock(); - - try { - return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(), - Collections.<Integer, GridDhtPartitionState>emptyMap(), true); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override 
public Collection<ClusterNode> nodes(int p, long topVer) { - lock.readLock().lock(); - - try { - assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + - ", node2part=" + node2part + ']'; - - Collection<ClusterNode> nodes = null; - - Collection<UUID> nodeIds = part2node.get(p); - - if (!F.isEmpty(nodeIds)) { - for (UUID nodeId : nodeIds) { - ClusterNode n = cctx.discovery().node(nodeId); - - if (n != null && (topVer < 0 || n.order() <= topVer)) { - if (nodes == null) - nodes = new ArrayList<>(); - - nodes.add(n); - } - } - } - - return nodes; - } - finally { - lock.readLock().unlock(); - } - } - - /** - * @param p Partition. - * @param topVer Topology version ({@code -1} for all nodes). - * @param state Partition state. - * @param states Additional partition states. - * @return List of nodes for the partition. - */ - private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; - - lock.readLock().lock(); - - try { - assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + - ", allIds=" + allIds + ", node2part=" + node2part + ']'; - - Collection<UUID> nodeIds = part2node.get(p); - - // Node IDs can be null if both, primary and backup, nodes disappear. - int size = nodeIds == null ? 
0 : nodeIds.size(); - - if (size == 0) - return Collections.emptyList(); - - List<ClusterNode> nodes = new ArrayList<>(size); - - for (UUID id : nodeIds) { - if (topVer > 0 && !allIds.contains(id)) - continue; - - if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); - - if (n != null && (topVer < 0 || n.order() <= topVer)) - nodes.add(n); - } - } - - return nodes; - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p, long topVer) { - return nodes(p, topVer, OWNING); - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p) { - return owners(p, -1); - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> moving(int p) { - return nodes(p, -1, MOVING); - } - - /** - * @param p Partition. - * @param topVer Topology version. - * @return List of nodes in state OWNING or MOVING. - */ - private List<ClusterNode> ownersAndMoving(int p, long topVer) { - return nodes(p, topVer, OWNING, MOVING); - } - - /** {@inheritDoc} */ - @Override public long updateSequence() { - return updateSeq.get(); - } - - /** {@inheritDoc} */ - @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { - lock.readLock().lock(); - - try { - assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']'; - - GridDhtPartitionFullMap m = node2part; - - return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionFullMap partMap) { - if (log.isDebugEnabled()) - log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); 
- - lock.writeLock().lock(); - - try { - if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { - if (log.isDebugEnabled()) - log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); - - return null; - } - - if (node2part != null && node2part.compareTo(partMap) >= 0) { - if (log.isDebugEnabled()) - log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - - return null; - } - - updateSeq.incrementAndGet(); - - if (exchId != null) - lastExchangeId = exchId; - - if (node2part != null) { - for (GridDhtPartitionMap part : node2part.values()) { - GridDhtPartitionMap newPart = partMap.get(part.nodeId()); - - // If for some nodes current partition has a newer map, - // then we keep the newer value. - if (newPart != null && newPart.updateSequence() < part.updateSequence()) { - if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + - mapString(part) + ", newPart=" + mapString(newPart) + ']'); - - partMap.put(part.nodeId(), part); - } - } - - for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext();) { - UUID nodeId = it.next(); - - if (!cctx.discovery().alive(nodeId)) { - if (log.isDebugEnabled()) - log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + - partMap + ']'); - - it.remove(); - } - } - } - - node2part = partMap; - - Map<Integer, Set<UUID>> p2n = new HashMap<>(); - - for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { - for (Integer p : e.getValue().keySet()) { - Set<UUID> ids = p2n.get(p); - - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partitions. 
- p2n.put(p, ids = U.newHashSet(3)); - - ids.add(e.getKey()); - } - } - - part2node = p2n; - - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after full update: " + fullMapString()); - - return null; - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts) { - if (log.isDebugEnabled()) - log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); - - if (!cctx.discovery().alive(parts.nodeId())) { - if (log.isDebugEnabled()) - log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + - ", parts=" + parts + ']'); - - return null; - } - - lock.writeLock().lock(); - - try { - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { - if (log.isDebugEnabled()) - log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); - - return null; - } - - if (exchId != null) - lastExchangeId = exchId; - - if (node2part == null) { - U.dumpStack(log, "Created invalid: " + node2part); - - // Create invalid partition map. 
- node2part = new GridDhtPartitionFullMap(); - } - - GridDhtPartitionMap cur = node2part.get(parts.nodeId()); - - if (cur != null && cur.updateSequence() >= parts.updateSequence()) { - if (log.isDebugEnabled()) - log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + - ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); - - return null; - } - - long updateSeq = this.updateSeq.incrementAndGet(); - - node2part = new GridDhtPartitionFullMap(node2part, updateSeq); - - boolean changed = false; - - if (cur == null || !cur.equals(parts)) - changed = true; - - node2part.put(parts.nodeId(), parts); - - part2node = new HashMap<>(part2node); - - // Add new mappings. - for (Integer p : parts.keySet()) { - Set<UUID> ids = part2node.get(p); - - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); - - changed |= ids.add(parts.nodeId()); - } - - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set<UUID> ids = part2node.get(p); - - if (ids != null) - changed |= ids.remove(parts.nodeId()); - } - } - - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after single update: " + fullMapString()); - - return changed ? localPartitionMap() : null; - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * Updates value for single partition. - * - * @param p Partition. - * @param nodeId Node ID. - * @param state State. - * @param updateSeq Update sequence. - */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { - assert lock.isWriteLockedByCurrentThread(); - assert nodeId.equals(cctx.localNodeId()); - - // In case if node joins, get topology at the time of joining node. 
- ClusterNode oldest = CU.oldest(cctx, topVer); - - // If this node became the oldest node. - if (oldest.id().equals(cctx.localNodeId())) { - long seq = node2part.updateSequence(); - - if (seq != updateSeq) { - if (seq > updateSeq) { - if (this.updateSeq.get() < seq) { - // Update global counter if necessary. - boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1); - - assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq + - ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']'; - - updateSeq = seq + 1; - } - else - updateSeq = seq; - } - - node2part.updateSequence(updateSeq); - } - } - - GridDhtPartitionMap map = node2part.get(nodeId); - - if (map == null) - node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, - Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); - - map.updateSequence(updateSeq); - - map.put(p, state); - - Set<UUID> ids = part2node.get(p); - - if (ids == null) - part2node.put(p, ids = U.newHashSet(3)); - - ids.add(nodeId); - } - - /** - * @param nodeId Node to remove. 
- */ - private void removeNode(UUID nodeId) { - assert nodeId != null; - assert lock.writeLock().isHeldByCurrentThread(); - - ClusterNode oldest = CU.oldest(cctx, topVer); - - ClusterNode loc = cctx.localNode(); - - if (node2part != null) { - if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) { - updateSeq.setIfGreater(node2part.updateSequence()); - - node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(), - node2part, false); - } - else - node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); - - part2node = new HashMap<>(part2node); - - GridDhtPartitionMap parts = node2part.remove(nodeId); - - if (parts != null) { - for (Integer p : parts.keySet()) { - Set<UUID> nodeIds = part2node.get(p); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - part2node.remove(p); - } - } - } - - consistencyCheck(); - } - } - - /** {@inheritDoc} */ - @Override public boolean own(GridDhtLocalPartition<K, V> part) { - assert false : "Client topology should never own a partition: " + part; - - return false; - } - - /** {@inheritDoc} */ - @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) { - assert updateSeq || lock.isWriteLockedByCurrentThread(); - - lock.writeLock().lock(); - - try { - assert part.state() == EVICTED; - - long seq = updateSeq ? 
this.updateSeq.incrementAndGet() : this.updateSeq.get(); - - updateLocal(part.id(), cctx.localNodeId(), part.state(), seq); - - consistencyCheck(); - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) { - lock.readLock().lock(); - - try { - return node2part.get(nodeId); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats(int threshold) { - X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']'); - } - - /** - * @param p Partition. - * @param nodeId Node ID. - * @param match State to match. - * @param matches Additional states. - * @return Filter for owners of this partition. - */ - private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match, - final GridDhtPartitionState... matches) { - if (nodeId == null) - return false; - - GridDhtPartitionMap parts = node2part.get(nodeId); - - // Set can be null if node has been removed. - if (parts != null) { - GridDhtPartitionState state = parts.get(p); - - if (state == match) - return true; - - if (matches != null && matches.length > 0) - for (GridDhtPartitionState s : matches) - if (state == s) - return true; - } - - return false; - } - - /** - * Checks consistency after all operations. 
- */ - private void consistencyCheck() { - if (CONSISTENCY_CHECK) { - assert lock.writeLock().isHeldByCurrentThread(); - - if (node2part == null) - return; - - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - for (Integer p : e.getValue().keySet()) { - Set<UUID> nodeIds = part2node.get(p); - - assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']'; - assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" + - e.getKey() + ", nodeIds=" + nodeIds + ']'; - } - } - - for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) { - for (UUID nodeId : e.getValue()) { - GridDhtPartitionMap map = node2part.get(nodeId); - - assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']'; - assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() + - ", nodeId=" + nodeId + ']'; - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java deleted file mode 100644 index 4c120ba..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.nio.*; - -/** - * Affinity assignment request. 
- */ -public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Topology version being queried. */ - private long topVer; - - /** - * Empty constructor. - */ - public GridDhtAffinityAssignmentRequest() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param topVer Topology version. - */ - public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) { - this.cacheId = cacheId; - this.topVer = topVer; - } - - /** {@inheritDoc} */ - @Override public boolean allowForStartup() { - return true; - } - - /** - * @return Requested topology version. - */ - @Override public long topologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 79; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDhtAffinityAssignmentRequest _clone = new GridDhtAffinityAssignmentRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDhtAffinityAssignmentRequest _clone = (GridDhtAffinityAssignmentRequest)_msg; - - _clone.topVer = topVer; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 3: - if (!commState.putLong(topVer)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 3: - if (buf.remaining() < 8) - return false; - - 
topVer = commState.getLong(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtAffinityAssignmentRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java deleted file mode 100644 index 2f9a84f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.nio.*; -import java.util.*; - -/** - * Affinity assignment response. - */ -public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Topology version. */ - private long topVer; - - /** Affinity assignment. */ - @GridDirectTransient - @GridToStringInclude - private List<List<ClusterNode>> affAssignment; - - /** Affinity assignment bytes. */ - private byte[] affAssignmentBytes; - - /** - * Empty constructor. - */ - public GridDhtAffinityAssignmentResponse() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param topVer Topology version. - * @param affAssignment Affinity assignment. - */ - public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, List<List<ClusterNode>> affAssignment) { - this.cacheId = cacheId; - this.topVer = topVer; - this.affAssignment = affAssignment; - } - - /** {@inheritDoc} */ - @Override public boolean allowForStartup() { - return true; - } - - /** - * @return Topology version. - */ - @Override public long topologyVersion() { - return topVer; - } - - /** - * @return Affinity assignment. 
- */ - public List<List<ClusterNode>> affinityAssignment() { - return affAssignment; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 80; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDhtAffinityAssignmentResponse _clone = new GridDhtAffinityAssignmentResponse(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDhtAffinityAssignmentResponse _clone = (GridDhtAffinityAssignmentResponse)_msg; - - _clone.topVer = topVer; - _clone.affAssignment = affAssignment; - _clone.affAssignmentBytes = affAssignmentBytes; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (affAssignment != null) - affAssignmentBytes = ctx.marshaller().marshal(affAssignment); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (affAssignmentBytes != null) - affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 3: - if (!commState.putByteArray(affAssignmentBytes)) - return false; - - commState.idx++; - - case 4: - if (!commState.putLong(topVer)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if 
(!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 3: - byte[] affAssignmentBytes0 = commState.getByteArray(); - - if (affAssignmentBytes0 == BYTE_ARR_NOT_READ) - return false; - - affAssignmentBytes = affAssignmentBytes0; - - commState.idx++; - - case 4: - if (buf.remaining() < 8) - return false; - - topVer = commState.getLong(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtAffinityAssignmentResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java deleted file mode 100644 index 79e9ee8..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; - -/** - * Future that fetches affinity assignment from remote cache nodes. - */ -public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<List<ClusterNode>>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Nodes order comparator. */ - private static final Comparator<ClusterNode> CMP = new GridNodeOrderComparator(); - - /** Cache context. */ - private final GridCacheContext<K, V> ctx; - - /** List of available nodes this future can fetch data from. */ - private Queue<ClusterNode> availableNodes; - - /** Topology version. */ - private final long topVer; - - /** Pending node from which response is being awaited. */ - private ClusterNode pendingNode; - - /** - * @param ctx Cache context. - * @param availableNodes Available nodes. - */ - public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, long topVer, Collection<ClusterNode> availableNodes) { - super(ctx.kernalContext()); - - this.ctx = ctx; - - this.topVer = topVer; - - LinkedList<ClusterNode> tmp = new LinkedList<>(); - tmp.addAll(availableNodes); - Collections.sort(tmp, CMP); - - this.availableNodes = tmp; - } - - /** - * Initializes fetch future. 
- */ - public void init() { - ((GridDhtPreloader<K, V>)ctx.preloader()).addDhtAssignmentFetchFuture(topVer, this); - - requestFromNextNode(); - } - - /** - * @param node Node. - * @param res Reponse. - */ - public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) { - if (res.topologyVersion() != topVer) { - if (log.isDebugEnabled()) - log.debug("Received affinity assignment for wrong topolgy version (will ignore) " + - "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']'); - - return; - } - - List<List<ClusterNode>> assignment = null; - - synchronized (this) { - if (pendingNode != null && pendingNode.equals(node)) - assignment = res.affinityAssignment(); - } - - if (assignment != null) - onDone(assignment); - } - - /** - * @param leftNodeId Left node ID. - */ - public void onNodeLeft(UUID leftNodeId) { - synchronized (this) { - if (pendingNode != null && pendingNode.id().equals(leftNodeId)) { - availableNodes.remove(pendingNode); - - pendingNode = null; - } - } - - requestFromNextNode(); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable List<List<ClusterNode>> res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - ((GridDhtPreloader<K, V>)ctx.preloader()).removeDhtAssignmentFetchFuture(topVer, this); - - return true; - } - - return false; - } - - /** - * Requests affinity from next node in the list. - */ - private void requestFromNextNode() { - boolean complete; - - // Avoid 'protected field is accessed in synchronized context' warning. - IgniteLogger log0 = log; - - synchronized (this) { - while (!availableNodes.isEmpty()) { - ClusterNode node = availableNodes.poll(); - - try { - if (log0.isDebugEnabled()) - log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() + - ", node=" + node + ']'); - - ctx.io().send(node, new GridDhtAffinityAssignmentRequest<K, V>(ctx.cacheId(), topVer), - AFFINITY_POOL); - - // Close window for listener notification. 
- if (ctx.discovery().node(node.id()) == null) { - U.warn(log0, "Failed to request affinity assignment from remote node (node left grid, will " + - "continue to another node): " + node); - - continue; - } - - pendingNode = node; - - break; - } - catch (ClusterTopologyException ignored) { - U.warn(log0, "Failed to request affinity assignment from remote node (node left grid, will " + - "continue to another node): " + node); - } - catch (IgniteCheckedException e) { - U.error(log0, "Failed to request affinity assignment from remote node (will " + - "continue to another node): " + node, e); - } - } - - complete = pendingNode == null; - } - - // No more nodes left, complete future with null outside of synchronization. - // Affinity should be calculated from scratch. - if (complete) - onDone((List<List<ClusterNode>>)null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java deleted file mode 100644 index 9993a72..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.util.tostring.*; - -import java.io.*; - -/** - * DHT cache. - */ -public class GridDhtCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Near cache. */ - @GridToStringExclude - private GridNearTransactionalCache<K, V> near; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtCache() { - // No-op. - } - - /** - * @param ctx Context. - */ - public GridDhtCache(GridCacheContext<K, V> ctx) { - super(ctx); - } - - /** - * @param ctx Cache context. - * @param map Cache map. - */ - public GridDhtCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { - super(ctx, map); - } - - /** {@inheritDoc} */ - @Override public boolean isDht() { - return true; - } - - /** {@inheritDoc} */ - @Override public String name() { - String name = super.name(); - - return name == null ? 
"defaultDhtCache" : name + "Dht"; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - resetMetrics(); - - super.start(); - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - GridCacheMetricsAdapter m = new GridCacheMetricsAdapter(); - - m.delegate(ctx.dht().near().metrics0()); - - metrics = m; - - ctx.dr().resetMetrics(); - } - - /** - * @return Near cache. - */ - @Override public GridNearTransactionalCache<K, V> near() { - return near; - } - - /** - * @param near Near cache. - */ - public void near(GridNearTransactionalCache<K, V> near) { - this.near = near; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java deleted file mode 100644 index 3be1bbf..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ /dev/null @@ -1,1017 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; -import static org.apache.ignite.internal.processors.dr.GridDrType.*; - -/** - * DHT cache adapter. - */ -public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Topology. */ - private GridDhtPartitionTopology<K, V> top; - - /** Preloader. */ - protected GridCachePreloader<K, V> preldr; - - /** Multi tx future holder. 
*/ - private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>(); - - /** Multi tx futures. */ - private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>(); - - /** - * Empty constructor required for {@link Externalizable}. - */ - protected GridDhtCacheAdapter() { - // No-op. - } - - /** - * @param ctx Context. - */ - protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { - super(ctx, ctx.config().getStartSize()); - - top = new GridDhtPartitionTopologyImpl<>(ctx); - } - - /** - * Constructor used for near-only cache. - * - * @param ctx Cache context. - * @param map Cache map. - */ - protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { - super(ctx, map); - - top = new GridDhtPartitionTopologyImpl<>(ctx); - } - - /** {@inheritDoc} */ - @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { - /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); - } - }); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); - - ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest<K, V> req) { - processTtlUpdateRequest(req); - } - }); - } - - /** {@inheritDoc} */ - @Override public void stop() { - super.stop(); - - if (preldr != null) - preldr.stop(); - - // Clean up to help GC. 
- preldr = null; - top = null; - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - preldr.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop() { - super.onKernalStop(); - - if (preldr != null) - preldr.onKernalStop(); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - super.printMemoryStats(); - - top.printMemoryStats(1024); - } - - /** - * @return Near cache. - */ - public abstract GridNearCacheAdapter<K, V> near(); - - /** - * @return Partition topology. - */ - public GridDhtPartitionTopology<K, V> topology() { - return top; - } - - /** {@inheritDoc} */ - @Override public GridCachePreloader<K, V> preloader() { - return preldr; - } - - /** - * @return DHT preloader. - */ - public GridDhtPreloader<K, V> dhtPreloader() { - assert preldr instanceof GridDhtPreloader; - - return (GridDhtPreloader<K, V>)preldr; - } - - /** - * @return Topology version future registered for multi-update. - */ - @Nullable public GridDhtTopologyFuture multiUpdateTopologyFuture() { - IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); - - return tup == null ? null : tup.get2(); - } - - /** - * Starts multi-update lock. Will wait for topology future is ready. - * - * @return Topology version. - * @throws IgniteCheckedException If failed. - */ - public long beginMultiUpdate() throws IgniteCheckedException { - IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); - - if (tup != null) - throw new IgniteCheckedException("Nested multi-update locks are not supported"); - - top.readLock(); - - GridDhtTopologyFuture topFut; - - long topVer; - - try { - // While we are holding read lock, register lock future for partition release future. 
- IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId()); - - topVer = top.topologyVersion(); - - MultiUpdateFuture fut = new MultiUpdateFuture(ctx.kernalContext(), topVer); - - MultiUpdateFuture old = multiTxFuts.putIfAbsent(lockId, fut); - - assert old == null; - - topFut = top.topologyVersionFuture(); - - multiTxHolder.set(F.t(lockId, topFut)); - } - finally { - top.readUnlock(); - } - - topFut.get(); - - return topVer; - } - - /** - * Ends multi-update lock. - * - * @throws IgniteCheckedException If failed. - */ - public void endMultiUpdate() throws IgniteCheckedException { - IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); - - if (tup == null) - throw new IgniteCheckedException("Multi-update was not started or released twice."); - - top.readLock(); - - try { - IgniteUuid lockId = tup.get1(); - - MultiUpdateFuture multiFut = multiTxFuts.remove(lockId); - - multiTxHolder.set(null); - - // Finish future. - multiFut.onDone(lockId); - } - finally { - top.readUnlock(); - } - } - - /** - * Creates multi update finish future. Will return {@code null} if no multi-update locks are found. - * - * @param topVer Topology version. - * @return Finish future. - */ - @Nullable public IgniteFuture<?> multiUpdateFinishFuture(long topVer) { - GridCompoundFuture<IgniteUuid, Object> fut = null; - - for (MultiUpdateFuture multiFut : multiTxFuts.values()) { - if (multiFut.topologyVersion() <= topVer) { - if (fut == null) - fut = new GridCompoundFuture<>(ctx.kernalContext()); - - fut.add(multiFut); - } - } - - if (fut != null) - fut.markInitialized(); - - return fut; - } - - /** - * @param key Key. - * @return DHT entry. - */ - @Nullable public GridDhtCacheEntry<K, V> peekExx(K key) { - return (GridDhtCacheEntry<K, V>)peekEx(key); - } - - /** - * {@inheritDoc} - * - * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. 
- */ - @Override public GridCacheEntry<K, V> entry(K key) throws GridDhtInvalidPartitionException { - return super.entry(key); - } - - /** - * {@inheritDoc} - * - * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. - */ - @Override public GridCacheEntryEx<K, V> entryEx(K key, boolean touch) throws GridDhtInvalidPartitionException { - return super.entryEx(key, touch); - } - - /** - * {@inheritDoc} - * - * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. - */ - @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) throws GridDhtInvalidPartitionException { - return super.entryEx(key, topVer); - } - - /** - * @param key Key. - * @return DHT entry. - * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. - */ - public GridDhtCacheEntry<K, V> entryExx(K key) throws GridDhtInvalidPartitionException { - return (GridDhtCacheEntry<K, V>)entryEx(key); - } - - /** - * @param key Key. - * @param topVer Topology version. - * @return DHT entry. - * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. - */ - public GridDhtCacheEntry<K, V> entryExx(K key, long topVer) throws GridDhtInvalidPartitionException { - return (GridDhtCacheEntry<K, V>)entryEx(key, topVer); - } - - /** - * Gets or creates entry for given key. If key belongs to local node, dht entry will be returned, otherwise - * if {@code allowDetached} is {@code true}, detached entry will be returned, otherwise exception will be - * thrown. - * - * @param key Key for which entry should be returned. - * @param allowDetached Whether to allow detached entries. - * @param touch {@code True} if entry should be passed to eviction policy. - * @return Cache entry. - * @throws GridDhtInvalidPartitionException if entry does not belong to this node and - * {@code allowDetached} is {@code false}. 
- */ - public GridCacheEntryEx<K, V> entryExx(K key, long topVer, boolean allowDetached, boolean touch) { - try { - return allowDetached && !ctx.affinity().localNode(key, topVer) ? - new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : - entryEx(key, touch); - } - catch (GridDhtInvalidPartitionException e) { - if (!allowDetached) - throw e; - - return new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0); - } - } - - /** {@inheritDoc} */ - @Override public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException { - if (ctx.store().isLocalStore()) { - super.localLoad(keys); - - return; - } - - // Version for all loaded entries. - final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion()); - - final boolean replicate = ctx.isDrEnabled(); - - final long topVer = ctx.affinity().affinityTopologyVersion(); - - ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() { - @Override public void apply(K key, V val) { - loadEntry(key, val, ver0, null, topVer, replicate, 0); - } - }); - } - - /** {@inheritDoc} */ - @Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args) throws IgniteCheckedException { - if (ctx.store().isLocalStore()) { - super.loadCache(p, ttl, args); - - return; - } - - // Version for all loaded entries. - final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion()); - - final boolean replicate = ctx.isDrEnabled(); - - final long topVer = ctx.affinity().affinityTopologyVersion(); - - ctx.store().loadCache(new CI3<K, V, GridCacheVersion>() { - @Override public void apply(K key, V val, @Nullable GridCacheVersion ver) { - assert ver == null; - - loadEntry(key, val, ver0, p, topVer, replicate, ttl); - } - }, args); - } - - /** - * @param key Key. - * @param val Value. - * @param ver Cache version. - * @param p Optional predicate. - * @param topVer Topology version. 
- * @param replicate Replication flag. - * @param ttl TTL. - */ - private void loadEntry(K key, - V val, - GridCacheVersion ver, - @Nullable IgniteBiPredicate<K, V> p, - long topVer, - boolean replicate, - long ttl) { - if (p != null && !p.apply(key, val)) - return; - - try { - GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), -1, true); - - // Reserve to make sure that partition does not get unloaded. - if (part.reserve()) { - GridCacheEntryEx<K, V> entry = null; - - try { - if (ctx.portableEnabled()) { - key = (K)ctx.marshalToPortable(key); - val = (V)ctx.marshalToPortable(val); - } - - entry = entryEx(key, false); - - entry.initialValue(val, null, ver, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to put cache value: " + entry, e); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during loadCache (will ignore): " + entry); - } - finally { - if (entry != null) - entry.context().evicts().touch(entry, topVer); - - part.release(); - } - } - else if (log.isDebugEnabled()) - log.debug("Will node load entry into cache (partition is invalid): " + part); - } - catch (GridDhtInvalidPartitionException e) { - if (log.isDebugEnabled()) - log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val + - ", err=" + e + ']'); - } - } - - /** {@inheritDoc} */ - @Override public int primarySize() { - int sum = 0; - - long topVer = ctx.affinity().affinityTopologyVersion(); - - for (GridDhtLocalPartition<K, V> p : topology().currentLocalPartitions()) { - if (p.primary(topVer)) - sum += p.publicSize(); - } - - return sum; - } - - /** - * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgnitePredicate[], IgniteCacheExpiryPolicy)} - * method instead to retrieve DHT value. 
- * - * @param keys {@inheritDoc} - * @param forcePrimary {@inheritDoc} - * @param skipTx {@inheritDoc} - * @param filter {@inheritDoc} - * @return {@inheritDoc} - */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( - @Nullable Collection<? extends K> keys, - boolean forcePrimary, - boolean skipTx, - @Nullable GridCacheEntryEx<K, V> entry, - @Nullable UUID subjId, - String taskName, - boolean deserializePortable, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter - ) { - return getAllAsync(keys, - true, - null, - /*don't check local tx. */false, - subjId, - taskName, - deserializePortable, - forcePrimary, - null, - filter); - } - - /** {@inheritDoc} */ - @Override public V reload(K key, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) - throws IgniteCheckedException { - try { - return super.reload(key, filter); - } - catch (GridDhtInvalidPartitionException ignored) { - return null; - } - } - - /** - * @param keys Keys to get - * @param readThrough Read through flag. - * @param subjId Subject ID. - * @param taskName Task name. - * @param deserializePortable Deserialize portable flag. - * @param filter Optional filter. - * @param expiry Expiry policy. - * @return Get future. - */ - IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, - boolean readThrough, - @Nullable UUID subjId, - String taskName, - boolean deserializePortable, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable IgniteCacheExpiryPolicy expiry - ) { - return getAllAsync(keys, - readThrough, - null, - /*don't check local tx. */false, - subjId, - taskName, - deserializePortable, - false, - expiry, - filter); - } - - /** - * @param reader Reader node ID. - * @param msgId Message ID. - * @param keys Keys to get. - * @param readThrough Read through flag. - * @param reload Reload flag. - * @param topVer Topology version. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. 
- * @param deserializePortable Deserialize portable flag. - * @param filter Optional filter. - * @param expiry Expiry policy. - * @return DHT future. - */ - public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, - long msgId, - LinkedHashMap<? extends K, Boolean> keys, - boolean readThrough, - boolean reload, - long topVer, - @Nullable UUID subjId, - int taskNameHash, - boolean deserializePortable, - IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable IgniteCacheExpiryPolicy expiry) { - GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, - msgId, - reader, - keys, - readThrough, - reload, - /*tx*/null, - topVer, - filter, - subjId, - taskNameHash, - deserializePortable, - expiry); - - fut.init(); - - return fut; - } - - /** - * @param nodeId Node ID. - * @param req Get request. - */ - protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) { - assert isAffinityNode(cacheCfg); - - long ttl = req.accessTtl(); - - final GetExpiryPolicy expiryPlc = ttl == -1L ? 
null : new GetExpiryPolicy(ttl); - - IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - getDhtAsync(nodeId, - req.messageId(), - req.keys(), - req.readThrough(), - req.reload(), - req.topologyVersion(), - req.subjectId(), - req.taskNameHash(), - false, - req.filter(), - expiryPlc); - - fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() { - @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) { - GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(), - req.futureId(), - req.miniId(), - req.version()); - - GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - (GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f; - - try { - Collection<GridCacheEntryInfo<K, V>> entries = fut.get(); - - res.entries(entries); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed processing get request: " + req, e); - - res.error(e); - } - - res.invalidPartitions(fut.invalidPartitions(), ctx.discovery().topologyVersion()); - - try { - ctx.io().send(nodeId, res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + - ",req=" + req + ", res=" + res + ']', e); - } - - sendTtlUpdateRequest(expiryPlc); - } - }); - } - - /** - * @param expiryPlc Expiry policy. 
- */ - public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) { - if (expiryPlc != null && expiryPlc.entries() != null) { /* Nothing is scheduled when the policy is null or its entries() is null. */ - ctx.closures().runLocalSafe(new Runnable() { - @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"}) - @Override public void run() { - Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries = expiryPlc.entries(); - - assert entries != null && !entries.isEmpty(); - - Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>(); /* One accumulated request per target node. */ - - long topVer = ctx.discovery().topologyVersion(); - - for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) { - List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer); - - for (int i = 0; i < nodes.size(); i++) { - ClusterNode node = nodes.get(i); - - if (!node.isLocal()) { /* The local node is skipped; only the other nodes returned for the key get the entry. */ - GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); - - if (req == null) { - reqMap.put(node, - req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess())); - - req.cacheId(ctx.cacheId()); - } - - req.addEntry(e.getValue().get1(), e.getValue().get2()); - } - } - } - - Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrs = expiryPlc.readers(); - - if (rdrs != null) { /* Reader entries are added as near entries, reusing the per-node request when one already exists. */ - assert !rdrs.isEmpty(); - - for (Map.Entry<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> e : rdrs.entrySet()) { - ClusterNode node = ctx.node(e.getKey()); - - if (node != null) { /* Reader IDs for which ctx.node() returns null are skipped. */ - GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); - - if (req == null) { - reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(topVer, - expiryPlc.forAccess())); - - req.cacheId(ctx.cacheId()); - } - - for (IgniteBiTuple<byte[], GridCacheVersion> t : e.getValue()) - req.addNearEntry(t.get1(), t.get2()); - } - } - } - - for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) { - try { - ctx.io().send(req.getKey(), req.getValue()); - } - catch (IgniteCheckedException e) { /* A failed send is only logged; the loop goes on to the remaining nodes. */ - log.error("Failed to send TTL update request.", 
e); - } - } - } - }); - } - } - - /** - * Applies a TTL update request: entries listed under {@code keys()} are updated in this cache, - * entries listed under {@code nearKeys()} are updated in the cache returned by {@code near()} - * (asserted to be non-null in that case). - * - * @param req Request. - */ - private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { - if (req.keys() != null) - updateTtl(this, req.keys(), req.versions(), req.ttl()); - - if (req.nearKeys() != null) { - GridNearCacheAdapter<K, V> near = near(); - - assert near != null; - - updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl()); - } - } - - /** - * Updates TTL of the listed entries in the given cache. - * <p> - * When {@code cache.context().isSwapOrOffheapEnabled()} is {@code true} each entry is obtained with - * {@code entryEx} and unswapped first; otherwise only an entry returned by {@code peekEx} is updated - * and a {@code null} result is skipped. Every non-null entry is passed to - * {@code evicts().touch(entry, -1L)} afterwards, also when the update threw. An - * {@link IgniteCheckedException} for one key is logged and the loop continues with the next key. - * - * @param cache Cache. - * @param keys Entries keys, asserted to be non-empty. - * @param vers Entries versions, asserted to have the same size as {@code keys}; - * {@code vers.get(i)} is applied to {@code keys.get(i)}. - * @param ttl TTL. - */ - private void updateTtl(GridCacheAdapter<K, V> cache, - List<K> keys, - List<GridCacheVersion> vers, - long ttl) { - assert !F.isEmpty(keys); - assert keys.size() == vers.size(); - - int size = keys.size(); - - boolean swap = cache.context().isSwapOrOffheapEnabled(); - - for (int i = 0; i < size; i++) { - try { - GridCacheEntryEx<K, V> entry = null; - - try { - if (swap) { - entry = cache.entryEx(keys.get(i)); - - entry.unswap(true, false); - } - else - entry = cache.peekEx(keys.get(i)); - - if (entry != null) - entry.updateTtl(vers.get(i), ttl); - } - finally { - if (entry != null) - cache.context().evicts().touch(entry, -1L); - } - } - catch (IgniteCheckedException e) { - log.error("Failed to unswap entry.", e); - } - } - } - - /** - * {@inheritDoc} - * <p> - * The body is only {@code assert false}: with assertions enabled any call fails, with assertions - * disabled the call does nothing. - */ - @Override public void unlockAll(Collection<? extends K> keys, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - assert false; - } - - /** - * {@inheritDoc} - * <p> - * Returns a new {@link PartitionEntrySet} for the given partition on every call. - */ - @Override public Set<GridCacheEntry<K, V>> entrySet(int part) { - return new PartitionEntrySet(part); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtCacheAdapter.class, this); - } - - /** - * Set view over the entries of one partition of the enclosing cache. The set stores only the - * partition ID; {@code iterator()} and {@code size()} look the local partition up on each call. - */ - private class PartitionEntrySet extends AbstractSet<GridCacheEntry<K, V>> { - /** Partition ID. */ - private int partId; - - /** - * @param partId Partition id. 
- */ - private PartitionEntrySet(int partId) { - this.partId = partId; - } - - /** - * {@inheritDoc} - * <p> - * The local partition is looked up at the current discovery topology version; when none is - * returned the resulting iterator is empty. - */ - @NotNull @Override public Iterator<GridCacheEntry<K, V>> iterator() { - final GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, - ctx.discovery().topologyVersion(), false); - - Iterator<GridDhtCacheEntry<K, V>> partIt = part == null ? null : part.entries().iterator(); - - return new PartitionEntryIterator<>(partIt); - } - - /** - * {@inheritDoc} - * <p> - * Returns {@code false} unless {@code o} is a {@code GridCacheEntry} whose {@code peek()} is - * non-null; otherwise removes through {@code entry(key).remove(val)}. An - * {@link IgniteCheckedException} is rethrown as {@link IgniteException}. - * <p> - * NOTE(review): unlike {@code contains(Object)}, the entry's partition is not compared with - * {@code partId} here - confirm this is intended. - */ - @Override public boolean remove(Object o) { - if (!(o instanceof GridCacheEntry)) - return false; - - GridCacheEntry<K, V> entry = (GridCacheEntry<K, V>)o; - - K key = entry.getKey(); - V val = entry.peek(); - - if (val == null) - return false; - - try { - // Cannot use remove(key, val) since we may be in DHT cache and should go through near. - return entry(key).remove(val); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** - * {@inheritDoc} - * <p> - * Calls {@code remove(Object)} for every element of {@code c}, without stopping at the first - * success, and returns {@code true} if at least one call returned {@code true}. - */ - @Override public boolean removeAll(Collection<?> c) { - boolean rmv = false; - - for (Object o : c) - rmv |= remove(o); - - return rmv; - } - - /** - * {@inheritDoc} - * <p> - * An object is contained when it is a {@code GridCacheEntry} of this partition and its - * {@code peek()} value equals (per {@code F.eq}) the value this cache peeks for the same key. - */ - @Override public boolean contains(Object o) { - if (!(o instanceof GridCacheEntry)) - return false; - - GridCacheEntry<K, V> entry = (GridCacheEntry<K, V>)o; - - return partId == entry.partition() && F.eq(entry.peek(), peek(entry.getKey())); - } - - /** - * {@inheritDoc} - * <p> - * Returns {@code publicSize()} of the local partition looked up at the current discovery - * topology version, or {@code 0} when no local partition is returned. - */ - @Override public int size() { - GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, - ctx.discovery().topologyVersion(), false); - - return part != null ? part.publicSize() : 0; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PartitionEntrySet.class, this, "super", super.toString()); - } - } - - /** - * {@inheritDoc} - * <p> - * Delegates to the superclass only when the configured distribution mode is - * {@code PARTITIONED_ONLY} or {@code NEAR_PARTITIONED}; for any other mode an empty list is returned. - */ - @Override public List<GridCacheClearAllRunnable<K, V>> splitClearAll() { - GridCacheDistributionMode mode = configuration().getDistributionMode(); - - return (mode == PARTITIONED_ONLY || mode == NEAR_PARTITIONED) ? 
super.splitClearAll() : - Collections.<GridCacheClearAllRunnable<K, V>>emptyList(); - } - - /** - * {@inheritDoc} - * <p> - * Asserts that the entry is a DHT entry and, when a local partition is returned for the entry's - * partition, passes key and version to that partition's {@code onDeferredDelete}. A failure there - * is logged and not rethrown; when no local partition is returned nothing happens. - */ - @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) { - assert entry.isDht(); - - GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), -1, false); - - // Do not remove entry on replica topology. Instead, add entry to removal queue. - // It will be cleared eventually. - if (part != null) { - try { - part.onDeferredDelete(entry.key(), ver); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to enqueue deleted entry [key=" + entry.key() + ", ver=" + ver + ']', e); - } - } - } - - /** - * Complex partition iterator for both partition and swap iteration. - * <p> - * Entries that are internal or not visitable with an empty filter are skipped; the remaining ones - * are returned wrapped via {@code wrap(true)}. The next entry is always fetched ahead of time. - * <p> - * NOTE(review): only {@code partIt} is read in this class; no swap iteration is visible here - - * confirm that the first sentence is still accurate. - */ - private static class PartitionEntryIterator<K, V> extends GridIteratorAdapter<GridCacheEntry<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Next entry, {@code null} when the iteration is exhausted. */ - private GridCacheEntry<K, V> entry; - - /** Last seen entry to support remove. */ - private GridCacheEntry<K, V> last; - - /** Partition iterator, may be {@code null}. */ - private final Iterator<GridDhtCacheEntry<K, V>> partIt; - - /** - * Stores the iterator and immediately advances to the first entry to return. - * - * @param partIt Partition iterator, or {@code null} to produce an empty iteration. 
- */ - private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry<K, V>> partIt) { - this.partIt = partIt; - - advance(); - } - - /** - * {@inheritDoc} - * <p> - * True while a pre-fetched entry is available. - */ - @Override public boolean hasNextX() { - return entry != null; - } - - /** - * {@inheritDoc} - * <p> - * Returns the pre-fetched entry, remembers it as the last returned one and fetches the next. - * - * @throws NoSuchElementException If {@code hasNext()} is {@code false}. - */ - @Override public GridCacheEntry<K, V> nextX() throws IgniteCheckedException { - if (!hasNext()) - throw new NoSuchElementException(); - - last = entry; - - advance(); - - return last; - } - - /** - * {@inheritDoc} - * <p> - * Calls {@code remove()} on the entry most recently returned by {@code nextX()}. - * <p> - * NOTE(review): {@code last} is not cleared after the removal, so a second call without an - * intervening {@code nextX()} removes the same entry again instead of throwing - * {@link IllegalStateException} as {@code java.util.Iterator.remove()} specifies - confirm - * whether callers rely on this. - * - * @throws IllegalStateException If no entry has been returned yet. - */ - @Override public void removeX() throws IgniteCheckedException { - if (last == null) - throw new IllegalStateException(); - - last.remove(); - } - - /** - * Moves {@code entry} to the next element of {@code partIt} that is neither internal nor - * rejected by {@code visitable} with an empty filter, wrapped via {@code wrap(true)}. - * Sets {@code entry} to {@code null} when {@code partIt} is {@code null} or has no such element left. - */ - private void advance() { - if (partIt != null) { - while (partIt.hasNext()) { - GridDhtCacheEntry<K, V> next = partIt.next(); - - if (next.isInternal() || !next.visitable(CU.<K, V>empty())) - continue; - - entry = next.wrap(true); - - return; - } - } - - entry = null; - } - } - - /** - * Multi update future. - * <p> - * A {@code GridFutureAdapter<IgniteUuid>} that additionally stores a topology version. - */ - private static class MultiUpdateFuture extends GridFutureAdapter<IgniteUuid> { - /** */ - private static final long serialVersionUID = 0L; - - /** Topology version; left at the field default by the empty constructor. */ - private long topVer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public MultiUpdateFuture() { - // No-op. - } - - /** - * @param ctx Kernal context. - * @param topVer Topology version. - */ - private MultiUpdateFuture(GridKernalContext ctx, long topVer) { - super(ctx); - - this.topVer = topVer; - } - - /** - * @return Topology version. - */ - private long topologyVersion() { - return topVer; - } - } -}