http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 0000000,0c01c6c..5312f18
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -1,0 -1,818 +1,820 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.distributed.near.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ 
+ import static org.apache.ignite.cache.CacheFlag.*;
+ import static org.apache.ignite.cache.GridCachePeekMode.*;
+ 
+ /**
+  * Colocated cache.
+  */
+ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapter<K, V> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /**
+      * Empty constructor required for {@link Externalizable}
+      */
+     public GridDhtColocatedCache() {
+         // No-op.
+     }
+ 
+     /**
+      * @param ctx Cache context.
+      */
+     public GridDhtColocatedCache(GridCacheContext<K, V> ctx) {
+         super(ctx);
+     }
+ 
+     /**
+      * Creates colocated cache with specified map.
+      *
+      * @param ctx Cache context.
+      * @param map Cache map.
+      */
+     public GridDhtColocatedCache(GridCacheContext<K, V> ctx, 
GridCacheConcurrentMap<K, V> map) {
+         super(ctx, map);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isColocated() {
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void init() {
+         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+             /** {@inheritDoc} */
+             @Override public GridCacheMapEntry<K, V> 
create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+                 return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, 
hash, val, next, ttl, hdrId);
+             }
+         });
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void start() throws IgniteCheckedException {
+         super.start();
+ 
+         ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse<K, V>>() {
+             @Override public void apply(UUID nodeId, GridNearGetResponse<K, 
V> res) {
+                 processGetResponse(nodeId, res);
+             }
+         });
+ 
+         ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new 
CI2<UUID, GridNearLockResponse<K, V>>() {
+             @Override public void apply(UUID nodeId, GridNearLockResponse<K, 
V> res) {
+                 processLockResponse(nodeId, res);
+             }
+         });
+     }
+ 
+     /**
+      * Gets or creates entry for given key and given topology version.
+      *
+      * @param key Key for entry.
+      * @param topVer Topology version.
+      * @param allowDetached Whether to allow detached entries. If {@code 
true} and node is not primary
+      *      for given key, a new detached entry will be created. Otherwise, 
entry will be obtained from
+      *      dht cache map.
+      * @return Cache entry.
+      * @throws GridDhtInvalidPartitionException If {@code allowDetached} is 
false and node is not primary
+      *      for given key.
+      */
+     public GridDistributedCacheEntry<K, V> entryExx(K key, long topVer, 
boolean allowDetached) {
+         return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, 
topVer) ?
+             new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, 
null, 0, 0) : entryExx(key, topVer);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> 
modes) throws IgniteCheckedException {
+         GridTuple<V> val = null;
+ 
+         if (!modes.contains(NEAR_ONLY)) {
+             try {
+                 val = peek0(true, key, modes, ctx.tm().txx());
+             }
+             catch (GridCacheFilterFailedException ignored) {
+                 if (log.isDebugEnabled())
+                     log.debug("Filter validation failed for key: " + key);
+ 
+                 return null;
+             }
+         }
+ 
+         return val != null ? val.get() : null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isLocked(K key) {
+         return ctx.mvcc().isLockedByThread(key, -1);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isLockedByThread(K key) {
+         return ctx.mvcc().isLockedByThread(key, 
Thread.currentThread().getId());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Map<K, V>> getAllAsync(
+         @Nullable final Collection<? extends K> keys,
+         boolean forcePrimary,
+         boolean skipTx,
+         @Nullable final GridCacheEntryEx<K, V> entry,
+         @Nullable UUID subjId,
+         String taskName,
+         final boolean deserializePortable,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         ctx.denyOnFlag(LOCAL);
+         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+ 
+         if (F.isEmpty(keys))
+             return new GridFinishedFuture<>(ctx.kernalContext(), 
Collections.<K, V>emptyMap());
+ 
+         IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();
+ 
+         if (tx != null && !tx.implicit() && !skipTx) {
+             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
+                 @Override public IgniteFuture<Map<K, V>> 
op(IgniteTxLocalAdapter<K, V> tx) {
+                     return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, 
deserializePortable, filter));
+                 }
+             });
+         }
+ 
+         long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : 
tx.topologyVersion();
+ 
+         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+ 
+         subjId = ctx.subjectIdPerCall(subjId, prj);
+ 
+         return loadAsync(keys,
+             true,
+             false,
+             forcePrimary,
+             topVer,
+             subjId,
+             taskName,
+             deserializePortable,
+             filter,
+             accessExpiryPolicy(prj != null ? prj.expiry() : null));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, long 
topVer) {
+         try {
+             return ctx.affinity().localNode(key, topVer) ? entryEx(key) : 
null;
+         }
+         catch (GridDhtInvalidPartitionException ignored) {
+             return null;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean containsKey(K key, @Nullable 
IgnitePredicate<CacheEntry<K, V>> filter) {
+         A.notNull(key, "key");
+ 
+         // We need detached entry here because if there is an ongoing 
transaction,
+         // we should see this entry and apply filter.
+         GridCacheEntryEx<K, V> e = entryExx(key, 
ctx.affinity().affinityTopologyVersion(), true, true);
+ 
+         try {
+             return e != null && e.peek(SMART, filter) != null;
+         }
+         catch (GridCacheEntryRemovedException ignore) {
+             if (log.isDebugEnabled())
+                 log.debug("Got removed entry during peek (will ignore): " + 
e);
+ 
+             return false;
+         }
+     }
+ 
+     /**
+      * @param keys Keys to load.
+      * @param readThrough Read through flag.
+      * @param reload Reload flag.
+      * @param forcePrimary Force get from primary node flag.
+      * @param topVer Topology version.
+      * @param subjId Subject ID.
+      * @param taskName Task name.
+      * @param deserializePortable Deserialize portable flag.
+      * @param filter Filter.
+      * @param expiryPlc Expiry policy.
+      * @return Loaded values.
+      */
+     public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends 
K> keys,
+         boolean readThrough,
+         boolean reload,
+         boolean forcePrimary,
+         long topVer,
+         @Nullable UUID subjId,
+         String taskName,
+         boolean deserializePortable,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+         @Nullable IgniteCacheExpiryPolicy expiryPlc) {
+         if (keys == null || keys.isEmpty())
+             return new GridFinishedFuture<>(ctx.kernalContext(), 
Collections.<K, V>emptyMap());
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         if (expiryPlc == null)
+             expiryPlc = accessExpiryPolicy(ctx.expiry());
+ 
+         // Optimisation: try to resolve value locally and escape 'get future' 
creation.
+         if (!reload && !forcePrimary) {
+             Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);
+ 
+             boolean success = true;
+ 
+             // Optimistically expect that all keys are available locally 
(avoid creation of get future).
+             for (K key : keys) {
+                 GridCacheEntryEx<K, V> entry = null;
+ 
+                 while (true) {
+                     try {
+                         entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : 
peekEx(key);
+ 
+                         // If our DHT cache do has value, then we peek it.
+                         if (entry != null) {
+                             boolean isNew = entry.isNewLocked();
+ 
+                             V v = entry.innerGet(null,
+                                 /*swap*/true,
+                                 /*read-through*/false,
+                                 /*fail-fast*/true,
+                                 /*unmarshal*/true,
 -                                /**update-metrics*/true,
++                                /**update-metrics*/false,
+                                 /*event*/true,
+                                 /*temporary*/false,
+                                 subjId,
+                                 null,
+                                 taskName,
+                                 filter,
+                                 expiryPlc);
+ 
+                             // Entry was not in memory or in swap, so we 
remove it from cache.
+                             if (v == null) {
+                                 GridCacheVersion obsoleteVer = 
context().versions().next();
+ 
+                                 if (isNew && 
entry.markObsoleteIfEmpty(obsoleteVer))
+                                     removeIfObsolete(key);
+ 
+                                 success = false;
+                             }
+                             else {
+                                 if (ctx.portableEnabled())
+                                     v = (V)ctx.unwrapPortableIfNeeded(v, 
!deserializePortable);
+ 
+                                 locVals.put(key, v);
+                             }
+                         }
+                         else
+                             success = false;
+ 
+                         break; // While.
+                     }
+                     catch (GridCacheEntryRemovedException ignored) {
+                         // No-op, retry.
+                     }
+                     catch (GridCacheFilterFailedException ignored) {
+                         // No-op, skip the key.
+                         break;
+                     }
+                     catch (GridDhtInvalidPartitionException ignored) {
+                         success = false;
+ 
+                         break; // While.
+                     }
+                     catch (IgniteCheckedException e) {
+                         return new GridFinishedFuture<>(ctx.kernalContext(), 
e);
+                     }
+                     finally {
+                         if (entry != null)
+                             context().evicts().touch(entry, topVer);
+                     }
+                 }
+ 
+                 if (!success)
+                     break;
++                else
++                    ctx.cache().metrics0().onRead(true);
+             }
+ 
+             if (success) {
+                 sendTtlUpdateRequest(expiryPlc);
+ 
+                 return ctx.wrapCloneMap(new 
GridFinishedFuture<>(ctx.kernalContext(), locVals));
+             }
+         }
+ 
+         if (expiryPlc != null)
+             expiryPlc.reset();
+ 
+         // Either reload or not all values are available locally.
+         GridPartitionedGetFuture<K, V> fut = new 
GridPartitionedGetFuture<>(ctx,
+             keys,
+             topVer,
+             readThrough,
+             reload,
+             forcePrimary,
+             filter,
+             subjId,
+             taskName,
+             deserializePortable,
+             expiryPlc);
+ 
+         fut.init();
+ 
+         return ctx.wrapCloneMap(fut);
+     }
+ 
+     /**
+      * This is an entry point to pessimistic locking within transaction.
+      *
+      * {@inheritDoc}
+      */
+     @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends 
K> keys,
+         long timeout,
+         @Nullable IgniteTxLocalEx<K, V> tx,
+         boolean isInvalidate,
+         boolean isRead,
+         boolean retval,
+         @Nullable IgniteTxIsolation isolation,
+         long accessTtl,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         assert tx == null || tx instanceof GridNearTxLocal;
+ 
+         GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx;
+ 
+         GridDhtColocatedLockFuture<K, V> fut = new 
GridDhtColocatedLockFuture<>(ctx,
+             keys,
+             txx,
+             isRead,
+             retval,
+             timeout,
+             accessTtl,
+             filter);
+ 
+         // Future will be added to mvcc only if it was mapped to remote nodes.
+         fut.map();
+ 
+         return fut;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridNearTransactionalCache<K, V> near() {
+         assert false : "Near cache is not available in colocated mode.";
+ 
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public CacheEntry<K, V> entry(K key) throws 
GridDhtInvalidPartitionException {
+         return new GridDhtCacheEntryImpl<>(ctx.projectionPerCall(), ctx, key, 
null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void unlockAll(Collection<? extends K> keys,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         if (keys.isEmpty())
+             return;
+ 
+         try {
+             GridCacheVersion ver = null;
+ 
+             int keyCnt = -1;
+ 
+             Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;
+ 
+             Collection<K> locKeys = new LinkedList<>();
+ 
+             for (K key : keys) {
+                 GridDistributedCacheEntry<K, V> entry = peekExx(key);
+ 
+                 CacheEntry<K, V> cacheEntry = entry == null ? entry(key) : 
entry.wrap(false);
+ 
+                 if (!ctx.isAll(cacheEntry, filter))
+                     break; // While.
+ 
+                 GridCacheMvccCandidate lock = 
ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null);
+ 
+                 if (lock != null) {
+                     final long topVer = lock.topologyVersion();
+ 
+                     assert topVer > 0;
+ 
+                     if (map == null) {
+                         Collection<ClusterNode> affNodes = CU.allNodes(ctx, 
topVer);
+ 
+                         keyCnt = (int)Math.ceil((double)keys.size() / 
affNodes.size());
+ 
+                         map = U.newHashMap(affNodes.size());
+                     }
+ 
+                     if (ver == null)
+                         ver = lock.version();
+ 
+                     // Send request to remove from remote nodes.
+                     ClusterNode primary = ctx.affinity().primary(key, topVer);
+ 
+                     if (!lock.reentry()) {
+                         if (!ver.equals(lock.version()))
+                             throw new IgniteCheckedException("Failed to 
unlock (if keys were locked separately, " +
+                                 "then they need to be unlocked separately): " 
+ keys);
+ 
+                         if (!primary.isLocal()) {
+                             GridNearUnlockRequest<K, V> req = 
map.get(primary);
+ 
+                             if (req == null) {
+                                 map.put(primary, req = new 
GridNearUnlockRequest<>(ctx.cacheId(), keyCnt));
+ 
+                                 req.version(ver);
+                             }
+ 
+                             byte[] keyBytes = entry != null ? 
entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key);
+ 
+                             req.addKey(key, keyBytes, ctx);
+                         }
+                         else
+                             locKeys.add(key);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Removed lock (will distribute): " + 
lock);
+                     }
+                     else if (log.isDebugEnabled())
+                         log.debug("Current thread still owns lock (or there 
are no other nodes)" +
+                             " [lock=" + lock + ", curThreadId=" + 
Thread.currentThread().getId() + ']');
+                 }
+             }
+ 
+             if (ver == null)
+                 return;
+ 
+             if (!locKeys.isEmpty())
+                 removeLocks(ctx.localNodeId(), ver, locKeys, true);
+ 
+             for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping 
: map.entrySet()) {
+                 ClusterNode n = mapping.getKey();
+ 
+                 GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+ 
+                 assert !n.isLocal();
+ 
+                 if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys()))
+                     // We don't wait for reply to this message.
+                     ctx.io().send(n, req);
+             }
+         }
+         catch (IgniteCheckedException ex) {
+             U.error(log, "Failed to unlock the lock for keys: " + keys, ex);
+         }
+     }
+ 
+     /**
+      * Removes locks regardless of whether they are owned or not for given
+      * version and keys.
+      *
+      * @param threadId Thread ID.
+      * @param ver Lock version.
+      * @param keys Keys.
+      */
+     public void removeLocks(long threadId, GridCacheVersion ver, Collection<? 
extends K> keys) {
+         if (keys.isEmpty())
+             return;
+ 
+         try {
+             int keyCnt = -1;
+ 
+             Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;
+ 
+             Collection<K> locKeys = new LinkedList<>();
+ 
+             for (K key : keys) {
+                 GridCacheMvccCandidate<K> lock = 
ctx.mvcc().removeExplicitLock(threadId, key, ver);
+ 
+                 if (lock != null) {
+                     long topVer = lock.topologyVersion();
+ 
+                     if (map == null) {
+                         Collection<ClusterNode> affNodes = CU.allNodes(ctx, 
topVer);
+ 
+                         keyCnt = (int)Math.ceil((double)keys.size() / 
affNodes.size());
+ 
+                         map = U.newHashMap(affNodes.size());
+                     }
+ 
+                     ClusterNode primary = ctx.affinity().primary(key, topVer);
+ 
+                     if (!primary.isLocal()) {
+                         // Send request to remove from remote nodes.
+                         GridNearUnlockRequest<K, V> req = map.get(primary);
+ 
+                         if (req == null) {
+                             map.put(primary, req = new 
GridNearUnlockRequest<>(ctx.cacheId(), keyCnt));
+ 
+                             req.version(ver);
+                         }
+ 
+                         GridCacheEntryEx<K, V> entry = peekEx(key);
+ 
+                         byte[] keyBytes = entry != null ? 
entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key);
+ 
+                         req.addKey(key, keyBytes, ctx);
+                     }
+                     else
+                         locKeys.add(key);
+                 }
+             }
+ 
+             if (!locKeys.isEmpty())
+                 removeLocks(ctx.localNodeId(), ver, locKeys, true);
+ 
+             if (map == null || map.isEmpty())
+                 return;
+ 
+             Collection<GridCacheVersion> committed = 
ctx.tm().committedVersions(ver);
+             Collection<GridCacheVersion> rolledback = 
ctx.tm().rolledbackVersions(ver);
+ 
+             for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping 
: map.entrySet()) {
+                 ClusterNode n = mapping.getKey();
+ 
+                 GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+ 
+                 if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) {
+                     req.completedVersions(committed, rolledback);
+ 
+                     // We don't wait for reply to this message.
+                     ctx.io().send(n, req);
+                 }
+             }
+         }
+         catch (IgniteCheckedException ex) {
+             U.error(log, "Failed to unlock the lock for keys: " + keys, ex);
+         }
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param tx Started colocated transaction (if any).
+      * @param threadId Thread ID.
+      * @param ver Lock version.
+      * @param topVer Topology version.
+      * @param keys Mapped keys.
+      * @param txRead Tx read.
+      * @param timeout Lock timeout.
+      * @param accessTtl TTL for read operation.
+      * @param filter filter Optional filter.
+      * @return Lock future.
+      */
+     IgniteFuture<Exception> lockAllAsync(
+         final GridCacheContext<K, V> cacheCtx,
+         @Nullable final GridNearTxLocal<K, V> tx,
+         final long threadId,
+         final GridCacheVersion ver,
+         final long topVer,
+         final Collection<K> keys,
+         final boolean txRead,
+         final long timeout,
+         final long accessTtl,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         assert keys != null;
+ 
+         IgniteFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, 
topVer);
+ 
+         // Prevent embedded future creation if possible.
+         if (keyFut.isDone()) {
+             try {
+                 // Check for exception.
+                 keyFut.get();
+ 
+                 return lockAllAsync0(cacheCtx,
+                     tx,
+                     threadId,
+                     ver,
+                     topVer,
+                     keys,
+                     txRead,
+                     timeout,
+                     accessTtl,
+                     filter);
+             }
+             catch (IgniteCheckedException e) {
+                 return new GridFinishedFuture<>(ctx.kernalContext(), e);
+             }
+         }
+         else {
+             return new GridEmbeddedFuture<>(true, keyFut,
+                 new C2<Object, Exception, IgniteFuture<Exception>>() {
+                     @Override public IgniteFuture<Exception> apply(Object o, 
Exception exx) {
+                         if (exx != null)
+                             return new 
GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
+ 
+                         return lockAllAsync0(cacheCtx,
+                             tx,
+                             threadId,
+                             ver,
+                             topVer,
+                             keys,
+                             txRead,
+                             timeout,
+                             accessTtl,
+                             filter);
+                     }
+                 },
+                 ctx.kernalContext());
+         }
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param tx Started colocated transaction (if any).
+      * @param threadId Thread ID.
+      * @param ver Lock version.
+      * @param topVer Topology version.
+      * @param keys Mapped keys.
+      * @param txRead Tx read.
+      * @param timeout Lock timeout.
+      * @param accessTtl TTL for read operation.
+      * @param filter filter Optional filter.
+      * @return Lock future.
+      */
+     private IgniteFuture<Exception> lockAllAsync0(
+         GridCacheContext<K, V> cacheCtx,
+         @Nullable final GridNearTxLocal<K, V> tx,
+         long threadId,
+         final GridCacheVersion ver,
+         final long topVer,
+         final Collection<K> keys,
+         final boolean txRead,
+         final long timeout,
+         final long accessTtl,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         int cnt = keys.size();
+ 
+         if (tx == null) {
+             GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+                 ctx.localNodeId(),
+                 ver,
+                 topVer,
+                 cnt,
+                 txRead,
+                 timeout,
+                 tx,
+                 threadId,
+                 accessTtl,
+                 filter);
+ 
+             // Add before mapping.
+             if (!ctx.mvcc().addFuture(fut))
+                 throw new IllegalStateException("Duplicate future ID: " + 
fut);
+ 
+             boolean timedout = false;
+ 
+             for (K key : keys) {
+                 if (timedout)
+                     break;
+ 
+                 while (true) {
+                     GridDhtCacheEntry<K, V> entry = entryExx(key, topVer);
+ 
+                     try {
+                         fut.addEntry(key == null ? null : entry);
+ 
+                         if (fut.isDone())
+                             timedout = true;
+ 
+                         break;
+                     }
+                     catch (GridCacheEntryRemovedException ignore) {
+                         if (log.isDebugEnabled())
+                             log.debug("Got removed entry when adding lock 
(will retry): " + entry);
+                     }
+                     catch (GridDistributedLockCancelledException e) {
+                         if (log.isDebugEnabled())
+                             log.debug("Got lock request for cancelled lock 
(will ignore): " +
+                                 entry);
+ 
+                         fut.onError(e);
+ 
+                         return new 
GridDhtFinishedFuture<>(ctx.kernalContext(), e);
+                     }
+                 }
+             }
+ 
+             // This will send remote messages.
+             fut.map();
+ 
+             return new GridDhtEmbeddedFuture<>(
+                 ctx.kernalContext(),
+                 fut,
+                 new C2<Boolean, Exception, Exception>() {
+                     @Override public Exception apply(Boolean b, Exception e) {
+                         if (e != null)
+                             e = U.unwrap(e);
+                         else if (!b)
+                             e = new GridCacheLockTimeoutException(ver);
+ 
+                         return e;
+                     }
+                 });
+         }
+         else {
+             // Handle implicit locks for pessimistic transactions.
+             ctx.tm().txContext(tx);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Performing colocated lock [tx=" + tx + ", keys=" + 
keys + ']');
+ 
+             IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx,
+                 keys,
+                 tx.implicit(),
+                 txRead,
+                 accessTtl);
+ 
+             return new GridDhtEmbeddedFuture<>(
+                 ctx.kernalContext(),
+                 txFut,
+                 new C2<GridCacheReturn<V>, Exception, Exception>() {
+                     @Override public Exception apply(GridCacheReturn<V> ret,
+                         Exception e) {
+                         if (e != null)
+                             e = U.unwrap(e);
+ 
+                         assert !tx.empty();
+ 
+                         return e;
+                     }
+                 });
+         }
+     }
+ 
+     /**
+      * @param nodeId Sender ID.
+      * @param res Response.
+      */
+     private void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> 
res) {
+         GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, 
V>)ctx.mvcc().<Map<K, V>>future(
+             res.version(), res.futureId());
+ 
+         if (fut == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to find future for get response [sender=" + 
nodeId + ", res=" + res + ']');
+ 
+             return;
+         }
+ 
+         fut.onResult(nodeId, res);
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     private void processLockResponse(UUID nodeId, GridNearLockResponse<K, V> 
res) {
+         assert nodeId != null;
+         assert res != null;
+ 
+         GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, 
V>)ctx.mvcc().
+             <Boolean>future(res.version(), res.futureId());
+ 
+         if (fut != null)
+             fut.onResult(nodeId, res);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 0000000,25718fc..05dda8a
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@@ -1,0 -1,792 +1,785 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
+ import javax.cache.expiry.*;
+ import java.io.*;
+ import java.util.*;
+ 
+ import static org.apache.ignite.cache.CacheFlag.*;
+ import static org.apache.ignite.cache.GridCachePeekMode.*;
+ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
+ 
+ /**
+  * Common logic for near caches.
+  */
+ public abstract class GridNearCacheAdapter<K, V> extends 
GridDistributedCacheAdapter<K, V> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /**
+      * Empty constructor required for {@link Externalizable}.
+      */
+     protected GridNearCacheAdapter() {
+         // No-op.
+     }
+ 
+     /**
+      * @param ctx Context.
+      */
+     protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) {
+         super(ctx, ctx.config().getNearStartSize());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void init() {
+         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+             /** {@inheritDoc} */
+             @Override public GridCacheMapEntry<K, V> 
create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+                 // Can't hold any locks here - this method is invoked when
+                 // holding write-lock on the whole cache map.
+                 return new GridNearCacheEntry<>(ctx, key, hash, val, next, 
ttl, hdrId);
+             }
+         });
+     }
+ 
+     /**
+      * @return DHT cache.
+      */
+     public abstract GridDhtCacheAdapter<K, V> dht();
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isNear() {
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCachePreloader<K, V> preloader() {
+         return dht().preloader();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheEntryEx<K, V> entryEx(K key, boolean touch) {
+         GridNearCacheEntry<K, V> entry = null;
+ 
+         while (true) {
+             try {
+                 entry = (GridNearCacheEntry<K, V>)super.entryEx(key, touch);
+ 
+                 
entry.initializeFromDht(ctx.affinity().affinityTopologyVersion());
+ 
+                 return entry;
+             }
+             catch (GridCacheEntryRemovedException ignore) {
+                 if (log.isDebugEnabled())
+                     log.debug("Got removed near entry while initializing from 
DHT entry (will retry): " + entry);
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) {
+         GridNearCacheEntry<K, V> entry = null;
+ 
+         while (true) {
+             try {
+                 entry = (GridNearCacheEntry<K, V>)super.entryEx(key, topVer);
+ 
+                 entry.initializeFromDht(topVer);
+ 
+                 return entry;
+             }
+             catch (GridCacheEntryRemovedException ignore) {
+                 if (log.isDebugEnabled())
+                     log.debug("Got removed near entry while initializing from 
DHT entry (will retry): " + entry);
+             }
+         }
+     }
+ 
+     /**
+      * @param key Key.
+      * @param topVer Topology version.
+      * @return Entry.
+      */
+     public GridNearCacheEntry<K, V> entryExx(K key, long topVer) {
+         return (GridNearCacheEntry<K, V>)entryEx(key, topVer);
+     }
+ 
+     /**
+      * @param key Key.
+      * @return Entry.
+      */
+     @Nullable public GridNearCacheEntry<K, V> peekExx(K key) {
+         return (GridNearCacheEntry<K, V>)peekEx(key);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isLocked(K key) {
+         return super.isLocked(key) || dht().isLocked(key);
+     }
+ 
+     /**
+      * @param key Key.
+      * @return If near entry is locked.
+      */
+     public boolean isLockedNearOnly(K key) {
+         return super.isLocked(key);
+     }
+ 
+     /**
+      * @param keys Keys.
+      * @return If near entries for given keys are locked.
+      */
+     public boolean isAllLockedNearOnly(Iterable<? extends K> keys) {
+         A.notNull(keys, "keys");
+ 
+         for (K key : keys)
+             if (!isLockedNearOnly(key))
+                 return false;
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"unchecked", "RedundantCast"})
+     @Override public IgniteFuture<Object> readThroughAllAsync(Collection<? 
extends K> keys, boolean reload,
+         IgniteTxEx<K, V> tx, IgnitePredicate<CacheEntry<K, V>>[] filter, 
@Nullable UUID subjId, String taskName,
+         IgniteBiInClosure<K, V> vis) {
+         return (IgniteFuture)loadAsync(tx,
+             keys,
+             reload,
+             false,
+             filter,
+             subjId,
+             taskName,
+             true,
+             null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void reloadAll(@Nullable Collection<? extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws 
IgniteCheckedException {
+         dht().reloadAll(keys, filter);
+ 
+         super.reloadAll(keys, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? 
extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext());
+ 
+         fut.add(super.reloadAllAsync(keys, filter));
+         fut.add(dht().reloadAllAsync(keys, filter));
+ 
+         fut.markInitialized();
+ 
+         return fut;
+ 
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V reload(K key, @Nullable IgnitePredicate<CacheEntry<K, 
V>>... filter)
+         throws IgniteCheckedException {
+         V val;
+ 
+         try {
+             val = dht().reload(key, filter);
+         }
+         catch (GridDhtInvalidPartitionException ignored) {
+             return null;
+         }
+ 
+         V nearVal = super.reload(key, filter);
+ 
+         return val == null ? nearVal : val;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void reloadAll() throws IgniteCheckedException {
+         super.reloadAll();
+ 
+         dht().reloadAll();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"unchecked"})
+     @Override public IgniteFuture<?> reloadAllAsync() {
+         GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext());
+ 
+         fut.add(super.reloadAllAsync());
+         fut.add(dht().reloadAllAsync());
+ 
+         fut.markInitialized();
+ 
+         return fut;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"unchecked"})
+     @Override public IgniteFuture<?> reloadAllAsync(@Nullable 
IgnitePredicate<CacheEntry<K, V>> filter) {
+         GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext());
+ 
+         fut.add(super.reloadAllAsync());
+         fut.add(dht().reloadAllAsync(filter));
+ 
+         fut.markInitialized();
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param tx Transaction.
+      * @param keys Keys to load.
+      * @param reload Reload flag.
+      * @param forcePrimary Force primary flag.
+      * @param filter Filter.
+      * @param subjId Subject ID.
+      * @param taskName Task name.
+      * @param deserializePortable Deserialize portable flag.
+      * @param expiryPlc Expiry policy.
+      * @return Loaded values.
+      */
+     public IgniteFuture<Map<K, V>> loadAsync(@Nullable IgniteTxEx tx,
+         @Nullable Collection<? extends K> keys,
+         boolean reload,
+         boolean forcePrimary,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+         @Nullable UUID subjId,
+         String taskName,
+         boolean deserializePortable,
+         @Nullable ExpiryPolicy expiryPlc) {
+         if (F.isEmpty(keys))
+             return new GridFinishedFuture<>(ctx.kernalContext(), 
Collections.<K, V>emptyMap());
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? 
(IgniteTxLocalEx<K, V>)tx : null;
+ 
+         final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
+ 
+         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
+             keys,
+             true,
+             reload,
+             forcePrimary,
+             txx,
+             filter,
+             subjId,
+             taskName,
+             deserializePortable,
+             expiry);
+ 
+         // init() will register future for responses if future has remote 
mappings.
+         fut.init();
+ 
+         return ctx.wrapCloneMap(fut);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void loadCache(IgniteBiPredicate<K, V> p, long ttl, 
Object[] args) throws IgniteCheckedException {
+         dht().loadCache(p, ttl, args);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void localLoad(Collection<? extends K> keys) throws 
IgniteCheckedException {
+         dht().localLoad(keys);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> loadCacheAsync(IgniteBiPredicate<K, V> 
p, long ttl, Object[] args) {
+         return dht().loadCacheAsync(p, ttl, args);
+     }
+ 
 -    /** {@inheritDoc} */
 -    @Override public void resetMetrics() {
 -        super.resetMetrics();
 -
 -        dht().resetMetrics();
 -    }
 -
+     /**
+      * @param nodeId Sender ID.
+      * @param res Response.
+      */
+     protected void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> 
res) {
+         GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, 
V>)ctx.mvcc().<Map<K, V>>future(
+             res.version(), res.futureId());
+ 
+         if (fut == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to find future for get response [sender=" + 
nodeId + ", res=" + res + ']');
+ 
+             return;
+         }
+ 
+         fut.onResult(nodeId, res);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int size() {
+         return super.size() + dht().size();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int primarySize() {
+         return dht().primarySize();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int nearSize() {
+         return super.size();
+     }
+ 
+     /**
+      * @return Near entries.
+      */
+     public Set<CacheEntry<K, V>> nearEntries() {
+         return super.entrySet(CU.<K, V>empty());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Set<CacheEntry<K, V>> entrySet(
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return new EntrySet(super.entrySet(filter), dht().entrySet(filter));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Set<CacheEntry<K, V>> entrySet(int part) {
+         return dht().entrySet(part);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Set<CacheEntry<K, V>> primaryEntrySet(
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) {
+         final long topVer = ctx.affinity().affinityTopologyVersion();
+ 
+         Collection<CacheEntry<K, V>> entries =
+             F.flatCollections(
+                 F.viewReadOnly(
+                     dht().topology().currentLocalPartitions(),
+                     new C1<GridDhtLocalPartition<K, V>, 
Collection<CacheEntry<K, V>>>() {
+                         @Override public Collection<CacheEntry<K, V>> 
apply(GridDhtLocalPartition<K, V> p) {
+                             return F.viewReadOnly(
+                                 p.entries(),
+                                 new C1<GridDhtCacheEntry<K, V>, CacheEntry<K, 
V>>() {
+                                     @Override public CacheEntry<K, V> 
apply(GridDhtCacheEntry<K, V> e) {
+                                         return e.wrap(true);
+                                     }
+                                 },
+                                 new P1<GridDhtCacheEntry<K, V>>() {
+                                     @Override public boolean 
apply(GridDhtCacheEntry<K, V> e) {
+                                         return !e.obsoleteOrDeleted();
+                                     }
+                                 });
+                         }
+                     },
+                     new P1<GridDhtLocalPartition<K, V>>() {
+                         @Override public boolean 
apply(GridDhtLocalPartition<K, V> p) {
+                             return p.primary(topVer);
+                         }
+                     }));
+ 
+         return new GridCacheEntrySet<>(ctx, entries, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, 
V>>[] filter) {
+         return new GridCacheKeySet<>(ctx, entrySet(filter), null);
+     }
+ 
+     /**
+      * @param filter Entry filter.
+      * @return Keys for near cache only.
+      */
+     public Set<K> nearKeySet(@Nullable IgnitePredicate<CacheEntry<K, V>> 
filter) {
+         return super.keySet(filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Set<K> primaryKeySet(@Nullable 
IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return new GridCacheKeySet<>(ctx, primaryEntrySet(filter), null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<V> values(IgnitePredicate<CacheEntry<K, 
V>>... filter) {
+         return new GridCacheValueCollection<>(ctx, entrySet(filter), 
ctx.vararg(F.<K, V>cacheHasPeekValue()));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<V> primaryValues(@Nullable 
IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return new GridCacheValueCollection<>(ctx, entrySet(filter), 
ctx.vararg(F.<K, V>cachePrimary()));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean containsKey(K key, IgnitePredicate<CacheEntry<K, 
V>> filter) {
+         return super.containsKey(key, filter) || dht().containsKey(key, 
filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean evict(K key, @Nullable 
IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         // Use unary 'and' to make sure that both sides execute.
+         return super.evict(key, filter) & dht().evict(key, filter);
+     }
+ 
+     /**
+      * @param key Key to evict.
+      * @param filter Optional filter.
+      * @return {@code True} if evicted.
+      */
+     public boolean evictNearOnly(K key, @Nullable 
IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return super.evict(key, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void evictAll(Collection<? extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         super.evictAll(keys, filter);
+ 
+         dht().evictAll(keys, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean compact(K key,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) throws 
IgniteCheckedException {
+         return super.compact(key, filter) | dht().compact(key, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public CacheEntry<K, V> entry(K key) {
+         // We don't try wrap entry from near or dht cache.
+         // Created object will be wrapped once some method is called.
+         return new GridPartitionedCacheEntryImpl<>(ctx.projectionPerCall(), 
ctx, key, null);
+     }
+ 
+     /**
+      * Peeks only near cache without looking into DHT cache.
+      *
+      * @param key Key.
+      * @return Peeked value.
+      */
+     @Nullable public V peekNearOnly(K key) {
+         try {
+             GridTuple<V> peek = peek0(true, key, SMART, CU.<K, V>empty());
+ 
+             return peek != null ? peek.get() : null;
+         }
+         catch (GridCacheFilterFailedException ignored) {
+             if (log.isDebugEnabled())
+                 log.debug("Filter validation failed for key: " + key);
+ 
+             return null;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V peek(K key, @Nullable IgnitePredicate<CacheEntry<K, 
V>> filter) {
+         try {
+             GridTuple<V> res = peek0(false, key, SMART, filter);
+ 
+             if (res != null)
+                 return res.get();
+         }
+         catch (GridCacheFilterFailedException e) {
+             e.printStackTrace();
+ 
+             assert false : "Filter should not fail since fail-fast is false";
+         }
+ 
+         return dht().peek(key, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> 
modes) throws IgniteCheckedException {
+         GridTuple<V> val = null;
+ 
+         if (!modes.contains(PARTITIONED_ONLY)) {
+             try {
+                 val = peek0(true, key, modes, ctx.tm().txx());
+             }
+             catch (GridCacheFilterFailedException ignored) {
+                 if (log.isDebugEnabled())
+                     log.debug("Filter validation failed for key: " + key);
+ 
+                 return null;
+             }
+         }
+ 
+         if (val != null)
+             return val.get();
+ 
+         return !modes.contains(NEAR_ONLY) ? dht().peek(key, modes) : null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Map<K, V> peekAll(@Nullable Collection<? extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         final Map<K, V> resMap = super.peekAll(keys, filter);
+ 
+         if (resMap.size() != keys.size())
+             resMap.putAll(dht().peekAll(keys, F.and(filter, new 
IgnitePredicate<CacheEntry<K, V>>() {
+                 @Override public boolean apply(CacheEntry<K, V> e) {
+                     return !resMap.containsKey(e.getKey());
+                 }
+             })));
+ 
+         return resMap;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean clear0(K key, @Nullable 
IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return super.clear0(key, filter) | dht().clear0(key, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void clearAll0(Collection<? extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         super.clearAll0(keys, filter);
+ 
+         dht().clearAll0(keys, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V promote(K key, boolean deserializePortable) throws 
IgniteCheckedException {
+         ctx.denyOnFlags(F.asList(READ, SKIP_SWAP));
+ 
+         // Unswap only from dht(). Near cache does not have swap storage.
+         return dht().promote(key, deserializePortable);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V promote(K key) throws IgniteCheckedException {
+         ctx.denyOnFlags(F.asList(READ, SKIP_SWAP));
+ 
+         // Unswap only from dht(). Near cache does not have swap storage.
+         return dht().promote(key);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void promoteAll(@Nullable Collection<? extends K> keys) 
throws IgniteCheckedException {
+         ctx.denyOnFlags(F.asList(READ, SKIP_SWAP));
+ 
+         // Unswap only from dht(). Near cache does not have swap storage.
+         // In near-only cache this is a no-op.
+         if (isAffinityNode(ctx.config()))
+             dht().promoteAll(keys);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Iterator<Map.Entry<K, V>> swapIterator() throws 
IgniteCheckedException {
+         ctx.denyOnFlags(F.asList(SKIP_SWAP));
+ 
+         return dht().swapIterator();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws 
IgniteCheckedException {
+         return dht().offHeapIterator();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long offHeapEntriesCount() {
+         return dht().offHeapEntriesCount();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long offHeapAllocatedSize() {
+         return dht().offHeapAllocatedSize();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long swapSize() throws IgniteCheckedException {
+         return dht().swapSize();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long swapKeys() throws IgniteCheckedException {
+         return dht().swapKeys();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isGgfsDataCache() {
+         return dht().isGgfsDataCache();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long ggfsDataSpaceUsed() {
+         return dht().ggfsDataSpaceUsed();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long ggfsDataSpaceMax() {
+         return dht().ggfsDataSpaceMax();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onGgfsDataSizeChanged(long delta) {
+         dht().onGgfsDataSizeChanged(delta);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isMongoDataCache() {
+         return dht().isMongoDataCache();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isMongoMetaCache() {
+         return dht().isMongoMetaCache();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public List<GridCacheClearAllRunnable<K, V>> splitClearAll() {
+         switch (configuration().getDistributionMode()) {
+             case NEAR_PARTITIONED:
+                 GridCacheVersion obsoleteVer = ctx.versions().next();
+ 
+                 List<GridCacheClearAllRunnable<K, V>> dhtJobs = 
dht().splitClearAll();
+ 
+                 List<GridCacheClearAllRunnable<K, V>> res = new 
ArrayList<>(dhtJobs.size());
+ 
+                 for (GridCacheClearAllRunnable<K, V> dhtJob : dhtJobs)
+                     res.add(new GridNearCacheClearAllRunnable<>(this, 
obsoleteVer, dhtJob));
+ 
+                 return res;
+ 
+             case NEAR_ONLY:
+                 return super.splitClearAll();
+ 
+             default:
+                 assert false : "Invalid partition distribution mode.";
+ 
+                 return null;
+         }
+     }
+ 
+     /**
+      * Wrapper for entry set.
+      */
+     private class EntrySet extends AbstractSet<CacheEntry<K, V>> {
+         /** Near entry set. */
+         private Set<CacheEntry<K, V>> nearSet;
+ 
+         /** Dht entry set. */
+         private Set<CacheEntry<K, V>> dhtSet;
+ 
+         /**
+          * @param nearSet Near entry set.
+          * @param dhtSet Dht entry set.
+          */
+         private EntrySet(Set<CacheEntry<K, V>> nearSet, Set<CacheEntry<K, V>> 
dhtSet) {
+             assert nearSet != null;
+             assert dhtSet != null;
+ 
+             this.nearSet = nearSet;
+             this.dhtSet = dhtSet;
+         }
+ 
+         /** {@inheritDoc} */
+         @NotNull @Override public Iterator<CacheEntry<K, V>> iterator() {
+             return new EntryIterator(nearSet.iterator(),
+                 F.iterator0(dhtSet, false, new P1<CacheEntry<K, V>>() {
+                     @Override public boolean apply(CacheEntry<K, V> e) {
+                         return 
!GridNearCacheAdapter.super.containsKey(e.getKey(), null);
+                     }
+                 }));
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int size() {
+             return F.size(iterator());
+         }
+     }
+ 
+     /**
+      * Entry set iterator.
+      */
+     private class EntryIterator implements Iterator<CacheEntry<K, V>> {
+         /** */
+         private Iterator<CacheEntry<K, V>> dhtIter;
+ 
+         /** */
+         private Iterator<CacheEntry<K, V>> nearIter;
+ 
+         /** */
+         private Iterator<CacheEntry<K, V>> currIter;
+ 
+         /** */
+         private CacheEntry<K, V> currEntry;
+ 
+         /**
+          * @param nearIter Near set iterator.
+          * @param dhtIter Dht set iterator.
+          */
+         private EntryIterator(Iterator<CacheEntry<K, V>> nearIter, 
Iterator<CacheEntry<K, V>> dhtIter) {
+             assert nearIter != null;
+             assert dhtIter != null;
+ 
+             this.nearIter = nearIter;
+             this.dhtIter = dhtIter;
+ 
+             currIter = nearIter;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean hasNext() {
+             return nearIter.hasNext() || dhtIter.hasNext();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public CacheEntry<K, V> next() {
+             if (!hasNext())
+                 throw new NoSuchElementException();
+ 
+             if (!currIter.hasNext())
+                 currIter = dhtIter;
+ 
+             return currEntry = currIter.next();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void remove() {
+             if (currEntry == null)
+                 throw new IllegalStateException();
+ 
+             assert currIter != null;
+ 
+             currIter.remove();
+ 
+             try {
+                 GridNearCacheAdapter.this.remove(currEntry.getKey(), CU.<K, 
V>empty());
+             }
+             catch (IgniteCheckedException e) {
+                 throw new IgniteException(e);
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridNearCacheAdapter.class, this);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 0000000,8628028..a8e5915
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@@ -1,0 -1,626 +1,627 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ 
+ /**
+  * Near cache entry.
+  */
+ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", 
"TooBroadScope"})
+ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> 
{
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** */
+     private static final int NEAR_SIZE_OVERHEAD = 36;
+ 
+     /** ID of primary node from which this entry was last read. */
+     private volatile UUID primaryNodeId;
+ 
+     /** DHT version which caused the last update. */
+     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+     private GridCacheVersion dhtVer;
+ 
+     /** Partition. */
+     private int part;
+ 
+     /**
+      * @param ctx Cache context.
+      * @param key Cache key.
+      * @param hash Key hash value.
+      * @param val Entry value.
+      * @param next Next entry in the linked list.
+      * @param ttl Time to live.
+      * @param hdrId Header id.
+      */
+     public GridNearCacheEntry(GridCacheContext<K, V> ctx, K key, int hash, V 
val, GridCacheMapEntry<K, V> next,
+         long ttl, int hdrId) {
+         super(ctx, key, hash, val, next, ttl, hdrId);
+ 
+         part = ctx.affinity().partition(key);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int memorySize() throws IgniteCheckedException {
+         return super.memorySize() + NEAR_SIZE_OVERHEAD;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int partition() {
+         return part;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isNear() {
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean valid(long topVer) {
+         assert topVer > 0 : "Topology version is invalid: " + topVer;
+ 
+         UUID primaryNodeId = this.primaryNodeId;
+ 
+         if (primaryNodeId == null)
+             return false;
+ 
+         if (cctx.discovery().node(primaryNodeId) == null) {
+             this.primaryNodeId = null;
+ 
+             return false;
+         }
+ 
+         // Make sure that primary node is alive before returning this value.
+         ClusterNode primary = cctx.affinity().primary(key(), topVer);
+ 
+         if (primary != null && primary.id().equals(primaryNodeId))
+             return true;
+ 
+         // Primary node changed.
+         this.primaryNodeId = null;
+ 
+         return false;
+     }
+ 
+     /**
+      * @param topVer Topology version.
+      * @return {@code True} if this entry was initialized by this call.
+      * @throws GridCacheEntryRemovedException If this entry is obsolete.
+      */
+     public boolean initializeFromDht(long topVer) throws 
GridCacheEntryRemovedException {
+         while (true) {
+             GridDhtCacheEntry<K, V> entry = cctx.near().dht().peekExx(key);
+ 
+             if (entry != null) {
+                 GridCacheEntryInfo<K, V> e = entry.info();
+ 
+                 if (e != null) {
+                     GridCacheVersion enqueueVer = null;
+ 
+                     try {
+                         synchronized (this) {
+                             checkObsolete();
+ 
+                             if (isNew() || !valid(topVer)) {
+                                 // Version does not change for load ops.
+                                 update(e.value(), e.valueBytes(), 
e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+ 
+                                 if (cctx.deferredDelete()) {
+                                     boolean deleted = val == null && valBytes 
== null;
+ 
+                                     if (deleted != deletedUnlocked()) {
+                                         deletedUnlocked(deleted);
+ 
+                                         if (deleted)
+                                             enqueueVer = e.version();
+                                     }
+                                 }
+ 
+                                 recordNodeId(cctx.affinity().primary(key, 
topVer).id());
+ 
+                                 dhtVer = e.isNew() || e.isDeleted() ? null : 
e.version();
+ 
+                                 return true;
+                             }
+ 
+                             return false;
+                         }
+                     }
+                     finally {
+                         if (enqueueVer != null)
+                             cctx.onDeferredDelete(this, enqueueVer);
+                     }
+                 }
+             }
+             else
+                 return false;
+         }
+     }
+ 
+     /**
+      * This method should be called only when lock is owned on this entry.
+      *
+      * @param val Value.
+      * @param valBytes Value bytes.
+      * @param ver Version.
+      * @param dhtVer DHT version.
+      * @param primaryNodeId Primary node ID.
+      * @return {@code True} if reset was done.
+      * @throws GridCacheEntryRemovedException If obsolete.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings( {"RedundantTypeArguments"})
+     public boolean resetFromPrimary(V val, byte[] valBytes, GridCacheVersion 
ver, GridCacheVersion dhtVer,
+         UUID primaryNodeId) throws GridCacheEntryRemovedException, 
IgniteCheckedException {
+         assert dhtVer != null;
+ 
+         cctx.versions().onReceived(primaryNodeId, dhtVer);
+ 
+         if (valBytes != null && val == null && 
!cctx.config().isStoreValueBytes()) {
+             GridCacheVersion curDhtVer = dhtVersion();
+ 
+             if (!F.eq(dhtVer, curDhtVer))
+                 val = cctx.marshaller().<V>unmarshal(valBytes, 
cctx.deploy().globalLoader());
+         }
+ 
+         synchronized (this) {
+             checkObsolete();
+ 
+             this.primaryNodeId = primaryNodeId;
+ 
+             if (!F.eq(this.dhtVer, dhtVer)) {
+                 value(val, valBytes);
+ 
+                 this.ver = ver;
+                 this.dhtVer = dhtVer;
+ 
+                 return true;
+             }
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * This method should be called only when lock is owned on this entry.
+      *
+      * @param dhtVer DHT version.
+      * @param val Value associated with version.
+      * @param valBytes Value bytes.
+      * @param expireTime Expire time.
+      * @param ttl Time to live.
+      * @param primaryNodeId Primary node ID.
+      */
+     public void updateOrEvict(GridCacheVersion dhtVer, @Nullable V val, 
@Nullable byte[] valBytes, long expireTime,
+         long ttl, UUID primaryNodeId) {
+         assert dhtVer != null;
+ 
+         cctx.versions().onReceived(primaryNodeId, dhtVer);
+ 
+         synchronized (this) {
+             if (!obsolete()) {
+                 // Don't set DHT version to null until we get a match from 
DHT remote transaction.
+                 if (F.eq(this.dhtVer, dhtVer))
+                     this.dhtVer = null;
+ 
+                 // If we are here, then we already tried to evict this entry.
+                 // If cannot evict, then update.
+                 if (this.dhtVer == null) {
+                     if (!markObsolete(dhtVer)) {
+                         value(val, valBytes);
+ 
+                         ttlAndExpireTimeExtras((int) ttl, expireTime);
+ 
+                         this.primaryNodeId = primaryNodeId;
+                     }
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * @return DHT version for this entry.
+      * @throws GridCacheEntryRemovedException If obsolete.
+      */
+     @Nullable public synchronized GridCacheVersion dhtVersion() throws 
GridCacheEntryRemovedException {
+         checkObsolete();
+ 
+         return dhtVer;
+     }
+ 
+     /**
+      * @return Tuple with version and value of this entry.
+      * @throws GridCacheEntryRemovedException If entry has been removed.
+      */
+     @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> 
versionedValue()
+         throws GridCacheEntryRemovedException {
+         checkObsolete();
+ 
+         if (dhtVer == null)
+             return null;
+         else {
+             V val0 = null;
+             byte[] valBytes0 = null;
+ 
+             GridCacheValueBytes valBytesTuple = valueBytes();
+ 
+             if (!valBytesTuple.isNull()) {
+                 if (valBytesTuple.isPlain())
+                     val0 = (V)valBytesTuple.get();
+                 else
+                     valBytes0 = valBytesTuple.get();
+             }
+             else
+                 val0 = val;
+ 
+             return F.t(dhtVer, val0, valBytes0);
+         }
+     }
+ 
+     /**
+      * @return ID of primary node from which this value was loaded.
+      */
+     UUID nodeId() {
+         return primaryNodeId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void recordNodeId(UUID primaryNodeId) {
+         assert Thread.holdsLock(this);
+ 
+         this.primaryNodeId = primaryNodeId;
+     }
+ 
+     /**
+      * This method should be called only when committing optimistic 
transactions.
+      *
+      * @param dhtVer DHT version to record.
+      */
+     public synchronized void recordDhtVersion(GridCacheVersion dhtVer) {
+         // Version manager must be updated separately, when adding DHT version
+         // to transaction entries.
+         this.dhtVer = dhtVer;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected V readThrough(IgniteTxEx<K, V> tx, K key, boolean 
reload,
+         IgnitePredicate<CacheEntry<K, V>>[] filter, UUID subjId, String 
taskName) throws IgniteCheckedException {
+         return cctx.near().loadAsync(tx,
+             F.asList(key),
+             reload,
+             /*force primary*/false,
+             filter,
+             subjId,
+             taskName,
+             true,
+             null).get().get(key);
+     }
+ 
+     /**
+      * @param tx Transaction.
+      * @param primaryNodeId Primary node ID.
+      * @param val New value.
+      * @param valBytes Value bytes.
+      * @param ver Version to use.
+      * @param dhtVer DHT version received from remote node.
+      * @param expVer Optional version to match.
+      * @param ttl Time to live.
+      * @param expireTime Expiration time.
+      * @param evt Event flag.
+      * @param topVer Topology version.
+      * @param subjId Subject ID.
+      * @return {@code True} if initial value was set.
+      * @throws IgniteCheckedException In case of error.
+      * @throws GridCacheEntryRemovedException If entry was removed.
+      */
+     @SuppressWarnings({"RedundantTypeArguments"})
+     public boolean loadedValue(@Nullable IgniteTxEx tx, UUID primaryNodeId, V 
val, byte[] valBytes,
+         GridCacheVersion ver, GridCacheVersion dhtVer, @Nullable 
GridCacheVersion expVer, long ttl, long expireTime,
+         boolean evt, long topVer, UUID subjId)
+         throws IgniteCheckedException, GridCacheEntryRemovedException {
+         boolean valid = valid(tx != null ? tx.topologyVersion() : 
cctx.affinity().affinityTopologyVersion());
+ 
+         if (valBytes != null && val == null && (isNewLocked() || !valid))
+             val = cctx.marshaller().<V>unmarshal(valBytes, 
cctx.deploy().globalLoader());
+ 
+         GridCacheVersion enqueueVer = null;
+ 
+         try {
+             synchronized (this) {
+                 checkObsolete();
+ 
 -                cctx.cache().metrics0().onRead(false);
++                if (cctx.cache().configuration().isStatisticsEnabled())
++                    cctx.cache().metrics0().onRead(false);
+ 
+                 boolean ret = false;
+ 
+                 V old = this.val;
+                 boolean hasVal = hasValueUnlocked();
+ 
+                 if (isNew() || !valid || expVer == null || 
expVer.equals(this.dhtVer)) {
+                     this.primaryNodeId = primaryNodeId;
+ 
+                     // Change entry only if dht version has changed.
+                     if (!dhtVer.equals(dhtVersion())) {
+                         update(val, valBytes, expireTime, ttl, ver);
+ 
+                         if (cctx.deferredDelete()) {
+                             boolean deleted = val == null && valBytes == null;
+ 
+                             if (deleted != deletedUnlocked()) {
+                                 deletedUnlocked(deleted);
+ 
+                                 if (deleted)
+                                     enqueueVer = ver;
+                             }
+                         }
+ 
+                         recordDhtVersion(dhtVer);
+ 
+                         ret = true;
+                     }
+                 }
+ 
+                 if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
+                     cctx.events().addEvent(partition(), key, tx, null, 
EVT_CACHE_OBJECT_READ,
+                         val, val != null || valBytes != null, old, hasVal, 
subjId, null, null);
+ 
+                 return ret;
+             }
+         }
+         finally {
+             if (enqueueVer != null)
+                 cctx.onDeferredDelete(this, enqueueVer);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void updateIndex(V val, byte[] valBytes, long 
expireTime,
+         GridCacheVersion ver, V old) throws IgniteCheckedException {
+         // No-op: queries are disabled for near cache.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void clearIndex(V val) {
+         // No-op.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheMvccCandidate<K> addLocal(
+         long threadId,
+         GridCacheVersion ver,
+         long timeout,
+         boolean reenter,
+         boolean tx,
+         boolean implicitSingle) throws GridCacheEntryRemovedException {
+         return addNearLocal(
+             null,
+             threadId,
+             ver,
+             timeout,
+             reenter,
+             tx,
+             implicitSingle
+         );
+     }
+ 
+     /**
+      * Add near local candidate.
+      *
+      * @param dhtNodeId DHT node ID.
+      * @param threadId Owning thread ID.
+      * @param ver Lock version.
+      * @param timeout Timeout to acquire lock.
+      * @param reenter Reentry flag.
+      * @param tx Transaction flag.
+      * @param implicitSingle Implicit flag.
+      * @return New candidate.
+      * @throws GridCacheEntryRemovedException If entry has been removed.
+      */
+     @Nullable public GridCacheMvccCandidate<K> addNearLocal(
+         @Nullable UUID dhtNodeId,
+         long threadId,
+         GridCacheVersion ver,
+         long timeout,
+         boolean reenter,
+         boolean tx,
+         boolean implicitSingle)
+         throws GridCacheEntryRemovedException {
+         GridCacheMvccCandidate<K> prev;
+         GridCacheMvccCandidate<K> owner;
+         GridCacheMvccCandidate<K> cand;
+ 
+         V val;
+ 
+         UUID locId = cctx.nodeId();
+ 
+         synchronized (this) {
+             checkObsolete();
+ 
+             GridCacheMvcc<K> mvcc = mvccExtras();
+ 
+             if (mvcc == null) {
+                 mvcc = new GridCacheMvcc<>(cctx);
+ 
+                 mvccExtras(mvcc);
+             }
+ 
+             GridCacheMvccCandidate<K> c = mvcc.localCandidate(locId, 
threadId);
+ 
+             if (c != null)
+                 return reenter ? c.reenter() : null;
+ 
+             prev = mvcc.anyOwner();
+ 
+             boolean emptyBefore = mvcc.isEmpty();
+ 
+             // Lock could not be acquired.
+             if (timeout < 0 && !emptyBefore)
+                 return null;
+ 
+             // Local lock for near cache is a local lock.
+             cand = mvcc.addNearLocal(this, locId, dhtNodeId, threadId, ver, 
timeout, tx, implicitSingle);
+ 
+             owner = mvcc.anyOwner();
+ 
+             boolean emptyAfter = mvcc.isEmpty();
+ 
+             checkCallbacks(emptyBefore, emptyAfter);
+ 
+             val = this.val;
+ 
+             if (emptyAfter)
+                 mvccExtras(null);
+         }
+ 
+         // This call must be outside of synchronization.
+         checkOwnerChanged(prev, owner, val);
+ 
+         return cand;
+     }
+ 
+     /**
+      * @param ver Version to set DHT node ID for.
+      * @param dhtNodeId DHT node ID.
+      * @return {@code true} if candidate was found.
+      * @throws GridCacheEntryRemovedException If entry is removed.
+      */
+     @Nullable public synchronized GridCacheMvccCandidate<K> 
dhtNodeId(GridCacheVersion ver, UUID dhtNodeId)
+         throws GridCacheEntryRemovedException {
+         checkObsolete();
+ 
+         GridCacheMvcc<K> mvcc = mvccExtras();
+ 
+         GridCacheMvccCandidate<K> cand = mvcc == null ? null : 
mvcc.candidate(ver);
+ 
+         if (cand == null)
+             return null;
+ 
+         cand.otherNodeId(dhtNodeId);
+ 
+         return cand;
+     }
+ 
+     /**
+      * Unlocks local lock.
+      *
+      * @return Removed candidate, or <tt>null</tt> if thread still holds the 
lock.
+      */
+     @Nullable @Override public GridCacheMvccCandidate<K> removeLock() {
+         GridCacheMvccCandidate<K> prev = null;
+         GridCacheMvccCandidate<K> owner = null;
+ 
+         V val;
+ 
+         UUID locId = cctx.nodeId();
+ 
+         GridCacheMvccCandidate<K> cand = null;
+ 
+         synchronized (this) {
+             GridCacheMvcc<K> mvcc = mvccExtras();
+ 
+             if (mvcc != null) {
+                 prev = mvcc.anyOwner();
+ 
+                 boolean emptyBefore = mvcc.isEmpty();
+ 
+                 cand = mvcc.localCandidate(locId, 
Thread.currentThread().getId());
+ 
+                 assert cand == null || cand.nearLocal();
+ 
+                 if (cand != null && cand.owner()) {
+                     // If a reentry, then release reentry. Otherwise, remove 
lock.
+                     GridCacheMvccCandidate<K> reentry = cand.unenter();
+ 
+                     if (reentry != null) {
+                         assert reentry.reentry();
+ 
+                         return reentry;
+                     }
+ 
+                     mvcc.remove(cand.version());
+ 
+                     owner = mvcc.anyOwner();
+                 }
+                 else
+                     return null;
+ 
+                 boolean emptyAfter = mvcc.isEmpty();
+ 
+                 checkCallbacks(emptyBefore, emptyAfter);
+ 
+                 if (emptyAfter)
+                     mvccExtras(null);
+             }
+ 
+             val = this.val;
+         }
+ 
+         assert cand != null;
+         assert owner != prev;
+ 
+         if (log.isDebugEnabled())
+             log.debug("Released local candidate from entry [owner=" + owner + 
", prev=" + prev +
+                 ", entry=" + this + ']');
+ 
+         cctx.mvcc().removeExplicitLock(cand);
+ 
+         if (prev != null && owner != prev)
+             checkThreadChain(prev);
+ 
+         // This call must be outside of synchronization.
+         checkOwnerChanged(prev, owner, val);
+ 
+         return owner != prev ? prev : null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void onInvalidate() {
+         dhtVer = null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public CacheEntry<K, V> wrap(boolean prjAware) {
+         GridCacheProjectionImpl<K, V> prjPerCall = null;
+ 
+         if (prjAware)
+             prjPerCall = cctx.projectionPerCall();
+ 
+         return new GridPartitionedCacheEntryImpl<>(prjPerCall, cctx, key, 
this);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public synchronized String toString() {
+         return S.toString(GridNearCacheEntry.class, this, "super", 
super.toString());
+     }
+ }

Reply via email to