http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java
deleted file mode 100644
index 1fee0da..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ /dev/null
@@ -1,816 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-import static org.gridgain.grid.kernal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Partition topology for node which does not have any local partitions.
- */
-@GridToStringExclude
-public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopology<K, V> {
-    /** If true, then check consistency. */
-    private static final boolean CONSISTENCY_CHECK = false;
-
-    /** Flag to control amount of output for full map. */
-    private static final boolean FULL_MAP_DEBUG = false;
-
-    /** Cache shared context. */
-    private GridCacheSharedContext<K, V> cctx;
-
-    /** Cache ID. */
-    private int cacheId;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Node to partition map. */
-    private GridDhtPartitionFullMap node2part;
-
-    /** Partition to node map. */
-    private Map<Integer, Set<UUID>> part2node = new HashMap<>();
-
-    /** */
-    private GridDhtPartitionExchangeId lastExchangeId;
-
-    /** */
-    private long topVer = -1;
-
-    /** A future that will be completed when topology with version topVer will be ready to use. */
-    private GridDhtTopologyFuture topReadyFut;
-
-    /** */
-    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
-
-    /** Lock. */
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-    /**
-     * @param cctx Context.
-     * @param cacheId Cache ID.
-     * @param exchId Exchange ID.
-     */
-    public GridClientPartitionTopology(GridCacheSharedContext<K, V> cctx, int cacheId,
-        GridDhtPartitionExchangeId exchId) {
-        this.cctx = cctx;
-        this.cacheId = cacheId;
-
-        topVer = exchId.topologyVersion();
-
-        log = cctx.logger(getClass());
-
-        beforeExchange(exchId);
-    }
-
-    /**
-     * @return Full map string representation.
-     */
-    @SuppressWarnings( {"ConstantConditions"})
-    private String fullMapString() {
-        return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
-    }
-
-    /**
-     * @param map Map to get string for.
-     * @return Full map string representation.
-     */
-    @SuppressWarnings( {"ConstantConditions"})
-    private String mapString(GridDhtPartitionMap map) {
-        return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
-    }
-
-    /**
-     * @return Cache ID.
-     */
-    public int cacheId() {
-        return cacheId;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"})
-    @Override public void readLock() {
-        lock.readLock().lock();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readUnlock() {
-        lock.readLock().unlock();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture<K, V> exchFut) {
-        lock.writeLock().lock();
-
-        try {
-            assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId + ']';
-
-            topVer = exchId.topologyVersion();
-
-            topReadyFut = exchFut;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long topologyVersion() {
-        lock.readLock().lock();
-
-        try {
-            assert topVer > 0;
-
-            return topVer;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtTopologyFuture topologyVersionFuture() {
-        lock.readLock().lock();
-
-        try {
-            assert topReadyFut != null;
-
-            return topReadyFut;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) {
-        ClusterNode loc = cctx.localNode();
-
-        lock.writeLock().lock();
-
-        try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
-
-            if (!exchId.isJoined())
-                removeNode(exchId.nodeId());
-
-            // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx, topVer);
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
-
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            // If this is the oldest node.
-            if (oldest.id().equals(loc.id())) {
-                if (node2part == null) {
-                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Created brand new full topology map on oldest node [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-                else if (!node2part.valid()) {
-                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
-                            node2part + ']');
-                }
-                else if (!node2part.nodeId().equals(loc.id())) {
-                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-            }
-
-            consistencyCheck();
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException {
-        long topVer = exchId.topologyVersion();
-
-        lock.writeLock().lock();
-
-        try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-
-            updateSeq.incrementAndGet();
-
-            consistencyCheck();
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
-        throws GridDhtInvalidPartitionException {
-        if (!create)
-            return null;
-
-        throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition [part=" + p +
-            ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) {
-        return localPartition(1, -1, create);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<GridDhtLocalPartition<K, V>> localPartitions() {
-        return Collections.emptyList();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions() {
-        return Collections.emptyList();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) {
-        assert false : "Entry should not be added to client topology: " + e;
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onRemoved(GridDhtCacheEntry<K, V> e) {
-        assert false : "Entry should not be removed from client topology: " + e;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtPartitionMap localPartitionMap() {
-        lock.readLock().lock();
-
-        try {
-            return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(),
-                Collections.<Integer, GridDhtPartitionState>emptyMap(), true);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> nodes(int p, long topVer) {
-        lock.readLock().lock();
-
-        try {
-            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", node2part=" + node2part + ']';
-
-            Collection<ClusterNode> nodes = null;
-
-            Collection<UUID> nodeIds = part2node.get(p);
-
-            if (!F.isEmpty(nodeIds)) {
-                for (UUID nodeId : nodeIds) {
-                    ClusterNode n = cctx.discovery().node(nodeId);
-
-                    if (n != null && (topVer < 0 || n.order() <= topVer)) {
-                        if (nodes == null)
-                            nodes = new ArrayList<>();
-
-                        nodes.add(n);
-                    }
-                }
-            }
-
-            return nodes;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version ({@code -1} for all nodes).
-     * @param state Partition state.
-     * @param states Additional partition states.
-     * @return List of nodes for the partition.
-     */
-    private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
-
-        lock.readLock().lock();
-
-        try {
-            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", allIds=" + allIds + ", node2part=" + node2part + ']';
-
-            Collection<UUID> nodeIds = part2node.get(p);
-
-            // Node IDs can be null if both, primary and backup, nodes disappear.
-            int size = nodeIds == null ? 0 : nodeIds.size();
-
-            if (size == 0)
-                return Collections.emptyList();
-
-            List<ClusterNode> nodes = new ArrayList<>(size);
-
-            for (UUID id : nodeIds) {
-                if (topVer > 0 && !allIds.contains(id))
-                    continue;
-
-                if (hasState(p, id, state, states)) {
-                    ClusterNode n = cctx.discovery().node(id);
-
-                    if (n != null && (topVer < 0 || n.order() <= topVer))
-                        nodes.add(n);
-                }
-            }
-
-            return nodes;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<ClusterNode> owners(int p, long topVer) {
-        return nodes(p, topVer, OWNING);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<ClusterNode> owners(int p) {
-        return owners(p, -1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<ClusterNode> moving(int p) {
-        return nodes(p, -1, MOVING);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return List of nodes in state OWNING or MOVING.
-     */
-    private List<ClusterNode> ownersAndMoving(int p, long topVer) {
-        return nodes(p, topVer, OWNING, MOVING);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long updateSequence() {
-        return updateSeq.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
-        lock.readLock().lock();
-
-        try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']';
-
-            GridDhtPartitionFullMap m = node2part;
-
-            return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap) {
-        if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
-
-        lock.writeLock().lock();
-
-        try {
-            if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
-
-                return null;
-            }
-
-            if (node2part != null && node2part.compareTo(partMap) >= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
-
-                return null;
-            }
-
-            updateSeq.incrementAndGet();
-
-            if (exchId != null)
-                lastExchangeId = exchId;
-
-            if (node2part != null) {
-                for (GridDhtPartitionMap part : node2part.values()) {
-                    GridDhtPartitionMap newPart = partMap.get(part.nodeId());
-
-                    // If for some nodes current partition has a newer map,
-                    // then we keep the newer value.
-                    if (newPart != null && newPart.updateSequence() < part.updateSequence()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
-                                mapString(part) + ", newPart=" + mapString(newPart) + ']');
-
-                        partMap.put(part.nodeId(), part);
-                    }
-                }
-
-                for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext();) {
-                    UUID nodeId = it.next();
-
-                    if (!cctx.discovery().alive(nodeId)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
-                                partMap + ']');
-
-                        it.remove();
-                    }
-                }
-            }
-
-            node2part = partMap;
-
-            Map<Integer, Set<UUID>> p2n = new HashMap<>();
-
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> ids = p2n.get(p);
-
-                    if (ids == null)
-                        // Initialize HashSet to size 3 in anticipation that there won't be
-                        // more than 3 nodes per partitions.
-                        p2n.put(p, ids = U.newHashSet(3));
-
-                    ids.add(e.getKey());
-                }
-            }
-
-            part2node = p2n;
-
-            consistencyCheck();
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map after full update: " + fullMapString());
-
-            return null;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts) {
-        if (log.isDebugEnabled())
-            log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
-
-        if (!cctx.discovery().alive(parts.nodeId())) {
-            if (log.isDebugEnabled())
-                log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
-                    ", parts=" + parts + ']');
-
-            return null;
-        }
-
-        lock.writeLock().lock();
-
-        try {
-            if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
-
-                return null;
-            }
-
-            if (exchId != null)
-                lastExchangeId = exchId;
-
-            if (node2part == null) {
-                U.dumpStack(log, "Created invalid: " + node2part);
-
-                // Create invalid partition map.
-                node2part = new GridDhtPartitionFullMap();
-            }
-
-            GridDhtPartitionMap cur = node2part.get(parts.nodeId());
-
-            if (cur != null && cur.updateSequence() >= parts.updateSequence()) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
-                        ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
-
-                return null;
-            }
-
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
-
-            boolean changed = false;
-
-            if (cur == null || !cur.equals(parts))
-                changed = true;
-
-            node2part.put(parts.nodeId(), parts);
-
-            part2node = new HashMap<>(part2node);
-
-            // Add new mappings.
-            for (Integer p : parts.keySet()) {
-                Set<UUID> ids = part2node.get(p);
-
-                if (ids == null)
-                    // Initialize HashSet to size 3 in anticipation that there won't be
-                    // more than 3 nodes per partition.
-                    part2node.put(p, ids = U.newHashSet(3));
-
-                changed |= ids.add(parts.nodeId());
-            }
-
-            // Remove obsolete mappings.
-            if (cur != null) {
-                for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                    Set<UUID> ids = part2node.get(p);
-
-                    if (ids != null)
-                        changed |= ids.remove(parts.nodeId());
-                }
-            }
-
-            consistencyCheck();
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map after single update: " + fullMapString());
-
-            return changed ? localPartitionMap() : null;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Updates value for single partition.
-     *
-     * @param p Partition.
-     * @param nodeId Node ID.
-     * @param state State.
-     * @param updateSeq Update sequence.
-     */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
-        assert nodeId.equals(cctx.localNodeId());
-
-        // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
-
-        // If this node became the oldest node.
-        if (oldest.id().equals(cctx.localNodeId())) {
-            long seq = node2part.updateSequence();
-
-            if (seq != updateSeq) {
-                if (seq > updateSeq) {
-                    if (this.updateSeq.get() < seq) {
-                        // Update global counter if necessary.
-                        boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
-
-                        assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq +
-                            ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']';
-
-                        updateSeq = seq + 1;
-                    }
-                    else
-                        updateSeq = seq;
-                }
-
-                node2part.updateSequence(updateSeq);
-            }
-        }
-
-        GridDhtPartitionMap map = node2part.get(nodeId);
-
-        if (map == null)
-            node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq,
-                Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
-
-        map.updateSequence(updateSeq);
-
-        map.put(p, state);
-
-        Set<UUID> ids = part2node.get(p);
-
-        if (ids == null)
-            part2node.put(p, ids = U.newHashSet(3));
-
-        ids.add(nodeId);
-    }
-
-    /**
-     * @param nodeId Node to remove.
-     */
-    private void removeNode(UUID nodeId) {
-        assert nodeId != null;
-        assert lock.writeLock().isHeldByCurrentThread();
-
-        ClusterNode oldest = CU.oldest(cctx, topVer);
-
-        ClusterNode loc = cctx.localNode();
-
-        if (node2part != null) {
-            if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
-                updateSeq.setIfGreater(node2part.updateSequence());
-
-                node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),
-                    node2part, false);
-            }
-            else
-                node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
-
-            part2node = new HashMap<>(part2node);
-
-            GridDhtPartitionMap parts = node2part.remove(nodeId);
-
-            if (parts != null) {
-                for (Integer p : parts.keySet()) {
-                    Set<UUID> nodeIds = part2node.get(p);
-
-                    if (nodeIds != null) {
-                        nodeIds.remove(nodeId);
-
-                        if (nodeIds.isEmpty())
-                            part2node.remove(p);
-                    }
-                }
-            }
-
-            consistencyCheck();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean own(GridDhtLocalPartition<K, V> part) {
-        assert false : "Client topology should never own a partition: " + part;
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) {
-        assert updateSeq || lock.isWriteLockedByCurrentThread();
-
-        lock.writeLock().lock();
-
-        try {
-            assert part.state() == EVICTED;
-
-            long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
-
-            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
-
-            consistencyCheck();
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) {
-        lock.readLock().lock();
-
-        try {
-            return node2part.get(nodeId);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
-    }
-
-    /**
-     * @param p Partition.
-     * @param nodeId Node ID.
-     * @param match State to match.
-     * @param matches Additional states.
-     * @return Filter for owners of this partition.
-     */
-    private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match,
-        final GridDhtPartitionState... matches) {
-        if (nodeId == null)
-            return false;
-
-        GridDhtPartitionMap parts = node2part.get(nodeId);
-
-        // Set can be null if node has been removed.
-        if (parts != null) {
-            GridDhtPartitionState state = parts.get(p);
-
-            if (state == match)
-                return true;
-
-            if (matches != null && matches.length > 0)
-                for (GridDhtPartitionState s : matches)
-                    if (state == s)
-                        return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Checks consistency after all operations.
-     */
-    private void consistencyCheck() {
-        if (CONSISTENCY_CHECK) {
-            assert lock.writeLock().isHeldByCurrentThread();
-
-            if (node2part == null)
-                return;
-
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> nodeIds = part2node.get(p);
-
-                    assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']';
-                    assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" +
-                        e.getKey() + ", nodeIds=" + nodeIds + ']';
-                }
-            }
-
-            for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
-                for (UUID nodeId : e.getValue()) {
-                    GridDhtPartitionMap map = node2part.get(nodeId);
-
-                    assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']';
-                    assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() +
-                        ", nodeId=" + nodeId + ']';
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
deleted file mode 100644
index 4c120ba..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.nio.*;
-
-/**
- * Affinity assignment request.
- */
-public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Topology version being queried. */
-    private long topVer;
-
-    /**
-     * Empty constructor.
-     */
-    public GridDhtAffinityAssignmentRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param topVer Topology version.
-     */
-    public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) {
-        this.cacheId = cacheId;
-        this.topVer = topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean allowForStartup() {
-        return true;
-    }
-
-    /**
-     * @return Requested topology version.
-     */
-    @Override public long topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 79;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtAffinityAssignmentRequest _clone = new GridDhtAffinityAssignmentRequest();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtAffinityAssignmentRequest _clone = (GridDhtAffinityAssignmentRequest)_msg;
-
-        _clone.topVer = topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 3:
-                if (!commState.putLong(topVer))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 3:
-                if (buf.remaining() < 8)
-                    return false;
-
-                topVer = commState.getLong();
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtAffinityAssignmentRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
deleted file mode 100644
index 2f9a84f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * Affinity assignment response.
- */
-public class GridDhtAffinityAssignmentResponse<K, V> extends 
GridCacheMessage<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Topology version. */
-    private long topVer;
-
-    /** Affinity assignment. */
-    @GridDirectTransient
-    @GridToStringInclude
-    private List<List<ClusterNode>> affAssignment;
-
-    /** Affinity assignment bytes. */
-    private byte[] affAssignmentBytes;
-
-    /**
-     * Empty constructor.
-     */
-    public GridDhtAffinityAssignmentResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param topVer Topology version.
-     * @param affAssignment Affinity assignment.
-     */
-    public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, 
List<List<ClusterNode>> affAssignment) {
-        this.cacheId = cacheId;
-        this.topVer = topVer;
-        this.affAssignment = affAssignment;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean allowForStartup() {
-        return true;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    @Override public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Affinity assignment.
-     */
-    public List<List<ClusterNode>> affinityAssignment() {
-        return affAssignment;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 80;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtAffinityAssignmentResponse _clone = new 
GridDhtAffinityAssignmentResponse();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtAffinityAssignmentResponse _clone = 
(GridDhtAffinityAssignmentResponse)_msg;
-
-        _clone.topVer = topVer;
-        _clone.affAssignment = affAssignment;
-        _clone.affAssignmentBytes = affAssignmentBytes;
-    }
-
-    /** {@inheritDoc}
-     * @param ctx*/
-    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (affAssignment != null)
-            affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (affAssignmentBytes != null)
-            affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, 
ldr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 3:
-                if (!commState.putByteArray(affAssignmentBytes))
-                    return false;
-
-                commState.idx++;
-
-            case 4:
-                if (!commState.putLong(topVer))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 3:
-                byte[] affAssignmentBytes0 = commState.getByteArray();
-
-                if (affAssignmentBytes0 == BYTE_ARR_NOT_READ)
-                    return false;
-
-                affAssignmentBytes = affAssignmentBytes0;
-
-                commState.idx++;
-
-            case 4:
-                if (buf.remaining() < 8)
-                    return false;
-
-                topVer = commState.getLong();
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtAffinityAssignmentResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
deleted file mode 100644
index 79e9ee8..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Future that fetches affinity assignment from remote cache nodes.
- */
-public class GridDhtAssignmentFetchFuture<K, V> extends 
GridFutureAdapter<List<List<ClusterNode>>> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Nodes order comparator. */
-    private static final Comparator<ClusterNode> CMP = new 
GridNodeOrderComparator();
-
-    /** Cache context. */
-    private final GridCacheContext<K, V> ctx;
-
-    /** List of available nodes this future can fetch data from. */
-    private Queue<ClusterNode> availableNodes;
-
-    /** Topology version. */
-    private final long topVer;
-
-    /** Pending node from which response is being awaited. */
-    private ClusterNode pendingNode;
-
-    /**
-     * @param ctx Cache context.
-     * @param availableNodes Available nodes.
-     */
-    public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, long 
topVer, Collection<ClusterNode> availableNodes) {
-        super(ctx.kernalContext());
-
-        this.ctx = ctx;
-
-        this.topVer = topVer;
-
-        LinkedList<ClusterNode> tmp = new LinkedList<>();
-        tmp.addAll(availableNodes);
-        Collections.sort(tmp, CMP);
-
-        this.availableNodes = tmp;
-    }
-
-    /**
-     * Initializes fetch future.
-     */
-    public void init() {
-        ((GridDhtPreloader<K, 
V>)ctx.preloader()).addDhtAssignmentFetchFuture(topVer, this);
-
-        requestFromNextNode();
-    }
-
-    /**
-     * @param node Node.
-     * @param res Reponse.
-     */
-    public void onResponse(ClusterNode node, 
GridDhtAffinityAssignmentResponse<K, V> res) {
-        if (res.topologyVersion() != topVer) {
-            if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topolgy 
version (will ignore) " +
-                    "[node=" + node + ", res=" + res + ", topVer=" + topVer + 
']');
-
-            return;
-        }
-
-        List<List<ClusterNode>> assignment = null;
-
-        synchronized (this) {
-            if (pendingNode != null && pendingNode.equals(node))
-                assignment = res.affinityAssignment();
-        }
-
-        if (assignment != null)
-            onDone(assignment);
-    }
-
-    /**
-     * @param leftNodeId Left node ID.
-     */
-    public void onNodeLeft(UUID leftNodeId) {
-        synchronized (this) {
-            if (pendingNode != null && pendingNode.id().equals(leftNodeId)) {
-                availableNodes.remove(pendingNode);
-
-                pendingNode = null;
-            }
-        }
-
-        requestFromNextNode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable List<List<ClusterNode>> res, 
@Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            ((GridDhtPreloader<K, 
V>)ctx.preloader()).removeDhtAssignmentFetchFuture(topVer, this);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Requests affinity from next node in the list.
-     */
-    private void requestFromNextNode() {
-        boolean complete;
-
-        // Avoid 'protected field is accessed in synchronized context' warning.
-        IgniteLogger log0 = log;
-
-        synchronized (this) {
-            while (!availableNodes.isEmpty()) {
-                ClusterNode node = availableNodes.poll();
-
-                try {
-                    if (log0.isDebugEnabled())
-                        log0.debug("Sending affinity fetch request to remote 
node [locNodeId=" + ctx.localNodeId() +
-                            ", node=" + node + ']');
-
-                    ctx.io().send(node, new 
GridDhtAffinityAssignmentRequest<K, V>(ctx.cacheId(), topVer),
-                        AFFINITY_POOL);
-
-                    // Close window for listener notification.
-                    if (ctx.discovery().node(node.id()) == null) {
-                        U.warn(log0, "Failed to request affinity assignment 
from remote node (node left grid, will " +
-                            "continue to another node): " + node);
-
-                        continue;
-                    }
-
-                    pendingNode = node;
-
-                    break;
-                }
-                catch (ClusterTopologyException ignored) {
-                    U.warn(log0, "Failed to request affinity assignment from 
remote node (node left grid, will " +
-                        "continue to another node): " + node);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log0, "Failed to request affinity assignment from 
remote node (will " +
-                        "continue to another node): " + node, e);
-                }
-            }
-
-            complete = pendingNode == null;
-        }
-
-        // No more nodes left, complete future with null outside of 
synchronization.
-        // Affinity should be calculated from scratch.
-        if (complete)
-            onDone((List<List<ClusterNode>>)null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java
deleted file mode 100644
index 9993a72..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCache.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.tostring.*;
-
-import java.io.*;
-
-/**
- * DHT cache.
- */
-public class GridDhtCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> 
{
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Near cache. */
-    @GridToStringExclude
-    private GridNearTransactionalCache<K, V> near;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtCache() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Context.
-     */
-    public GridDhtCache(GridCacheContext<K, V> ctx) {
-        super(ctx);
-    }
-
-    /**
-     * @param ctx Cache context.
-     * @param map Cache map.
-     */
-    public GridDhtCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, 
V> map) {
-        super(ctx, map);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isDht() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        String name = super.name();
-
-        return name == null ? "defaultDhtCache" : name + "Dht";
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        resetMetrics();
-
-        super.start();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void resetMetrics() {
-        GridCacheMetricsAdapter m = new GridCacheMetricsAdapter();
-
-        m.delegate(ctx.dht().near().metrics0());
-
-        metrics = m;
-
-        ctx.dr().resetMetrics();
-    }
-
-    /**
-     * @return Near cache.
-     */
-    @Override public GridNearTransactionalCache<K, V> near() {
-        return near;
-    }
-
-    /**
-     * @param near Near cache.
-     */
-    public void near(GridNearTransactionalCache<K, V> near) {
-        this.near = near;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
deleted file mode 100644
index 3be1bbf..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ /dev/null
@@ -1,1017 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.GridCacheDistributionMode.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
-import static org.apache.ignite.internal.processors.dr.GridDrType.*;
-
-/**
- * DHT cache adapter.
- */
-public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdapter<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Topology. */
-    private GridDhtPartitionTopology<K, V> top;
-
-    /** Preloader. */
-    protected GridCachePreloader<K, V> preldr;
-
-    /** Multi tx future holder. */
-    private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> 
multiTxHolder = new ThreadLocal<>();
-
-    /** Multi tx futures. */
-    private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new 
ConcurrentHashMap8<>();
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    protected GridDhtCacheAdapter() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Context.
-     */
-    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
-        super(ctx, ctx.config().getStartSize());
-
-        top = new GridDhtPartitionTopologyImpl<>(ctx);
-    }
-
-    /**
-     * Constructor used for near-only cache.
-     *
-     * @param ctx Cache context.
-     * @param map Cache map.
-     */
-    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, 
GridCacheConcurrentMap<K, V> map) {
-        super(ctx, map);
-
-        top = new GridDhtPartitionTopologyImpl<>(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void init() {
-        map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
-            /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> 
create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
-                V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
-                return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, 
next, ttl, hdrId);
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, 
new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() {
-            @Override public void apply(UUID nodeId, 
GridCacheTtlUpdateRequest<K, V> req) {
-                processTtlUpdateRequest(req);
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() {
-        super.stop();
-
-        if (preldr != null)
-            preldr.stop();
-
-        // Clean up to help GC.
-        preldr = null;
-        top = null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        preldr.onKernalStart();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop() {
-        super.onKernalStop();
-
-        if (preldr != null)
-            preldr.onKernalStop();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats() {
-        super.printMemoryStats();
-
-        top.printMemoryStats(1024);
-    }
-
-    /**
-     * @return Near cache.
-     */
-    public abstract GridNearCacheAdapter<K, V> near();
-
-    /**
-     * @return Partition topology.
-     */
-    public GridDhtPartitionTopology<K, V> topology() {
-        return top;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
-        return preldr;
-    }
-
-    /**
-     * @return DHT preloader.
-     */
-    public GridDhtPreloader<K, V> dhtPreloader() {
-        assert preldr instanceof GridDhtPreloader;
-
-        return (GridDhtPreloader<K, V>)preldr;
-    }
-
-    /**
-     * @return Topology version future registered for multi-update.
-     */
-    @Nullable public GridDhtTopologyFuture multiUpdateTopologyFuture() {
-        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = 
multiTxHolder.get();
-
-        return tup == null ? null : tup.get2();
-    }
-
-    /**
-     * Starts multi-update lock. Will wait for topology future is ready.
-     *
-     * @return Topology version.
-     * @throws IgniteCheckedException If failed.
-     */
-    public long beginMultiUpdate() throws IgniteCheckedException {
-        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = 
multiTxHolder.get();
-
-        if (tup != null)
-            throw new IgniteCheckedException("Nested multi-update locks are 
not supported");
-
-        top.readLock();
-
-        GridDhtTopologyFuture topFut;
-
-        long topVer;
-
-        try {
-            // While we are holding read lock, register lock future for 
partition release future.
-            IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId());
-
-            topVer = top.topologyVersion();
-
-            MultiUpdateFuture fut = new MultiUpdateFuture(ctx.kernalContext(), 
topVer);
-
-            MultiUpdateFuture old = multiTxFuts.putIfAbsent(lockId, fut);
-
-            assert old == null;
-
-            topFut = top.topologyVersionFuture();
-
-            multiTxHolder.set(F.t(lockId, topFut));
-        }
-        finally {
-            top.readUnlock();
-        }
-
-        topFut.get();
-
-        return topVer;
-    }
-
-    /**
-     * Ends multi-update lock.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void endMultiUpdate() throws IgniteCheckedException {
-        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = 
multiTxHolder.get();
-
-        if (tup == null)
-            throw new IgniteCheckedException("Multi-update was not started or 
released twice.");
-
-        top.readLock();
-
-        try {
-            IgniteUuid lockId = tup.get1();
-
-            MultiUpdateFuture multiFut = multiTxFuts.remove(lockId);
-
-            multiTxHolder.set(null);
-
-            // Finish future.
-            multiFut.onDone(lockId);
-        }
-        finally {
-            top.readUnlock();
-        }
-    }
-
-    /**
-     * Creates multi update finish future. Will return {@code null} if no 
multi-update locks are found.
-     *
-     * @param topVer Topology version.
-     * @return Finish future.
-     */
-    @Nullable public IgniteFuture<?> multiUpdateFinishFuture(long topVer) {
-        GridCompoundFuture<IgniteUuid, Object> fut = null;
-
-        for (MultiUpdateFuture multiFut : multiTxFuts.values()) {
-            if (multiFut.topologyVersion() <= topVer) {
-                if (fut == null)
-                    fut = new GridCompoundFuture<>(ctx.kernalContext());
-
-                fut.add(multiFut);
-            }
-        }
-
-        if (fut != null)
-            fut.markInitialized();
-
-        return fut;
-    }
-
-    /**
-     * @param key Key.
-     * @return DHT entry.
-     */
-    @Nullable public GridDhtCacheEntry<K, V> peekExx(K key) {
-        return (GridDhtCacheEntry<K, V>)peekEx(key);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @throws GridDhtInvalidPartitionException If partition for the key is no 
longer valid.
-     */
-    @Override public GridCacheEntry<K, V> entry(K key) throws 
GridDhtInvalidPartitionException {
-        return super.entry(key);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @throws GridDhtInvalidPartitionException If partition for the key is no 
longer valid.
-     */
-    @Override public GridCacheEntryEx<K, V> entryEx(K key, boolean touch) 
throws GridDhtInvalidPartitionException {
-        return super.entryEx(key, touch);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @throws GridDhtInvalidPartitionException If partition for the key is no 
longer valid.
-     */
-    @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) throws 
GridDhtInvalidPartitionException {
-        return super.entryEx(key, topVer);
-    }
-
-    /**
-     * @param key Key.
-     * @return DHT entry.
-     * @throws GridDhtInvalidPartitionException If partition for the key is no 
longer valid.
-     */
-    public GridDhtCacheEntry<K, V> entryExx(K key) throws 
GridDhtInvalidPartitionException {
-        return (GridDhtCacheEntry<K, V>)entryEx(key);
-    }
-
-    /**
-     * @param key Key.
-     * @param topVer Topology version.
-     * @return DHT entry.
-     * @throws GridDhtInvalidPartitionException If partition for the key is no 
longer valid.
-     */
-    public GridDhtCacheEntry<K, V> entryExx(K key, long topVer) throws 
GridDhtInvalidPartitionException {
-        return (GridDhtCacheEntry<K, V>)entryEx(key, topVer);
-    }
-
-    /**
-     * Gets or creates entry for given key. If key belongs to local node, dht 
entry will be returned, otherwise
-     * if {@code allowDetached} is {@code true}, detached entry will be 
returned, otherwise exception will be
-     * thrown.
-     *
-     * @param key Key for which entry should be returned.
-     * @param allowDetached Whether to allow detached entries.
-     * @param touch {@code True} if entry should be passed to eviction policy.
-     * @return Cache entry.
-     * @throws GridDhtInvalidPartitionException if entry does not belong to 
this node and
-     *      {@code allowDetached} is {@code false}.
-     */
-    public GridCacheEntryEx<K, V> entryExx(K key, long topVer, boolean 
allowDetached, boolean touch) {
-        try {
-            return allowDetached && !ctx.affinity().localNode(key, topVer) ?
-                new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), 
null, null, 0, 0) :
-                entryEx(key, touch);
-        }
-        catch (GridDhtInvalidPartitionException e) {
-            if (!allowDetached)
-                throw e;
-
-            return new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), 
null, null, 0, 0);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void localLoad(Collection<? extends K> keys) throws 
IgniteCheckedException {
-        if (ctx.store().isLocalStore()) {
-            super.localLoad(keys);
-
-            return;
-        }
-
-        // Version for all loaded entries.
-        final GridCacheVersion ver0 = 
ctx.shared().versions().nextForLoad(topology().topologyVersion());
-
-        final boolean replicate = ctx.isDrEnabled();
-
-        final long topVer = ctx.affinity().affinityTopologyVersion();
-
-        ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() {
-            @Override public void apply(K key, V val) {
-                loadEntry(key, val, ver0, null, topVer, replicate, 0);
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void loadCache(final IgniteBiPredicate<K, V> p, final 
long ttl, Object[] args) throws IgniteCheckedException {
-        if (ctx.store().isLocalStore()) {
-            super.loadCache(p, ttl, args);
-
-            return;
-        }
-
-        // Version for all loaded entries.
-        final GridCacheVersion ver0 = 
ctx.shared().versions().nextForLoad(topology().topologyVersion());
-
-        final boolean replicate = ctx.isDrEnabled();
-
-        final long topVer = ctx.affinity().affinityTopologyVersion();
-
-        ctx.store().loadCache(new CI3<K, V, GridCacheVersion>() {
-            @Override public void apply(K key, V val, @Nullable 
GridCacheVersion ver) {
-                assert ver == null;
-
-                loadEntry(key, val, ver0, p, topVer, replicate, ttl);
-            }
-        }, args);
-    }
-
-    /**
-     * Loads one key-value pair obtained from the store into the local DHT
-     * partition computed for the key.
-     * <p>
-     * The pair is skipped when the optional predicate rejects it. The target
-     * partition is reserved while the entry is initialized and is released
-     * afterwards; if the reservation fails the pair is not loaded and only a
-     * debug message is logged. A {@code GridDhtInvalidPartitionException} is
-     * likewise only logged at debug level.
-     *
-     * @param key Key.
-     * @param val Value.
-     * @param ver Cache version.
-     * @param p Optional predicate.
-     * @param topVer Topology version.
-     * @param replicate Replication flag ({@code DR_LOAD} is used when set, {@code DR_NONE} otherwise).
-     * @param ttl TTL.
-     * @throws IgniteException If setting the initial value fails with {@code IgniteCheckedException}.
-     */
-    private void loadEntry(K key,
-        V val,
-        GridCacheVersion ver,
-        @Nullable IgniteBiPredicate<K, V> p,
-        long topVer,
-        boolean replicate,
-        long ttl) {
-        if (p != null && !p.apply(key, val))
-            return;
-
-        try {
-            GridDhtLocalPartition<K, V> part =
-                top.localPartition(ctx.affinity().partition(key), -1, true);
-
-            // Reserve to make sure that partition does not get unloaded.
-            if (part.reserve()) {
-                GridCacheEntryEx<K, V> entry = null;
-
-                try {
-                    if (ctx.portableEnabled()) {
-                        key = (K)ctx.marshalToPortable(key);
-                        val = (V)ctx.marshalToPortable(val);
-                    }
-
-                    entry = entryEx(key, false);
-
-                    entry.initialValue(val, null, ver, ttl, -1, false, topVer,
-                        replicate ? DR_LOAD : DR_NONE);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException("Failed to put cache value: " + entry, e);
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry during loadCache (will ignore): " + entry);
-                }
-                finally {
-                    // Touch the entry (if one was obtained) and always give the reservation back.
-                    if (entry != null)
-                        entry.context().evicts().touch(entry, topVer);
-
-                    part.release();
-                }
-            }
-            else if (log.isDebugEnabled())
-                log.debug("Will not load entry into cache (partition is invalid): " + part);
-        }
-        catch (GridDhtInvalidPartitionException e) {
-            if (log.isDebugEnabled())
-                log.debug("Ignoring entry for partition that does not belong [key=" + key +
-                    ", val=" + val + ", err=" + e + ']');
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Adds up the public sizes of the current local partitions that report
-     * themselves primary for the affinity topology version.
-     */
-    @Override public int primarySize() {
-        long affTopVer = ctx.affinity().affinityTopologyVersion();
-
-        int total = 0;
-
-        for (GridDhtLocalPartition<K, V> locPart : topology().currentLocalPartitions())
-            total += locPart.primary(affTopVer) ? locPart.publicSize() : 0;
-
-        return total;
-    }
-
-    /**
-     * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgnitePredicate[], IgniteCacheExpiryPolicy)}
-     * method instead to retrieve DHT value.
-     * <p>
-     * Note that {@code skipTx} and {@code entry} are not forwarded to the
-     * delegate call: it is always invoked with the local transaction check
-     * switched off and with {@code null} in the third position.
-     *
-     * @param keys {@inheritDoc}
-     * @param forcePrimary {@inheritDoc}
-     * @param skipTx {@inheritDoc}
-     * @param entry {@inheritDoc}
-     * @param subjId {@inheritDoc}
-     * @param taskName {@inheritDoc}
-     * @param deserializePortable {@inheritDoc}
-     * @param filter {@inheritDoc}
-     * @return {@inheritDoc}
-     */
-    @Override public IgniteFuture<Map<K, V>> getAllAsync(
-        @Nullable Collection<? extends K> keys,
-        boolean forcePrimary,
-        boolean skipTx,
-        @Nullable GridCacheEntryEx<K, V> entry,
-        @Nullable UUID subjId,
-        String taskName,
-        boolean deserializePortable,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter
-    ) {
-        return getAllAsync(
-            keys,
-            /*read-through, by position as in getDhtAllAsync*/true,
-            null,
-            /*don't check local tx. */false,
-            subjId, taskName, deserializePortable, forcePrimary,
-            /*no expiry policy*/null,
-            filter);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * A {@code GridDhtInvalidPartitionException} raised by the parent
-     * implementation is turned into a {@code null} result.
-     */
-    @Override public V reload(K key, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter)
-        throws IgniteCheckedException {
-        V reloaded;
-
-        try {
-            reloaded = super.reload(key, filter);
-        }
-        catch (GridDhtInvalidPartitionException ignored) {
-            // Invalid partition is reported to the caller as "no value".
-            reloaded = null;
-        }
-
-        return reloaded;
-    }
-
-    /**
-     * Delegates to the general {@code getAllAsync} overload with the local
-     * transaction check switched off and without forcing primary.
-     *
-     * @param keys Keys to get
-     * @param readThrough Read through flag.
-     * @param subjId Subject ID.
-     * @param taskName Task name.
-     * @param deserializePortable Deserialize portable flag.
-     * @param filter Optional filter.
-     * @param expiry Expiry policy.
-     * @return Get future.
-     */
-    IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys,
-        boolean readThrough,
-        @Nullable UUID subjId,
-        String taskName,
-        boolean deserializePortable,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable IgniteCacheExpiryPolicy expiry
-        ) {
-        return getAllAsync(
-            keys,
-            readThrough,
-            null,
-            /*don't check local tx. */false,
-            subjId, taskName, deserializePortable,
-            /*force primary (by position, cf. the public getAllAsync)*/false,
-            expiry,
-            filter);
-    }
-
-    /**
-     * Creates a {@code GridDhtGetFuture} for the given request parameters
-     * (without a transaction), initializes it and returns it.
-     *
-     * @param reader Reader node ID.
-     * @param msgId Message ID.
-     * @param keys Keys to get.
-     * @param readThrough Read through flag.
-     * @param reload Reload flag.
-     * @param topVer Topology version.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash code.
-     * @param deserializePortable Deserialize portable flag.
-     * @param filter Optional filter.
-     * @param expiry Expiry policy.
-     * @return DHT future.
-     */
-    public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader,
-        long msgId,
-        LinkedHashMap<? extends K, Boolean> keys,
-        boolean readThrough,
-        boolean reload,
-        long topVer,
-        @Nullable UUID subjId,
-        int taskNameHash,
-        boolean deserializePortable,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable IgniteCacheExpiryPolicy expiry) {
-        GridDhtGetFuture<K, V> getFut = new GridDhtGetFuture<>(
-            ctx, msgId, reader, keys, readThrough, reload,
-            /*tx*/null,
-            topVer, filter, subjId, taskNameHash, deserializePortable, expiry);
-
-        // Initialize the future before handing it to the caller.
-        getFut.init();
-
-        return getFut;
-    }
-
-    /**
-     * Handles a near get request: starts a DHT get for the requested keys and,
-     * when it completes, sends a {@code GridNearGetResponse} to the requesting
-     * node and then triggers the TTL update request.
-     *
-     * @param nodeId Node ID.
-     * @param req Get request.
-     */
-    protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) {
-        assert isAffinityNode(cacheCfg);
-
-        long accessTtl = req.accessTtl();
-
-        // An access TTL of -1 means the get is executed without an expiry policy.
-        final GetExpiryPolicy expiryPlc;
-
-        if (accessTtl == -1L)
-            expiryPlc = null;
-        else
-            expiryPlc = new GetExpiryPolicy(accessTtl);
-
-        IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getFut = getDhtAsync(
-            nodeId,
-            req.messageId(),
-            req.keys(),
-            req.readThrough(),
-            req.reload(),
-            req.topologyVersion(),
-            req.subjectId(),
-            req.taskNameHash(),
-            false,
-            req.filter(),
-            expiryPlc);
-
-        getFut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() {
-            @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) {
-                GridNearGetResponse<K, V> resp = new GridNearGetResponse<>(
-                    ctx.cacheId(), req.futureId(), req.miniId(), req.version());
-
-                GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> dhtFut =
-                    (GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f;
-
-                // Either the loaded entries or the failure go into the response.
-                try {
-                    resp.entries(dhtFut.get());
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed processing get request: " + req, e);
-
-                    resp.error(e);
-                }
-
-                resp.invalidPartitions(dhtFut.invalidPartitions(), ctx.discovery().topologyVersion());
-
-                try {
-                    ctx.io().send(nodeId, resp);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send get response to node (is node still alive?) " +
-                        "[nodeId=" + nodeId + ",req=" + req + ", res=" + resp + ']', e);
-                }
-
-                sendTtlUpdateRequest(expiryPlc);
-            }
-        });
-    }
-
-    /**
-     * Schedules a local closure that groups the entries and readers recorded
-     * by the expiry policy into one {@code GridCacheTtlUpdateRequest} per
-     * target node and sends those requests. Does nothing when the policy is
-     * {@code null} or has no recorded entries.
-     *
-     * @param expiryPlc Expiry policy.
-     */
-    public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) {
-        if (expiryPlc == null || expiryPlc.entries() == null)
-            return;
-
-        ctx.closures().runLocalSafe(new Runnable() {
-            @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"})
-            @Override public void run() {
-                Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> accessed = expiryPlc.entries();
-
-                assert accessed != null && !accessed.isEmpty();
-
-                long topVer = ctx.discovery().topologyVersion();
-
-                Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqs = new HashMap<>();
-
-                // Entries: one request per non-local affinity node of each key.
-                for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : accessed.entrySet()) {
-                    List<ClusterNode> affNodes = ctx.affinity().nodes((K)e.getKey(), topVer);
-
-                    for (int i = 0; i < affNodes.size(); i++) {
-                        ClusterNode node = affNodes.get(i);
-
-                        if (node.isLocal())
-                            continue;
-
-                        GridCacheTtlUpdateRequest<K, V> req = reqs.get(node);
-
-                        if (req == null) {
-                            req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess());
-
-                            reqs.put(node, req);
-
-                            req.cacheId(ctx.cacheId());
-                        }
-
-                        req.addEntry(e.getValue().get1(), e.getValue().get2());
-                    }
-                }
-
-                Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrs = expiryPlc.readers();
-
-                // Readers: near entries are added for every reader node that can still be resolved.
-                if (rdrs != null) {
-                    assert !rdrs.isEmpty();
-
-                    for (Map.Entry<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> e :
-                        rdrs.entrySet()) {
-                        ClusterNode rdrNode = ctx.node(e.getKey());
-
-                        if (rdrNode == null)
-                            continue;
-
-                        GridCacheTtlUpdateRequest<K, V> req = reqs.get(rdrNode);
-
-                        if (req == null) {
-                            req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess());
-
-                            reqs.put(rdrNode, req);
-
-                            req.cacheId(ctx.cacheId());
-                        }
-
-                        for (IgniteBiTuple<byte[], GridCacheVersion> t : e.getValue())
-                            req.addNearEntry(t.get1(), t.get2());
-                    }
-                }
-
-                // A failed send is logged and does not stop the remaining sends.
-                for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> nodeReq : reqs.entrySet()) {
-                    try {
-                        ctx.io().send(nodeReq.getKey(), nodeReq.getValue());
-                    }
-                    catch (IgniteCheckedException err) {
-                        log.error("Failed to send TTL update request.", err);
-                    }
-                }
-            }
-        });
-    }
-
-    /**
-     * Applies a TTL update request: the request keys are updated in this
-     * cache and the near keys, if present, in the near cache.
-     *
-     * @param req Request.
-     */
-    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
-        if (req.keys() != null)
-            updateTtl(this, req.keys(), req.versions(), req.ttl());
-
-        if (req.nearKeys() == null)
-            return;
-
-        GridNearCacheAdapter<K, V> nearCache = near();
-
-        assert nearCache != null;
-
-        updateTtl(nearCache, req.nearKeys(), req.nearVersions(), req.ttl());
-    }
-
-    /**
-     * Updates TTL of the given keys in the given cache. Keys and versions are
-     * matched by position. When swap or off-heap is enabled the entry is
-     * obtained with {@code entryEx} and unswapped; otherwise it is only
-     * peeked and a missing entry is skipped. Every obtained entry is touched
-     * afterwards.
-     *
-     * @param cache Cache.
-     * @param keys Entries keys.
-     * @param vers Entries versions.
-     * @param ttl TTL.
-     */
-    private void updateTtl(GridCacheAdapter<K, V> cache,
-        List<K> keys,
-        List<GridCacheVersion> vers,
-        long ttl) {
-        assert !F.isEmpty(keys);
-        assert keys.size() == vers.size();
-
-        int cnt = keys.size();
-
-        boolean swapOrOffheap = cache.context().isSwapOrOffheapEnabled();
-
-        for (int idx = 0; idx < cnt; idx++) {
-            try {
-                GridCacheEntryEx<K, V> cur = null;
-
-                try {
-                    if (!swapOrOffheap)
-                        cur = cache.peekEx(keys.get(idx));
-                    else {
-                        cur = cache.entryEx(keys.get(idx));
-
-                        cur.unswap(true, false);
-                    }
-
-                    if (cur != null)
-                        cur.updateTtl(vers.get(idx), ttl);
-                }
-                finally {
-                    if (cur != null)
-                        cache.context().evicts().touch(cur, -1L);
-                }
-            }
-            catch (IgniteCheckedException err) {
-                log.error("Failed to unswap entry.", err);
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This implementation consists of a failing assertion only: with
-     * assertions enabled any call fails, with assertions disabled the call
-     * does nothing.
-     */
-    @Override public void unlockAll(Collection<? extends K> keys,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        assert false;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Each call creates a new {@link PartitionEntrySet} for the given partition.
-     */
-    @Override public Set<GridCacheEntry<K, V>> entrySet(int part) {
-        return new PartitionEntrySet(part);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        // Built by the S.toString helper with GridDhtCacheAdapter as the class argument.
-        return S.toString(GridDhtCacheAdapter.class, this);
-    }
-
-    /**
-     * Set view over the entries of a single local partition. The partition is
-     * looked up on every {@link #iterator()} and {@link #size()} call; when it
-     * is not available the set behaves as empty for those operations.
-     */
-    private class PartitionEntrySet extends AbstractSet<GridCacheEntry<K, V>> {
-        /** Partition id this view is bound to; never changes after construction. */
-        private final int partId;
-
-        /**
-         * @param partId Partition id.
-         */
-        private PartitionEntrySet(int partId) {
-            this.partId = partId;
-        }
-
-        /** {@inheritDoc} */
-        @NotNull @Override public Iterator<GridCacheEntry<K, V>> iterator() {
-            final GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId,
-                ctx.discovery().topologyVersion(), false);
-
-            // A null partition iterator yields an empty result.
-            Iterator<GridDhtCacheEntry<K, V>> partIt = part == null ? null : part.entries().iterator();
-
-            return new PartitionEntryIterator<>(partIt);
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Only {@code GridCacheEntry} instances with a non-null peeked value
-         * can be removed; the removal is conditional on that value.
-         */
-        @Override public boolean remove(Object o) {
-            if (!(o instanceof GridCacheEntry))
-                return false;
-
-            GridCacheEntry<K, V> entry = (GridCacheEntry<K, V>)o;
-
-            K key = entry.getKey();
-            V val = entry.peek();
-
-            if (val == null)
-                return false;
-
-            try {
-                // Cannot use remove(key, val) since we may be in DHT cache and should go through near.
-                return entry(key).remove(val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean removeAll(Collection<?> c) {
-            boolean rmv = false;
-
-            // Non-short-circuit OR: every element is attempted even after a successful removal.
-            for (Object o : c)
-                rmv |= remove(o);
-
-            return rmv;
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * An entry is contained when it belongs to this partition and its
-         * peeked value equals the value peeked from the cache for its key.
-         */
-        @Override public boolean contains(Object o) {
-            if (!(o instanceof GridCacheEntry))
-                return false;
-
-            GridCacheEntry<K, V> entry = (GridCacheEntry<K, V>)o;
-
-            return partId == entry.partition() && F.eq(entry.peek(), peek(entry.getKey()));
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId,
-                ctx.discovery().topologyVersion(), false);
-
-            return part != null ? part.publicSize() : 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(PartitionEntrySet.class, this, "super", super.toString());
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The parent implementation is used only for the {@code PARTITIONED_ONLY}
-     * and {@code NEAR_PARTITIONED} distribution modes; any other mode yields
-     * an empty list.
-     */
-    @Override public List<GridCacheClearAllRunnable<K, V>> splitClearAll() {
-        GridCacheDistributionMode distMode = configuration().getDistributionMode();
-
-        if (distMode != PARTITIONED_ONLY && distMode != NEAR_PARTITIONED)
-            return Collections.<GridCacheClearAllRunnable<K, V>>emptyList();
-
-        return super.splitClearAll();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Passes the deleted key and version to the entry's local partition, if
-     * that partition is available; a failure to do so is logged.
-     */
-    @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) {
-        assert entry.isDht();
-
-        GridDhtLocalPartition<K, V> locPart = topology().localPartition(entry.partition(), -1, false);
-
-        if (locPart == null)
-            return;
-
-        // Do not remove entry on replica topology. Instead, add entry to removal queue.
-        // It will be cleared eventually.
-        try {
-            locPart.onDeferredDelete(entry.key(), ver);
-        }
-        catch (IgniteCheckedException err) {
-            U.error(log, "Failed to enqueue deleted entry [key=" + entry.key() +
-                ", ver=" + ver + ']', err);
-        }
-    }
-
-    /**
-     * Iterator over the entries of a partition. Entries that are internal or
-     * not visitable are skipped; every other entry is returned in its wrapped
-     * form. A {@code null} partition iterator produces an empty iteration.
-     */
-    private static class PartitionEntryIterator<K, V> extends GridIteratorAdapter<GridCacheEntry<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Next entry to return, or {@code null} when the iteration is exhausted. */
-        private GridCacheEntry<K, V> entry;
-
-        /** Last returned entry to support remove; {@code null} when there is nothing to remove. */
-        private GridCacheEntry<K, V> last;
-
-        /** Partition iterator. */
-        private final Iterator<GridDhtCacheEntry<K, V>> partIt;
-
-        /**
-         * @param partIt Partition iterator.
-         */
-        private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry<K, V>> partIt) {
-            this.partIt = partIt;
-
-            // Pre-fetch the first eligible entry so that hasNextX() is a plain null check.
-            advance();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNextX() {
-            return entry != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheEntry<K, V> nextX() throws IgniteCheckedException {
-            if (!hasNext())
-                throw new NoSuchElementException();
-
-            last = entry;
-
-            advance();
-
-            return last;
-        }
-
-        /**
-         * {@inheritDoc}
-         *
-         * @throws IllegalStateException If no entry has been returned yet, or the last
-         *      returned entry has already been removed through this iterator.
-         */
-        @Override public void removeX() throws IgniteCheckedException {
-            if (last == null)
-                throw new IllegalStateException();
-
-            last.remove();
-
-            // Per the Iterator contract a second remove() without next() must fail.
-            last = null;
-        }
-
-        /**
-         * Moves {@link #entry} to the next eligible entry of the partition
-         * iterator, or sets it to {@code null} when none is left.
-         */
-        private void advance() {
-            if (partIt != null) {
-                while (partIt.hasNext()) {
-                    GridDhtCacheEntry<K, V> next = partIt.next();
-
-                    if (next.isInternal() || !next.visitable(CU.<K, V>empty()))
-                        continue;
-
-                    entry = next.wrap(true);
-
-                    return;
-                }
-            }
-
-            entry = null;
-        }
-    }
-
-    /**
-     * Multi update future: a future of {@code IgniteUuid} that additionally
-     * carries a topology version.
-     */
-    private static class MultiUpdateFuture extends GridFutureAdapter<IgniteUuid> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Topology version. */
-        private long topVer;
-
-        /**
-         * @param ctx Kernal context.
-         * @param topVer Topology version.
-         */
-        private MultiUpdateFuture(GridKernalContext ctx, long topVer) {
-            super(ctx);
-
-            this.topVer = topVer;
-        }
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public MultiUpdateFuture() {
-            // No-op.
-        }
-
-        /**
-         * @return Topology version.
-         */
-        private long topologyVersion() {
-            return topVer;
-        }
-    }
-}

Reply via email to