http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 0000000,f2e564b..0e4e59b
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -1,0 -1,866 +1,868 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.IgniteSystemProperties.*;
+ import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
+ 
+ /**
+  *
+  */
+ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+     implements GridCacheFuture<Map<K, V>> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Default max remap count value. */
+     public static final int DFLT_MAX_REMAP_CNT = 3;
+ 
+     /** Logger reference. */
+     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ 
+     /** Maximum number of attempts to remap key to the same primary node. */
+     private static final int MAX_REMAP_CNT = getInteger(GG_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+ 
+     /** Context. */
+     private GridCacheContext<K, V> cctx;
+ 
+     /** Keys. */
+     private Collection<? extends K> keys;
+ 
+     /** Reload flag. */
+     private boolean reload;
+ 
+     /** Read through flag. */
+     private boolean readThrough;
+ 
+     /** Force primary flag. */
+     private boolean forcePrimary;
+ 
+     /** Future ID. */
+     private IgniteUuid futId;
+ 
+     /** Version. */
+     private GridCacheVersion ver;
+ 
+     /** Transaction. */
+     private IgniteTxLocalEx<K, V> tx;
+ 
+     /** Filters. */
+     private IgnitePredicate<CacheEntry<K, V>>[] filters;
+ 
+     /** Logger. */
+     private IgniteLogger log;
+ 
+     /** Trackable flag. */
+     private boolean trackable;
+ 
+     /** Remap count. */
+     private AtomicInteger remapCnt = new AtomicInteger();
+ 
+     /** Subject ID. */
+     private UUID subjId;
+ 
+     /** Task name. */
+     private String taskName;
+ 
+     /** Whether to deserialize portable objects. */
+     private boolean deserializePortable;
+ 
+     /** Expiry policy. */
+     private IgniteCacheExpiryPolicy expiryPlc;
+ 
+     /**
+      * Empty constructor required for {@link Externalizable}.
+      */
+     public GridNearGetFuture() {
+         // No-op.
+     }
+ 
+     /**
+      * @param cctx Context.
+      * @param keys Keys.
+      * @param readThrough Read through flag.
+      * @param reload Reload flag.
+      * @param forcePrimary If {@code true} get will be performed on primary node even if
+      *      called on backup node.
+      * @param tx Transaction.
+      * @param filters Filters.
+      * @param subjId Subject ID.
+      * @param taskName Task name.
+      * @param deserializePortable Deserialize portable flag.
+      * @param expiryPlc Expiry policy.
+      */
+     public GridNearGetFuture(
+         GridCacheContext<K, V> cctx,
+         Collection<? extends K> keys,
+         boolean readThrough,
+         boolean reload,
+         boolean forcePrimary,
+         @Nullable IgniteTxLocalEx<K, V> tx,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filters,
+         @Nullable UUID subjId,
+         String taskName,
+         boolean deserializePortable,
+         @Nullable IgniteCacheExpiryPolicy expiryPlc
+     ) {
+         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+ 
+         assert !F.isEmpty(keys);
+ 
+         this.cctx = cctx;
+         this.keys = keys;
+         this.readThrough = readThrough;
+         this.reload = reload;
+         this.forcePrimary = forcePrimary;
+         this.filters = filters;
+         this.tx = tx;
+         this.subjId = subjId;
+         this.taskName = taskName;
+         this.deserializePortable = deserializePortable;
+         this.expiryPlc = expiryPlc;
+ 
+         futId = IgniteUuid.randomUuid();
+ 
+         ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+ 
+         log = U.logger(ctx, logRef, GridNearGetFuture.class);
+     }
+ 
+     /**
+      * Initializes future.
+      */
+     public void init() {
+         long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+ 
+         map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer);
+ 
+         markInitialized();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean trackable() {
+         return trackable;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void markNotTrackable() {
+         // Should not flip trackable flag from true to false since get future can be remapped.
+     }
+ 
+     /**
+      * @return Keys.
+      */
+     Collection<? extends K> keys() {
+         return keys;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteUuid futureId() {
+         return futId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheVersion version() {
+         return ver;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<? extends ClusterNode> nodes() {
+         return
+             F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<Map<K, V>>, ClusterNode>() {
+                 @Nullable @Override public ClusterNode apply(IgniteFuture<Map<K, V>> f) {
+                     if (isMini(f))
+                         return ((MiniFuture)f).node();
+ 
+                     return cctx.discovery().localNode();
+                 }
+             });
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onNodeLeft(UUID nodeId) {
+         for (IgniteFuture<Map<K, V>> fut : futures())
+             if (isMini(fut)) {
+                 MiniFuture f = (MiniFuture)fut;
+ 
+                 if (f.node().id().equals(nodeId)) {
+                     f.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId));
+ 
+                     return true;
+                 }
+             }
+ 
+         return false;
+     }
+ 
+     /**
+      * @param nodeId Sender.
+      * @param res Result.
+      */
+     void onResult(UUID nodeId, GridNearGetResponse<K, V> res) {
+         for (IgniteFuture<Map<K, V>> fut : futures())
+             if (isMini(fut)) {
+                 MiniFuture f = (MiniFuture)fut;
+ 
+                 if (f.futureId().equals(res.miniId())) {
+                     assert f.node().id().equals(nodeId);
+ 
+                     f.onResult(res);
+                 }
+             }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onDone(Map<K, V> res, Throwable err) {
+         if (super.onDone(res, err)) {
+             // Don't forget to clean up.
+             if (trackable)
+                 cctx.mvcc().removeFuture(this);
+ 
+             cache().dht().sendTtlUpdateRequest(expiryPlc);
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @param f Future.
+      * @return {@code True} if mini-future.
+      */
+     private boolean isMini(IgniteFuture<Map<K, V>> f) {
+         return f.getClass().equals(MiniFuture.class);
+     }
+ 
+     /**
+      * @param keys Keys.
+      * @param mapped Mappings to check for duplicates.
+      * @param topVer Topology version to map on.
+      */
+     private void map(Collection<? extends K> keys,
+         Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped,
+         final long topVer) {
+         Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
+ 
+         if (affNodes.isEmpty()) {
+             assert !isAffinityNode(cctx.config());
+ 
+             onDone(new ClusterTopologyException("Failed to map keys for near-only cache (all partition " +
+                 "nodes left the grid)."));
+ 
+             return;
+         }
+ 
+         Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings = U.newHashMap(affNodes.size());
+ 
+         Map<K, GridCacheVersion> savedVers = null;
+ 
+         // Assign keys to primary nodes.
+         for (K key : keys) {
+             if (key != null)
+                 savedVers = map(key, mappings, topVer, mapped, savedVers);
+         }
+ 
+         if (isDone())
+             return;
+ 
+         final Map<K, GridCacheVersion> saved = savedVers;
+ 
+         final int keysSize = keys.size();
+ 
+         // Create mini futures.
+         for (Map.Entry<ClusterNode, LinkedHashMap<K, Boolean>> entry : mappings.entrySet()) {
+             final ClusterNode n = entry.getKey();
+ 
+             final LinkedHashMap<K, Boolean> mappedKeys = entry.getValue();
+ 
+             assert !mappedKeys.isEmpty();
+ 
+             // If this is the primary or backup node for the keys.
+             if (n.isLocal()) {
+                 final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
+                     dht().getDhtAsync(n.id(),
+                         -1,
+                         mappedKeys,
+                         readThrough,
+                         reload,
+                         topVer,
+                         subjId,
+                         taskName == null ? 0 : taskName.hashCode(),
+                         deserializePortable,
+                         filters,
+                         expiryPlc);
+ 
+                 final Collection<Integer> invalidParts = fut.invalidPartitions();
+ 
+                 if (!F.isEmpty(invalidParts)) {
+                     Collection<K> remapKeys = new ArrayList<>(keysSize);
+ 
+                     for (K key : keys) {
+                         if (key != null && invalidParts.contains(cctx.affinity().partition(key)))
+                             remapKeys.add(key);
+                     }
+ 
+                     long updTopVer = ctx.discovery().topologyVersion();
+ 
+                     assert updTopVer > topVer : "Got invalid partitions for local node but topology version did " +
+                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
+                         ", invalidParts=" + invalidParts + ']';
+ 
+                     // Remap recursively.
+                     map(remapKeys, mappings, updTopVer);
+                 }
+ 
+                 // Add new future.
+                 add(fut.chain(new C1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() {
+                     @Override public Map<K, V> apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut) {
+                         try {
+                             return loadEntries(n.id(), mappedKeys.keySet(), fut.get(), saved, topVer);
+                         }
+                         catch (Exception e) {
+                             U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+ 
+                             onDone(e);
+ 
+                             return Collections.emptyMap();
+                         }
+                     }
+                 }));
+             }
+             else {
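+                 // Register this future with the MVCC manager on first remote mapping so
+                 // that node-left events and responses can be routed back to it.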
+                 if (!trackable) {
+                     trackable = true;
+ 
+                     cctx.mvcc().addFuture(this);
+                 }
+ 
+                 MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer);
+ 
+                 GridCacheMessage<K, V> req = new GridNearGetRequest<>(
+                     cctx.cacheId(),
+                     futId,
+                     fut.futureId(),
+                     ver,
+                     mappedKeys,
+                     readThrough,
+                     reload,
+                     topVer,
+                     filters,
+                     subjId,
+                     taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forAccess() : -1L);
+ 
+                 add(fut); // Append new future.
+ 
+                 try {
+                     cctx.io().send(n, req);
+                 }
+                 catch (IgniteCheckedException e) {
+                     // Fail the whole thing.
+                     if (e instanceof ClusterTopologyException)
+                         fut.onResult((ClusterTopologyException)e);
+                     else
+                         fut.onResult(e);
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * @param mappings Mappings.
+      * @param key Key to map.
+      * @param topVer Topology version
+      * @param mapped Previously mapped.
+      * @param savedVers Saved versions.
+      * @return Map.
+      */
+     private Map<K, GridCacheVersion> map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings,
+         long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, Map<K, GridCacheVersion> savedVers) {
+         final GridNearCacheAdapter<K, V> near = cache();
+ 
+         // Allow to get cached value from the local node.
+         boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+ 
+         GridCacheEntryEx<K, V> entry = allowLocRead ? near.peekEx(key) : null;
+ 
+         while (true) {
+             try {
+                 V v = null;
+ 
+                 boolean isNear = entry != null;
+ 
+                 // First we peek into near cache.
+                 if (isNear)
+                     v = entry.innerGet(tx,
+                         /*swap*/false,
+                         /*read-through*/false,
+                         /*fail-fast*/true,
+                         /*unmarshal*/true,
+                         /*metrics*/true,
+                         /*events*/true,
+                         /*temporary*/false,
+                         subjId,
+                         null,
+                         taskName,
+                         filters,
+                         expiryPlc);
+ 
+                 ClusterNode primary = null;
+ 
+                 if (v == null && allowLocRead) {
+                     GridDhtCacheAdapter<K, V> dht = cache().dht();
+ 
+                     try {
+                         entry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
+ 
+                         // If near cache does not have value, then we peek DHT cache.
+                         if (entry != null) {
+                             boolean isNew = entry.isNewLocked() || !entry.valid(topVer);
+ 
+                             v = entry.innerGet(tx,
+                                 /*swap*/true,
+                                 /*read-through*/false,
+                                 /*fail-fast*/true,
+                                 /*unmarshal*/true,
+                                 /*update-metrics*/false,
+                                 /*events*/!isNear,
+                                 /*temporary*/false,
+                                 subjId,
+                                 null,
+                                 taskName,
+                                 filters,
+                                 expiryPlc);
+ 
+                             // Entry was not in memory or in swap, so we remove it from cache.
+                             if (v == null && isNew && entry.markObsoleteIfEmpty(ver))
+                                 dht.removeIfObsolete(key);
+                         }
+ 
 -                        if (v != null)
 -                            near.metrics0().onRead(true);
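++                        // Update near-cache read metrics only when statistics are enabled.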
++                        if (v != null) {
++                            if (cctx.cache().configuration().isStatisticsEnabled())
++                                near.metrics0().onRead(true);
++                        }
+                         else {
+                             primary = cctx.affinity().primary(key, topVer);
+ 
 -                            if (!primary.isLocal())
++                            if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled())
+                                 near.metrics0().onRead(false);
+                         }
+                     }
+                     catch (GridDhtInvalidPartitionException ignored) {
+                         // No-op.
+                     }
+                     finally {
+                         if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) {
+                             dht.context().evicts().touch(entry, topVer);
+ 
+                             entry = null;
+                         }
+                     }
+                 }
+ 
+                 if (v != null && !reload) {
+                     if (cctx.portableEnabled())
+                         v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable);
+ 
+                     add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key, v)));
+                 }
+                 else {
+                     if (primary == null)
+                         primary = cctx.affinity().primary(key, topVer);
+ 
+                     GridNearCacheEntry<K, V> nearEntry = allowLocRead ? near.peekExx(key) : null;
+ 
+                     entry = nearEntry;
+ 
+                     if (savedVers == null)
+                         savedVers = U.newHashMap(3);
+ 
+                     savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
+ 
+                     LinkedHashMap<K, Boolean> keys = mapped.get(primary);
+ 
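+                     // If the key maps to the same primary node it was mapped to before,
+                     // count a remap attempt and fail the future once MAX_REMAP_CNT is exceeded.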
+                     if (keys != null && keys.containsKey(key)) {
+                         if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
+                             onDone(new ClusterTopologyException("Failed to remap key to a new node after " + MAX_REMAP_CNT
+                                 + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                                 U.toShortString(primary) + ", mappings=" + mapped + ']'));
+ 
+                             return savedVers;
+                         }
+                     }
+ 
+                     // Don't add reader if transaction acquires lock anyway to avoid deadlock.
+                     boolean addRdr = tx == null || tx.optimistic();
+ 
+                     if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(key))
+                         addRdr = true;
+ 
+                     LinkedHashMap<K, Boolean> old = mappings.get(primary);
+ 
+                     if (old == null)
+                         mappings.put(primary, old = new LinkedHashMap<>(3, 1f));
+ 
+                     old.put(key, addRdr);
+                 }
+ 
+                 break;
+             }
+             catch (IgniteCheckedException e) {
+                 onDone(e);
+ 
+                 break;
+             }
+             catch (GridCacheEntryRemovedException ignored) {
+                 entry = allowLocRead ? near.peekEx(key) : null;
+             }
+             catch (GridCacheFilterFailedException e) {
+                 if (log.isDebugEnabled())
+                     log.debug("Filter validation failed for entry: " + e);
+ 
+                 break;
+             }
+             finally {
+                 if (entry != null && !reload && tx == null)
+                     cctx.evicts().touch(entry, topVer);
+             }
+         }
+ 
+         return savedVers;
+     }
+ 
+     /**
+      * @return Near cache.
+      */
+     private GridNearCacheAdapter<K, V> cache() {
+         return (GridNearCacheAdapter<K, V>)cctx.cache();
+     }
+ 
+     /**
+      * @return DHT cache.
+      */
+     private GridDhtCacheAdapter<K, V> dht() {
+         return cache().dht();
+     }
+ 
+     /**
+      * @param nodeId Node id.
+      * @param keys Keys.
+      * @param infos Entry infos.
+      * @param savedVers Saved versions.
+      * @param topVer Topology version
+      * @return Result map.
+      */
+     private Map<K, V> loadEntries(UUID nodeId,
+         Collection<K> keys,
+         Collection<GridCacheEntryInfo<K, V>> infos,
+         Map<K, GridCacheVersion> savedVers,
+         long topVer) {
+         boolean empty = F.isEmpty(keys);
+ 
+         Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size());
+ 
+         if (!empty) {
+             boolean atomic = cctx.atomic();
+ 
+             GridCacheVersion ver = atomic ? null : F.isEmpty(infos) ? null : cctx.versions().next();
+ 
+             for (GridCacheEntryInfo<K, V> info : infos) {
+                 try {
+                     info.unmarshalValue(cctx, cctx.deploy().globalLoader());
+ 
+                     // Entries available locally in DHT should not be loaded into near cache for reading.
+                     if (!cctx.cache().affinity().isPrimaryOrBackup(cctx.localNode(), info.key())) {
+                         GridNearCacheEntry<K, V> entry = cache().entryExx(info.key(), topVer);
+ 
+                         GridCacheVersion saved = savedVers.get(info.key());
+ 
+                         // Load entry into cache.
+                         entry.loadedValue(tx,
+                             nodeId,
+                             info.value(),
+                             info.valueBytes(),
+                             atomic ? info.version() : ver,
+                             info.version(),
+                             saved,
+                             info.ttl(),
+                             info.expireTime(),
+                             true,
+                             topVer,
+                             subjId);
+ 
+                         cctx.evicts().touch(entry, topVer);
+                     }
+ 
+                     V val = info.value();
+ 
+                     if (cctx.portableEnabled())
+                         val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable);
+ 
+                     map.put(info.key(), val);
+                 }
+                 catch (GridCacheEntryRemovedException ignore) {
+                     if (log.isDebugEnabled())
+                         log.debug("Got removed entry while processing get 
response (will not retry).");
+                 }
+                 catch (IgniteCheckedException e) {
+                     // Fail.
+                     onDone(e);
+ 
+                     return Collections.emptyMap();
+                 }
+             }
+         }
+ 
+         return map;
+     }
+ 
+     /**
+      * Mini-future for get operations. Mini-futures are only waiting on a single
+      * node as opposed to multiple nodes.
+      */
+     private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** */
+         private final IgniteUuid futId = IgniteUuid.randomUuid();
+ 
+         /** Node ID. */
+         private ClusterNode node;
+ 
+         /** Keys. */
+         @GridToStringInclude
+         private LinkedHashMap<K, Boolean> keys;
+ 
+         /** Saved entry versions. */
+         private Map<K, GridCacheVersion> savedVers;
+ 
+         /** Topology version on which this future was mapped. */
+         private long topVer;
+ 
+         /**
+          * Empty constructor required for {@link Externalizable}.
+          */
+         public MiniFuture() {
+             // No-op.
+         }
+ 
+         /**
+          * @param node Node.
+          * @param keys Keys.
+          * @param savedVers Saved entry versions.
+          * @param topVer Topology version.
+          */
+         MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, Map<K, GridCacheVersion> savedVers, long topVer) {
+             super(cctx.kernalContext());
+ 
+             this.node = node;
+             this.keys = keys;
+             this.savedVers = savedVers;
+             this.topVer = topVer;
+         }
+ 
+         /**
+          * @return Future ID.
+          */
+         IgniteUuid futureId() {
+             return futId;
+         }
+ 
+         /**
+          * @return Node ID.
+          */
+         public ClusterNode node() {
+             return node;
+         }
+ 
+         /**
+          * @return Keys.
+          */
+         public Collection<K> keys() {
+             return keys.keySet();
+         }
+ 
+         /**
+          * @param e Error.
+          */
+         void onResult(Throwable e) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
+ 
+             // Fail.
+             onDone(e);
+         }
+ 
+         /**
+          * @param e Topology exception.
+          */
+         void onResult(ClusterTopologyException e) {
+             if (log.isDebugEnabled())
+                 log.debug("Remote node left grid while sending or waiting for 
reply (will retry): " + this);
+ 
+             long updTopVer = ctx.discovery().topologyVersion();
+ 
+             if (updTopVer > topVer) {
+                 // Remap.
+                 map(keys.keySet(), F.t(node, keys), updTopVer);
+ 
+                 onDone(Collections.<K, V>emptyMap());
+             }
+             else {
+                 final RemapTimeoutObject timeout = new RemapTimeoutObject(ctx.config().getNetworkTimeout(), topVer, e);
+ 
+                 ctx.discovery().topologyFuture(topVer + 1).listenAsync(new CI1<IgniteFuture<Long>>() {
+                     @Override public void apply(IgniteFuture<Long> longIgniteFuture) {
+                         if (timeout.finish()) {
+                             ctx.timeout().removeTimeoutObject(timeout);
+ 
+                             // Remap.
+                             map(keys.keySet(), F.t(node, keys), cctx.affinity().affinityTopologyVersion());
+ 
+                             onDone(Collections.<K, V>emptyMap());
+                         }
+                     }
+                 });
+ 
+                 ctx.timeout().addTimeoutObject(timeout);
+             }
+         }
+ 
+         /**
+          * @param res Result.
+          */
+         void onResult(final GridNearGetResponse<K, V> res) {
+             final Collection<Integer> invalidParts = res.invalidPartitions();
+ 
+             // If error happened on remote node, fail the whole future.
+             if (res.error() != null) {
+                 onDone(res.error());
+ 
+                 return;
+             }
+ 
+             // Remap invalid partitions.
+             if (!F.isEmpty(invalidParts)) {
+                 long rmtTopVer = res.topologyVersion();
+ 
+                 assert rmtTopVer != 0;
+ 
+                 if (rmtTopVer <= topVer) {
+                     // Fail the whole get future.
+                     onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
+                         "invalid partitions but remote topology version does not differ from local) " +
+                         "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", invalidParts=" + invalidParts +
+                         ", nodeId=" + node.id() + ']'));
+ 
+                     return;
+                 }
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Remapping mini get future [invalidParts=" + 
invalidParts + ", fut=" + this + ']');
+ 
+                 // Need to wait for next topology version to remap.
+                 IgniteFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer);
+ 
+                 topFut.listenAsync(new CIX1<IgniteFuture<Long>>() {
+                     @Override public void applyx(IgniteFuture<Long> fut) throws IgniteCheckedException {
+                         long readyTopVer = fut.get();
+ 
+                         // This will append new futures to compound list.
+                         map(F.view(keys.keySet(), new P1<K>() {
+                             @Override public boolean apply(K key) {
+                                 return invalidParts.contains(cctx.affinity().partition(key));
+                             }
+                         }), F.t(node, keys), readyTopVer);
+ 
+                         // It is critical to call onDone after adding futures to compound list.
+                         onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+                     }
+                 });
+             }
+             else
+                 onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(MiniFuture.class, this);
+         }
+ 
+         /**
+          * Remap timeout object.
+          */
+         private class RemapTimeoutObject extends GridTimeoutObjectAdapter {
+             /** Finished flag. */
+             private AtomicBoolean finished = new AtomicBoolean();
+ 
+             /** Topology version to wait. */
+             private long topVer;
+ 
+             /** Exception cause. */
+             private IgniteCheckedException e;
+ 
+             /**
+              * @param timeout Timeout.
+              * @param topVer Topology version timeout was created on.
+              */
+             private RemapTimeoutObject(long timeout, long topVer, IgniteCheckedException e) {
+                 super(timeout);
+ 
+                 this.topVer = topVer;
+                 this.e = e;
+             }
+ 
+             /** {@inheritDoc} */
+             @Override public void onTimeout() {
+                 if (finish())
+                     // Fail the whole get future.
+                     onDone(new IgniteCheckedException("Failed to wait for 
topology version to change: " + (topVer + 1), e));
+                 // else remap happened concurrently.
+             }
+ 
+             /**
+              * @return Guard against concurrent completion.
+              */
+             public boolean finish() {
+                 return finished.compareAndSet(false, true);
+             }
+         }
+     }
+ }
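
The behavioral change in this file gates near-cache read-metrics updates behind the cache's statistics flag. A minimal sketch of that pattern, reusing the names from the diff above (the standalone helper is hypothetical and not part of the commit):

    // Hypothetical helper illustrating the gating introduced above; it assumes only
    // the internal calls already used in the diff: configuration().isStatisticsEnabled()
    // and metrics0().onRead(boolean).
    private static <K, V> void recordNearRead(GridCacheContext<K, V> cctx,
        GridNearCacheAdapter<K, V> near, boolean hit) {
        // Record the read only when statistics collection is enabled for the cache.
        if (cctx.cache().configuration().isStatisticsEnabled())
            near.metrics0().onRead(hit);
    }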

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0000000,3dec521..2409335
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -1,0 -1,1468 +1,1470 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.managers.discovery.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ 
+ /**
+  * Cache lock future.
+  */
+ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+     implements GridCacheMvccFuture<K, V, Boolean> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Logger reference. */
+     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ 
+     /** Cache registry. */
+     @GridToStringExclude
+     private GridCacheContext<K, V> cctx;
+ 
+     /** Lock owner thread. */
+     @GridToStringInclude
+     private long threadId;
+ 
+     /** Keys to lock. */
+     private Collection<? extends K> keys;
+ 
+     /** Future ID. */
+     private IgniteUuid futId;
+ 
+     /** Lock version. */
+     private GridCacheVersion lockVer;
+ 
+     /** Read flag. */
+     private boolean read;
+ 
+     /** Flag to return value. */
+     private boolean retval;
+ 
+     /** Error. */
+     private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ 
+     /** Timed out flag. */
+     private volatile boolean timedOut;
+ 
+     /** Timeout object. */
+     @GridToStringExclude
+     private LockTimeoutObject timeoutObj;
+ 
+     /** Lock timeout. */
+     private long timeout;
+ 
+     /** Logger. */
+     @GridToStringExclude
+     private IgniteLogger log;
+ 
+     /** Filter. */
+     private IgnitePredicate<CacheEntry<K, V>>[] filter;
+ 
+     /** Transaction. */
+     @GridToStringExclude
+     private GridNearTxLocal<K, V> tx;
+ 
+     /** Topology snapshot to operate on. */
+     private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
+         new AtomicReference<>();
+ 
+     /** Map of current values. */
+     private Map<K, GridTuple3<GridCacheVersion, V, byte[]>> valMap;
+ 
+     /** Trackable flag. */
+     private boolean trackable = true;
+ 
+     /** Mutex. */
+     private final Object mux = new Object();
+ 
+     /** Keys locked so far. */
+     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+     @GridToStringExclude
+     private List<GridDistributedCacheEntry<K, V>> entries;
+ 
+     /** TTL for read operation. */
+     private long accessTtl;
+ 
+     /**
+      * Empty constructor required by {@link Externalizable}.
+      */
+     public GridNearLockFuture() {
+         // No-op.
+     }
+ 
+     /**
+      * @param cctx Registry.
+      * @param keys Keys to lock.
+      * @param tx Transaction.
+      * @param read Read flag.
+      * @param retval Flag to return value or not.
+      * @param timeout Lock acquisition timeout.
+      * @param accessTtl TTL for read operation.
+      * @param filter Filter.
+      */
+     public GridNearLockFuture(
+         GridCacheContext<K, V> cctx,
+         Collection<? extends K> keys,
+         @Nullable GridNearTxLocal<K, V> tx,
+         boolean read,
+         boolean retval,
+         long timeout,
+         long accessTtl,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         super(cctx.kernalContext(), CU.boolReducer());
+ 
+         assert keys != null;
+ 
+         this.cctx = cctx;
+         this.keys = keys;
+         this.tx = tx;
+         this.read = read;
+         this.retval = retval;
+         this.timeout = timeout;
+         this.accessTtl = accessTtl;
+         this.filter = filter;
+ 
+         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
+ 
+         lockVer = tx != null ? tx.xidVersion() : cctx.versions().next();
+ 
+         futId = IgniteUuid.randomUuid();
+ 
+         entries = new ArrayList<>(keys.size());
+ 
+         log = U.logger(ctx, logRef, GridNearLockFuture.class);
+ 
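+         // A zero timeout means wait indefinitely (no timeout object is registered);
+         // a negative timeout fails immediately in addEntry(..) if the lock is not free.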
+         if (timeout > 0) {
+             timeoutObj = new LockTimeoutObject();
+ 
+             cctx.time().addTimeoutObject(timeoutObj);
+         }
+ 
+         valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
+     }
+ 
+     /**
+      * @return Participating nodes.
+      */
+     @Override public Collection<? extends ClusterNode> nodes() {
+         return
+             F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
+                 @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
+                     if (isMini(f))
+                         return ((MiniFuture)f).node();
+ 
+                     return cctx.discovery().localNode();
+                 }
+             });
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheVersion version() {
+         return lockVer;
+     }
+ 
+     /**
+      * @return Entries.
+      */
+     public List<GridDistributedCacheEntry<K, V>> entriesCopy() {
+         synchronized (mux) {
+             return new ArrayList<>(entries);
+         }
+     }
+ 
+     /**
+      * @return Future ID.
+      */
+     @Override public IgniteUuid futureId() {
+         return futId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean trackable() {
+         return trackable;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void markNotTrackable() {
+         trackable = false;
+     }
+ 
+     /**
+      * @return {@code True} if transaction is not {@code null}.
+      */
+     private boolean inTx() {
+         return tx != null;
+     }
+ 
+     /**
+      * @return {@code True} if implicit-single-tx flag is set.
+      */
+     private boolean implicitSingleTx() {
+         return tx != null && tx.implicitSingle();
+     }
+ 
+     /**
+      * @return {@code True} if transaction is not {@code null} and has invalidate flag set.
+      */
+     private boolean isInvalidate() {
+         return tx != null && tx.isInvalidate();
+     }
+ 
+     /**
+      * @return {@code True} if commit is synchronous.
+      */
+     private boolean syncCommit() {
+         return tx != null && tx.syncCommit();
+     }
+ 
+     /**
+      * @return {@code True} if rollback is synchronous.
+      */
+     private boolean syncRollback() {
+         return tx != null && tx.syncRollback();
+     }
+ 
+     /**
+      * @return Transaction isolation or {@code null} if no transaction.
+      */
+     @Nullable private IgniteTxIsolation isolation() {
+         return tx == null ? null : tx.isolation();
+     }
+ 
+     /**
+      * @return {@code true} if related transaction is implicit.
+      */
+     private boolean implicitTx() {
+         return tx != null && tx.implicit();
+     }
+ 
+     /**
+      * @param cached Entry.
+      * @return {@code True} if locked.
+      * @throws GridCacheEntryRemovedException If removed.
+      */
+     private boolean locked(GridCacheEntryEx<K, V> cached) throws GridCacheEntryRemovedException {
+         // Reentry-aware check (If filter failed, lock is failed).
+         return cached.lockedLocallyByIdOrThread(lockVer, threadId) && 
filter(cached);
+     }
+ 
+     /**
+      * Adds entry to future.
+      *
+      * @param topVer Topology version.
+      * @param entry Entry to add.
+      * @param dhtNodeId DHT node ID.
+      * @return Lock candidate.
+      * @throws GridCacheEntryRemovedException If entry was removed.
+      */
+     @Nullable private GridCacheMvccCandidate<K> addEntry(long topVer, GridNearCacheEntry<K, V> entry, UUID dhtNodeId)
+         throws GridCacheEntryRemovedException {
+         // Check if lock acquisition is timed out.
+         if (timedOut)
+             return null;
+ 
+         // Add local lock first, as it may throw GridCacheEntryRemovedException.
+         GridCacheMvccCandidate<K> c = entry.addNearLocal(
+             dhtNodeId,
+             threadId,
+             lockVer,
+             timeout,
+             !inTx(),
+             inTx(),
+             implicitSingleTx()
+         );
+ 
+         if (inTx()) {
+             IgniteTxEntry<K, V> txEntry = tx.entry(entry.txKey());
+ 
+             txEntry.cached(entry, txEntry.keyBytes());
+         }
+ 
+         if (c != null)
+             c.topologyVersion(topVer);
+ 
+         synchronized (mux) {
+             entries.add(entry);
+         }
+ 
+         if (c == null && timeout < 0) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to acquire lock with negative timeout: " + 
entry);
+ 
+             onFailed(false);
+ 
+             return null;
+         }
+ 
+         // Double check if lock acquisition has already timed out.
+         if (timedOut) {
+             entry.removeLock(lockVer);
+ 
+             return null;
+         }
+ 
+         return c;
+     }
+ 
+     /**
+      * Undoes all locks.
+      *
+      * @param dist If {@code true}, then remove locks from remote nodes as well.
+      */
+     private void undoLocks(boolean dist) {
+         // Transactions will undo during rollback.
+         if (dist && tx == null)
+             cctx.nearTx().removeLocks(lockVer, keys);
+         else {
+             if (tx != null) {
+                 if (tx.setRollbackOnly()) {
+                     if (log.isDebugEnabled())
+                         log.debug("Marked transaction as rollback only 
because locks could not be acquired: " + tx);
+                 }
+                 else if (log.isDebugEnabled())
+                     log.debug("Transaction was not marked rollback-only while 
locks were not acquired: " + tx);
+             }
+ 
+             for (GridCacheEntryEx<K, V> e : entriesCopy()) {
+                 try {
+                     e.removeLock(lockVer);
+                 }
+                 catch (GridCacheEntryRemovedException ignored) {
+                     while (true) {
+                         try {
+                             e = cctx.cache().peekEx(e.key());
+ 
+                             if (e != null)
+                                 e.removeLock(lockVer);
+ 
+                             break;
+                         }
+                         catch (GridCacheEntryRemovedException ignore) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Attempted to remove lock on 
removed entry (will retry) [ver=" +
+                                     lockVer + ", entry=" + e + ']');
+                         }
+                     }
+                 }
+             }
+         }
+ 
+         cctx.mvcc().recheckPendingLocks();
+     }
+ 
+     /**
+      *
+      * @param dist {@code True} if need to distribute lock release.
+      */
+     private void onFailed(boolean dist) {
+         undoLocks(dist);
+ 
+         complete(false);
+     }
+ 
+     /**
+      * @param success Success flag.
+      */
+     public void complete(boolean success) {
+         onComplete(success, true);
+     }
+ 
+     /**
+      * @param nodeId Left node ID
+      * @return {@code True} if node was in the list.
+      */
+     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
+     @Override public boolean onNodeLeft(UUID nodeId) {
+         boolean found = false;
+ 
+         for (IgniteFuture<?> fut : futures()) {
+             if (isMini(fut)) {
+                 MiniFuture f = (MiniFuture)fut;
+ 
+                 if (f.node().id().equals(nodeId)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Found mini-future for left node [nodeId=" 
+ nodeId + ", mini=" + f + ", fut=" +
+                             this + ']');
+ 
+                     f.onResult(newTopologyException(null, nodeId));
+ 
+                     found = true;
+                 }
+             }
+         }
+ 
+         if (!found) {
+             if (log.isDebugEnabled())
+                 log.debug("Near lock future does not have mapping for left 
node (ignoring) [nodeId=" + nodeId +
+                     ", fut=" + this + ']');
+         }
+ 
+         return found;
+     }
+ 
+     /**
+      * @param nodeId Sender.
+      * @param res Result.
+      */
+     void onResult(UUID nodeId, GridNearLockResponse<K, V> res) {
+         if (!isDone()) {
+             if (log.isDebugEnabled())
+                 log.debug("Received lock response from node [nodeId=" + 
nodeId + ", res=" + res + ", fut=" + this + ']');
+ 
+             for (IgniteFuture<Boolean> fut : pending()) {
+                 if (isMini(fut)) {
+                     MiniFuture mini = (MiniFuture)fut;
+ 
+                     if (mini.futureId().equals(res.miniId())) {
+                         assert mini.node().id().equals(nodeId);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Found mini future for response [mini=" 
+ mini + ", res=" + res + ']');
+ 
+                         mini.onResult(res);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Future after processed lock response 
[fut=" + this + ", mini=" + mini +
+                                 ", res=" + res + ']');
+ 
+                         return;
+                     }
+                 }
+             }
+ 
+             U.warn(log, "Failed to find mini future for response (perhaps due 
to stale message) [res=" + res +
+                 ", fut=" + this + ']');
+         }
+         else if (log.isDebugEnabled())
+             log.debug("Ignoring lock response from node (future is done) 
[nodeId=" + nodeId + ", res=" + res +
+                 ", fut=" + this + ']');
+     }
+ 
+     /**
+      * @param t Error.
+      */
+     private void onError(Throwable t) {
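+         // Lock timeouts are deliberately not recorded as errors: the future then
+         // completes with 'false' rather than failing.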
+         err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t);
+     }
+ 
+     /**
+      * @param cached Entry to check.
+      * @return {@code True} if filter passed.
+      */
+     private boolean filter(GridCacheEntryEx<K, V> cached) {
+         try {
+             if (!cctx.isAll(cached, filter)) {
+                 if (log.isDebugEnabled())
+                     log.debug("Filter didn't pass for entry (will fail lock): 
" + cached);
+ 
+                 onFailed(true);
+ 
+                 return false;
+             }
+ 
+             return true;
+         }
+         catch (IgniteCheckedException e) {
+             onError(e);
+ 
+             return false;
+         }
+     }
+ 
+     /**
+      * Callback for whenever entry lock ownership changes.
+      *
+      * @param entry Entry whose lock ownership changed.
+      */
+     @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+         if (owner != null && owner.version().equals(lockVer)) {
+             onDone(true);
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @return {@code True} if locks have been acquired.
+      */
+     private boolean checkLocks() {
+         if (!isDone() && initialized() && !hasPending()) {
+             for (int i = 0; i < entries.size(); i++) {
+                 while (true) {
+                     GridCacheEntryEx<K, V> cached = entries.get(i);
+ 
+                     try {
+                         if (!locked(cached)) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Lock is still not acquired for 
entry (will keep waiting) [entry=" +
+                                     cached + ", fut=" + this + ']');
+ 
+                             return false;
+                         }
+ 
+                         break;
+                     }
+                     // Possible in concurrent cases, when owner is changed after locks
+                     // have been released or cancelled.
+                     catch (GridCacheEntryRemovedException ignore) {
+                         if (log.isDebugEnabled())
+                             log.debug("Got removed entry in onOwnerChanged 
method (will retry): " + cached);
+ 
+                         // Replace old entry with new one.
+                         entries.set(i, (GridDistributedCacheEntry<K, V>)cctx.cache().entryEx(cached.key()));
+                     }
+                 }
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Local lock acquired for entries [fut=" + this + ", 
entries=" + entries + "]");
+ 
+             onComplete(true, true);
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean cancel() {
+         if (onCancelled())
+             onComplete(false, true);
+ 
+         return isCancelled();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onDone(Boolean success, Throwable err) {
+         if (log.isDebugEnabled())
+             log.debug("Received onDone(..) callback [success=" + success + ", 
err=" + err + ", fut=" + this + ']');
+ 
+         // If locks were not acquired yet, delay completion.
+         if (isDone() || (err == null && success && !checkLocks()))
+             return false;
+ 
+         this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err);
+ 
+         if (err != null)
+             success = false;
+ 
+         return onComplete(success, true);
+     }
+ 
+     /**
+      * Completeness callback.
+      *
+      * @param success {@code True} if lock was acquired.
+      * @param distribute {@code True} if need to distribute lock removal in case of failure.
+      * @return {@code True} if complete by this operation.
+      */
+     private boolean onComplete(boolean success, boolean distribute) {
+         if (log.isDebugEnabled())
+             log.debug("Received onComplete(..) callback [success=" + success 
+ ", distribute=" + distribute +
+                 ", fut=" + this + ']');
+ 
+         if (!success)
+             undoLocks(distribute);
+ 
+         if (tx != null)
+             cctx.tm().txContext(tx);
+ 
+         if (super.onDone(success, err.get())) {
+             if (log.isDebugEnabled())
+                 log.debug("Completing future: " + this);
+ 
+             // Clean up.
+             cctx.mvcc().removeFuture(this);
+ 
+             if (timeoutObj != null)
+                 cctx.time().removeTimeoutObject(timeoutObj);
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         return futId.hashCode();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridNearLockFuture.class, this, "inTx", inTx(), "super", super.toString());
+     }
+ 
+     /**
+      * @param f Future.
+      * @return {@code True} if mini-future.
+      */
+     private boolean isMini(IgniteFuture<?> f) {
+         return f.getClass().equals(MiniFuture.class);
+     }
+ 
+     /**
+      * Basically, future mapping consists of two parts. First, we must determine the topology version this future
+      * will map on. If locking is performed within a user transaction, we must continue to map keys on the same
+      * topology version the transaction started on. If the topology version is undefined, we get the current
+      * topology future and wait until it completes so the topology is ready to use.
+      * <p/>
+      * During the second part we map keys to primary nodes using the topology snapshot obtained during the first
+      * part. Note that if a primary node leaves the grid, the future will fail and the transaction will be rolled back.
+      */
+     void map() {
+         // Obtain the topology version to use.
+         GridDiscoveryTopologySnapshot snapshot = tx != null ? tx.topologySnapshot() :
+             cctx.mvcc().lastExplicitLockTopologySnapshot(Thread.currentThread().getId());
+ 
+         if (snapshot != null) {
+             // Continue mapping on the same topology version as it was before.
+             topSnapshot.compareAndSet(null, snapshot);
+ 
+             map(keys);
+ 
+             markInitialized();
+ 
+             return;
+         }
+ 
+         // Must get topology snapshot and map on that version.
+         mapOnTopology();
+     }
+ 
+     /**
+      * Acquires topology future and checks its completeness under the read lock. If it is not complete,
+      * it will asynchronously wait for its completion and then try again.
+      */
+     void mapOnTopology() {
+         // We must acquire topology snapshot from the topology version future.
+         try {
+             cctx.topology().readLock();
+ 
+             try {
+                 GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
+ 
+                 if (fut.isDone()) {
+                     GridDiscoveryTopologySnapshot snapshot = fut.topologySnapshot();
+ 
+                     if (tx != null) {
+                         tx.topologyVersion(snapshot.topologyVersion());
+                         tx.topologySnapshot(snapshot);
+                     }
+ 
+                     topSnapshot.compareAndSet(null, snapshot);
+ 
+                     map(keys);
+ 
+                     markInitialized();
+                 }
+                 else {
+                     fut.listenAsync(new CI1<IgniteFuture<Long>>() {
+                         @Override public void apply(IgniteFuture<Long> t) {
+                             mapOnTopology();
+                         }
+                     });
+                 }
+             }
+             finally {
+                 cctx.topology().readUnlock();
+             }
+         }
+         catch (IgniteCheckedException e) {
+             onDone(e);
+         }
+     }
+ 
+     /**
+      * Maps keys to nodes. Note that we cannot simply group keys by nodes and send one lock request per node, as
+      * such an approach does not preserve the order of lock acquisition. Instead, keys are split into contiguous
+      * groups belonging to one primary node, and locks for these groups are acquired sequentially.
+      *
+      * @param keys Keys.
+      */
+     private void map(Iterable<? extends K> keys) {
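+         // Example with hypothetical keys: in iteration order k1, k2, k3, where k1 and k3
+         // map to primary node A and k2 maps to node B, the mappings become [A:(k1)],
+         // [B:(k2)], [A:(k3)]; merging into [A:(k1, k3)] would acquire k3's lock before k2's.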
+         try {
+             GridDiscoveryTopologySnapshot snapshot = topSnapshot.get();
+ 
+             assert snapshot != null;
+ 
+             long topVer = snapshot.topologyVersion();
+ 
+             assert topVer > 0;
+ 
+             if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+                 onDone(new ClusterTopologyException("Failed to map keys for near-only cache (all " +
+                     "partition nodes left the grid)."));
+ 
+                 return;
+             }
+ 
+             ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings =
+                 new ConcurrentLinkedDeque8<>();
+ 
+             // Assign keys to primary nodes.
+             GridNearLockMapping<K, V> map = null;
+ 
+             for (K key : keys) {
+                 GridNearLockMapping<K, V> updated = map(key, map, topVer);
+ 
+                 // If new mapping was created, add to collection.
+                 if (updated != map) {
+                     mappings.add(updated);
+ 
+                     if (tx != null && updated.node().isLocal())
+                         tx.nearLocallyMapped(true);
+                 }
+ 
+                 map = updated;
+             }
+ 
+             if (isDone()) {
+                 if (log.isDebugEnabled())
+                     log.debug("Abandoning (re)map because future is done: " + 
this);
+ 
+                 return;
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Starting (re)map for mappings [mappings=" + 
mappings + ", fut=" + this + ']');
+ 
+             // Create mini futures.
+             for (Iterator<GridNearLockMapping<K, V>> iter = mappings.iterator(); iter.hasNext(); ) {
+                 GridNearLockMapping<K, V> mapping = iter.next();
+ 
+                 ClusterNode node = mapping.node();
+                 Collection<K> mappedKeys = mapping.mappedKeys();
+ 
+                 assert !mappedKeys.isEmpty();
+ 
+                 GridNearLockRequest<K, V> req = null;
+ 
+                 Collection<K> distributedKeys = new ArrayList<>(mappedKeys.size());
+ 
+                 boolean explicit = false;
+ 
+                 for (K key : mappedKeys) {
+                     IgniteTxKey<K> txKey = cctx.txKey(key);
+ 
+                     while (true) {
+                         GridNearCacheEntry<K, V> entry = null;
+ 
+                         try {
+                             entry = cctx.near().entryExx(key, topVer);
+ 
+                             if (!cctx.isAll(entry.wrap(false), filter)) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Entry being locked did not 
pass filter (will not lock): " + entry);
+ 
+                                 onComplete(false, false);
+ 
+                                 return;
+                             }
+ 
+                             // Removed exception may be thrown here.
+                             GridCacheMvccCandidate<K> cand = addEntry(topVer, entry, node.id());
+ 
+                             if (isDone()) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Abandoning (re)map because 
future is done after addEntry attempt " +
+                                         "[fut=" + this + ", entry=" + entry + 
']');
+ 
+                                 return;
+                             }
+ 
+                             if (cand != null) {
+                                 if (tx == null && !cand.reentry())
+                                     cctx.mvcc().addExplicitLock(threadId, cand, snapshot);
+ 
+                                 GridTuple3<GridCacheVersion, V, byte[]> val = entry.versionedValue();
+ 
+                                 if (val == null) {
+                                     GridDhtCacheEntry<K, V> dhtEntry = dht().peekExx(key);
+ 
+                                     try {
+                                         if (dhtEntry != null)
+                                             val = dhtEntry.versionedValue(topVer);
+                                     }
+                                     catch (GridCacheEntryRemovedException ignored) {
+                                         assert dhtEntry.obsolete() : "Got removed exception for non-obsolete entry: "
+                                             + dhtEntry;
+ 
+                                         if (log.isDebugEnabled())
+                                             log.debug("Got removed exception for DHT entry in map (will ignore): "
+                                                 + dhtEntry);
+                                     }
+                                 }
+ 
+                                 GridCacheVersion dhtVer = null;
+ 
+                                 if (val != null) {
+                                     dhtVer = val.get1();
+ 
+                                     valMap.put(key, val);
+                                 }
+ 
+                                 if (!cand.reentry()) {
+                                     if (req == null) {
+                                         req = new GridNearLockRequest<>(
+                                             cctx.cacheId(),
+                                             topVer,
+                                             cctx.nodeId(),
+                                             threadId,
+                                             futId,
+                                             lockVer,
+                                             inTx(),
+                                             implicitTx(),
+                                             implicitSingleTx(),
+                                             read,
+                                             isolation(),
+                                             isInvalidate(),
+                                             timeout,
+                                             mappedKeys.size(),
+                                             inTx() ? tx.size() : mappedKeys.size(),
+                                             inTx() && tx.syncCommit(),
+                                             inTx() ? tx.groupLockKey() : null,
+                                             inTx() && tx.partitionLock(),
+                                             inTx() ? tx.subjectId() : null,
+                                             inTx() ? tx.taskNameHash() : 0,
+                                             read ? accessTtl : -1L);
+ 
+                                         mapping.request(req);
+                                     }
+ 
+                                     distributedKeys.add(key);
+ 
+                                     IgniteTxEntry<K, V> writeEntry = tx != null ? tx.writeMap().get(txKey) : null;
+ 
+                                     if (tx != null)
+                                         tx.addKeyMapping(txKey, mapping.node());
+ 
+                                     req.addKeyBytes(
+                                         key,
+                                         node.isLocal() ? null : entry.getOrMarshalKeyBytes(),
+                                         retval && dhtVer == null,
+                                         dhtVer, // Include DHT version to match remote DHT entry.
+                                         writeEntry,
+                                         inTx() ? tx.entry(txKey).drVersion() : null,
+                                         cctx);
+ 
+                                     // Clear transfer required flag since we are sending message.
+                                     if (writeEntry != null)
+                                         writeEntry.transferRequired(false);
+                                 }
+ 
+                                 if (cand.reentry())
+                                     explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
+                             }
+                             else
+                                 // Ignore reentries within transactions.
+                                 explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
+ 
+                             if (explicit)
+                                 tx.addKeyMapping(txKey, mapping.node());
+ 
+                             break;
+                         }
+                         catch (GridCacheEntryRemovedException ignored) {
+                             assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
+ 
+                             if (log.isDebugEnabled())
+                                 log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+                         }
+                     }
+ 
+                     // Mark mapping explicit lock flag.
+                     if (explicit) {
+                         boolean marked = tx != null && tx.markExplicit(node.id());
+ 
+                         assert tx == null || marked;
+                     }
+                 }
+ 
+                 if (!distributedKeys.isEmpty())
+                     mapping.distributedKeys(distributedKeys);
+                 else {
+                     assert mapping.request() == null;
+ 
+                     iter.remove();
+                 }
+             }
+ 
+             cctx.mvcc().recheckPendingLocks();
+ 
+             proceedMapping(mappings);
+         }
+         catch (IgniteCheckedException ex) {
+             onError(ex);
+         }
+     }
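
The contiguous grouping described in the Javadoc above is the key ordering trick: grouping
keys per node would reorder lock acquisition, so keys are instead split into a new run every
time the primary changes. A self-contained sketch of that run-splitting, where `primaryOf`
is a hypothetical affinity lookup standing in for cctx.affinity().primary(..):

    import java.util.*;
    import java.util.function.Function;

    // Sketch only: splits keys into contiguous runs per primary node,
    // preserving the original key order (unlike a plain groupingBy).
    final class RunGroupingSketch {
        static <K> List<Map.Entry<String, List<K>>> splitByPrimary(
            Iterable<K> keys, Function<K, String> primaryOf) {
            List<Map.Entry<String, List<K>>> runs = new ArrayList<>();
            String curNode = null;
            List<K> curRun = null;

            for (K key : keys) {
                String node = primaryOf.apply(key); // Assumed affinity lookup.

                if (!node.equals(curNode)) {
                    // Primary changed: close the current run and start a new one.
                    curRun = new ArrayList<>();
                    runs.add(new AbstractMap.SimpleEntry<>(node, curRun));
                    curNode = node;
                }

                curRun.add(key);
            }

            return runs;
        }

        public static void main(String[] args) {
            // Keys alternate primaries, so grouping by node would reorder locks.
            List<Integer> keys = Arrays.asList(1, 2, 3, 10, 11, 4);
            System.out.println(splitByPrimary(keys, k -> k < 10 ? "A" : "B"));
            // -> [A=[1, 2, 3], B=[10, 11], A=[4]] : node A appears twice, order kept.
        }
    }
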
+ 
+     /**
+      * Gets the next near lock mapping and either acquires DHT locks locally or sends a near lock
+      * request to the remote primary node.
+      *
+      * @param mappings Queue of mappings.
+      * @throws IgniteCheckedException If mapping cannot be completed.
+      */
+     private void proceedMapping(final ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings)
+         throws IgniteCheckedException {
+         GridNearLockMapping<K, V> map = mappings.poll();
+ 
+         // If there are no more mappings to process, complete the future.
+         if (map == null)
+             return;
+ 
+         final GridNearLockRequest<K, V> req = map.request();
+         final Collection<K> mappedKeys = map.distributedKeys();
+         final ClusterNode node = map.node();
+ 
+         if (filter != null && filter.length != 0)
+             req.filter(filter, cctx);
+ 
+         if (node.isLocal()) {
+             req.miniId(IgniteUuid.randomUuid());
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Before locally locking near request: " + req);
+ 
+             IgniteFuture<GridNearLockResponse<K, V>> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter);
+ 
+             // Add new future.
+             add(new GridEmbeddedFuture<>(
+                 cctx.kernalContext(),
+                 fut,
+                 new C2<GridNearLockResponse<K, V>, Exception, Boolean>() {
+                     @Override public Boolean apply(GridNearLockResponse<K, V> res, Exception e) {
+                         if (CU.isLockTimeoutOrCancelled(e) ||
+                             (res != null && CU.isLockTimeoutOrCancelled(res.error())))
+                             return false;
+ 
+                         if (e != null) {
+                             onError(e);
+ 
+                             return false;
+                         }
+ 
+                         if (res == null) {
+                             onError(new IgniteCheckedException("Lock response is null for future: " + this));
+ 
+                             return false;
+                         }
+ 
+                         if (res.error() != null) {
+                             onError(res.error());
+ 
+                             return false;
+                         }
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Acquired lock for local DHT mapping 
[locId=" + cctx.nodeId() +
+                                 ", mappedKeys=" + mappedKeys + ", fut=" + 
GridNearLockFuture.this + ']');
+ 
+                         try {
+                             int i = 0;
+ 
+                             for (K k : mappedKeys) {
+                                 while (true) {
+                                     GridNearCacheEntry<K, V> entry = cctx.near().entryExx(k, req.topologyVersion());
+ 
+                                     try {
+                                         GridTuple3<GridCacheVersion, V, byte[]> oldValTup = valMap.get(entry.key());
+ 
+                                         boolean hasBytes = entry.hasValue();
+                                         V oldVal = entry.rawGet();
+                                         V newVal = res.value(i);
+                                         byte[] newBytes = res.valueBytes(i);
+ 
+                                         GridCacheVersion dhtVer = res.dhtVersion(i);
+                                         GridCacheVersion mappedVer = res.mappedVersion(i);
+ 
+                                         // On local node don't record twice if DHT cache already recorded.
+                                         boolean record = retval && oldValTup != null && oldValTup.get1().equals(dhtVer);
+ 
+                                         if (newVal == null) {
+                                             if (oldValTup != null) {
+                                                 if (oldValTup.get1().equals(dhtVer)) {
+                                                     newVal = oldValTup.get2();
+ 
+                                                     newBytes = oldValTup.get3();
+                                                 }
+ 
+                                                 oldVal = oldValTup.get2();
+                                             }
+                                         }
+ 
+                                         // Lock is held at this point, so we can set the
+                                         // returned value if any.
+                                         entry.resetFromPrimary(newVal, newBytes, lockVer, dhtVer, node.id());
+ 
+                                         entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
+                                             res.rolledbackVersions(), res.pending());
+ 
+                                         if (inTx() && implicitTx() && tx.onePhaseCommit()) {
+                                             boolean pass = res.filterResult(i);
+ 
+                                             tx.entry(cctx.txKey(k)).filters(pass ? CU.<K, V>empty() : CU.<K, V>alwaysFalse());
+                                         }
+ 
+                                         if (record) {
+                                             if (cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
+                                                 cctx.events().addEvent(
+                                                     entry.partition(),
+                                                     entry.key(),
+                                                     tx,
+                                                     null,
+                                                     EVT_CACHE_OBJECT_READ,
+                                                     newVal,
+                                                     newVal != null,
+                                                     oldVal,
+                                                     hasBytes,
+                                                     CU.subjectId(tx, cctx.shared()),
+                                                     null,
+                                                     inTx() ? tx.resolveTaskName() : null);
+ 
 -                                            cctx.cache().metrics0().onRead(oldVal != null);
++                                            if (cctx.cache().configuration().isStatisticsEnabled())
++                                                cctx.cache().metrics0().onRead(oldVal != null);
+                                         }
+ 
+                                         if (log.isDebugEnabled())
+                                             log.debug("Processed response for 
entry [res=" + res +
+                                                 ", entry=" + entry + ']');
+ 
+                                         break; // Inner while loop.
+                                     }
+                                     catch (GridCacheEntryRemovedException ignored) {
+                                         if (log.isDebugEnabled())
+                                             log.debug("Failed to add candidates because entry was " +
+                                                 "removed (will renew).");
+ 
+                                         // Replace old entry with new one.
+                                         entries.set(i, (GridDistributedCacheEntry<K, V>)
+                                             cctx.cache().entryEx(entry.key()));
+                                     }
+                                 }
+ 
+                                 i++; // Increment outside of while loop.
+                             }
+ 
+                             // Proceed and add new future (if any) before completing embedded future.
+                             proceedMapping(mappings);
+                         }
+                         catch (IgniteCheckedException ex) {
+                             onError(ex);
+ 
+                             return false;
+                         }
+ 
+                         return true;
+                     }
+                 }
+             ));
+         }
+         else {
+             final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
+ 
+             req.miniId(fut.futureId());
+ 
+             add(fut); // Append new future.
+ 
+             IgniteFuture<?> txSync = null;
+ 
+             if (inTx())
+                 txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId());
+ 
+             if (txSync == null || txSync.isDone()) {
+                 try {
+                     if (log.isDebugEnabled())
+                         log.debug("Sending near lock request [node=" + 
node.id() + ", req=" + req + ']');
+ 
+                     cctx.io().send(node, req, cctx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
+                 }
+                 catch (ClusterTopologyException ex) {
+                     assert fut != null;
+ 
+                     fut.onResult(ex);
+                 }
+             }
+             else {
+                 txSync.listenAsync(new CI1<IgniteFuture<?>>() {
+                     @Override public void apply(IgniteFuture<?> t) {
+                         try {
+                             if (log.isDebugEnabled())
+                                 log.debug("Sending near lock request [node=" 
+ node.id() + ", req=" + req + ']');
+ 
+                             cctx.io().send(node, req, cctx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
+                         }
+                         catch (ClusterTopologyException ex) {
+                             assert fut != null;
+ 
+                             fut.onResult(ex);
+                         }
+                         catch (IgniteCheckedException e) {
+                             onError(e);
+                         }
+                     }
+                 });
+             }
+         }
+     }
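
proceedMapping(..) deliberately processes one mapping at a time: the next near lock request
is issued only from the callback of the previous one, which keeps per-node lock acquisition
sequential. A minimal sketch of that queue-recursion shape, where `send` is a hypothetical
stand-in for dispatching one request and completing on its response:

    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    // Sketch only: the next request is sent only after the previous
    // node's response arrives, mirroring proceedMapping(..).
    final class SequentialMapperSketch<M> {
        void proceed(Queue<M> mappings, Function<M, CompletableFuture<Void>> send) {
            M next = mappings.poll();

            if (next == null)
                return; // All mappings processed; the compound future can finish.

            // Recurse from the response callback instead of looping, the way
            // proceedMapping(..) is re-invoked from each mini-future.
            send.apply(next).thenRun(() -> proceed(mappings, send));
        }
    }
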
+ 
+     /**
+      * @param key Key to map.
+      * @param mapping Mapping to add the key to, or {@code null} to create a new one.
+      * @param topVer Topology version.
+      * @return Near lock mapping.
+      * @throws IgniteCheckedException If mapping for key failed.
+      */
+     private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping,
+         long topVer) throws IgniteCheckedException {
+         assert mapping == null || mapping.node() != null;
+ 
+         ClusterNode primary = cctx.affinity().primary(key, topVer);
+ 
+         if (cctx.discovery().node(primary.id()) == null)
+             // If primary node left the grid before lock acquisition, fail the whole future.
+             throw newTopologyException(null, primary.id());
+ 
+         if (inTx() && tx.groupLock() && !primary.isLocal())
+             throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
+                 "key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
+ 
+         if (mapping == null || !primary.id().equals(mapping.node().id()))
+             mapping = new GridNearLockMapping<>(primary, key);
+         else
+             mapping.addKey(key);
+ 
+         return mapping;
+     }
+ 
+     /**
+      * @return DHT cache.
+      */
+     private GridDhtTransactionalCacheAdapter<K, V> dht() {
+         return cctx.nearTx().dht();
+     }
+ 
+     /**
+      * Creates new topology exception for cases when primary node leaves grid during mapping.
+      *
+      * @param nested Optional nested exception.
+      * @param nodeId Node ID.
+      * @return Topology exception with user-friendly message.
+      */
+     private ClusterTopologyException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
+         return new ClusterTopologyException("Failed to acquire lock for keys (primary node left grid, " +
+             "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+     }
+ 
+     /**
+      * Lock request timeout object.
+      */
+     private class LockTimeoutObject extends GridTimeoutObjectAdapter {
+         /**
+          * Default constructor.
+          */
+         LockTimeoutObject() {
+             super(timeout);
+         }
+ 
+         /** {@inheritDoc} */
+         @SuppressWarnings({"ThrowableInstanceNeverThrown"})
+         @Override public void onTimeout() {
+             if (log.isDebugEnabled())
+                 log.debug("Timed out waiting for lock response: " + this);
+ 
+             timedOut = true;
+ 
+             onComplete(false, true);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(LockTimeoutObject.class, this);
+         }
+     }
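
LockTimeoutObject shows the timeout half of the future's lifecycle: when the wait expires,
the future is completed with false (lock not acquired) rather than with an error. A rough
equivalent using a plain scheduler; names here are hypothetical, not Ignite API:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch only: complete a pending lock future with 'false' when the
    // configured timeout elapses, mirroring LockTimeoutObject.onTimeout().
    final class LockTimeoutSketch {
        static void armTimeout(CompletableFuture<Boolean> lockFut, long timeoutMs,
            ScheduledExecutorService timer) {
            // If the future is already done, complete(false) is a no-op.
            timer.schedule(() -> lockFut.complete(false), timeoutMs, TimeUnit.MILLISECONDS);
        }
    }
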
+ 
+     /**
+      * Mini-future for lock operations. Mini-futures are only waiting on a single
+      * node as opposed to multiple nodes.
+      */
+     private class MiniFuture extends GridFutureAdapter<Boolean> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** */
+         private final IgniteUuid futId = IgniteUuid.randomUuid();
+ 
+         /** Node ID. */
+         @GridToStringExclude
+         private ClusterNode node;
+ 
+         /** Keys. */
+         @GridToStringInclude
+         private Collection<K> keys;
+ 
+         /** Mappings to proceed. */
+         @GridToStringExclude
+         private ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings;
+ 
+         /** */
+         private AtomicBoolean rcvRes = new AtomicBoolean(false);
+ 
+         /**
+          * Empty constructor required for {@link Externalizable}.
+          */
+         public MiniFuture() {
+             // No-op.
+         }
+ 
+         /**
+          * @param node Node.
+          * @param keys Keys.
+          * @param mappings Mappings to proceed.
+          */
+         MiniFuture(ClusterNode node, Collection<K> keys,
+             ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings) {
+             super(cctx.kernalContext());
+ 
+             this.node = node;
+             this.keys = keys;
+             this.mappings = mappings;
+         }
+ 
+         /**
+          * @return Future ID.
+          */
+         IgniteUuid futureId() {
+             return futId;
+         }
+ 
+         /**
+          * @return Node ID.
+          */
+         public ClusterNode node() {
+             return node;
+         }
+ 
+         /**
+          * @return Keys.
+          */
+         public Collection<K> keys() {
+             return keys;
+         }
+ 
+         /**
+          * @param e Error.
+          */
+         void onResult(Throwable e) {
+             if (rcvRes.compareAndSet(false, true)) {
+                 if (log.isDebugEnabled())
+                     log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
+ 
+                 // Fail.
+                 onDone(e);
+             }
+             else
+                 U.warn(log, "Received error after another result has been processed [fut=" + GridNearLockFuture.this +
+                     ", mini=" + this + ']', e);
+         }
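
The rcvRes compare-and-set above is a one-shot guard: whichever of response, error, or
node-left arrives first completes the mini-future, and any later result is only logged.
A minimal sketch of the guard in isolation:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch only: the first result wins; late results are rejected so the
    // mini-future is completed exactly once, mirroring the rcvRes flag.
    final class OneShotResultSketch {
        private final AtomicBoolean rcvRes = new AtomicBoolean(false);

        boolean tryComplete(Runnable completion) {
            if (rcvRes.compareAndSet(false, true)) {
                completion.run(); // First result: complete the future.
                return true;
            }

            return false; // Late result: already completed, caller may log it.
        }
    }
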
+ 
+         /**
+          * @param e Node left exception.
+          */
+         void onResult(ClusterTopologyException e) {
+             if (isDone())
+                 return;
+ 
+             if (rcvRes.compareAndSet(false, true)) {
+                 if (log.isDebugEnabled())
+                     log.debug("Remote node left grid while sending or waiting 
for reply (will fail): " + this);
+ 
+                 if (tx != null)
+                     tx.removeMapping(node.id());
+ 
+                 // Primary node left the grid, so fail the future.
+                 GridNearLockFuture.this.onDone(newTopologyException(e, node.id()));
+ 
+                 onDone(true);
+             }
+         }
+ 
+         /**
+          * @param res Lock response.
+          */
+         void onResult(GridNearLockResponse<K, V> res) {
+             if (rcvRes.compareAndSet(false, true)) {
+                 if (res.error() != null) {
+                     if (log.isDebugEnabled())
+                         log.debug("Finishing mini future with an error due to 
error in response [miniFut=" + this +
+                             ", res=" + res + ']');
+ 
+                     // Fail.
+                     if (res.error() instanceof GridCacheLockTimeoutException)
+                         onDone(false);
+                     else
+                         onDone(res.error());
+ 
+                     return;
+                 }
+ 
+                 int i = 0;
+ 
+                 long topVer = topSnapshot.get().topologyVersion();
+ 
+                 for (K k : keys) {
+                     while (true) {
+                         GridNearCacheEntry<K, V> entry = cctx.near().entryExx(k, topVer);
+ 
+                         try {
+                             if (res.dhtVersion(i) == null) {
+                                 onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                     "(will fail the lock): " + res));
+ 
+                                 return;
+                             }
+ 
+                             GridTuple3<GridCacheVersion, V, byte[]> oldValTup = valMap.get(entry.key());
+ 
+                             V oldVal = entry.rawGet();
+                             boolean hasOldVal = false;
+                             V newVal = res.value(i);
+                             byte[] newBytes = res.valueBytes(i);
+ 
+                             boolean readRecordable = false;
+ 
+                             if (retval) {
+                                 readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ 
+                                 if (readRecordable)
+                                     hasOldVal = entry.hasValue();
+                             }
+ 
+                             GridCacheVersion dhtVer = res.dhtVersion(i);
+                             GridCacheVersion mappedVer = res.mappedVersion(i);
+ 
+                             if (newVal == null) {
+                                 if (oldValTup != null) {
+                                     if (oldValTup.get1().equals(dhtVer)) {
+                                         newVal = oldValTup.get2();
+ 
+                                         newBytes = oldValTup.get3();
+                                     }
+ 
+                                     oldVal = oldValTup.get2();
+                                 }
+                             }
+ 
+                             // Lock is held at this point, so we can set the
+                             // returned value if any.
+                             entry.resetFromPrimary(newVal, newBytes, lockVer, dhtVer, node.id());
+ 
+                             if (inTx() && implicitTx() && tx.onePhaseCommit()) {
+                                 boolean pass = res.filterResult(i);
+ 
+                                 tx.entry(cctx.txKey(k)).filters(pass ? CU.<K, V>empty() : CU.<K, V>alwaysFalse());
+                             }
+ 
+                             entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
+                                 res.pending());
+ 
+                             if (retval) {
+                                 if (readRecordable)
+                                     cctx.events().addEvent(
+                                         entry.partition(),
+                                         entry.key(),
+                                         tx,
+                                         null,
+                                         EVT_CACHE_OBJECT_READ,
+                                         newVal,
+                                         newVal != null || newBytes != null,
+                                         oldVal,
+                                         hasOldVal,
+                                         CU.subjectId(tx, cctx.shared()),
+                                         null,
+                                         inTx() ? tx.resolveTaskName() : null);
+ 
 -                                cctx.cache().metrics0().onRead(false);
++                                if (cctx.cache().configuration().isStatisticsEnabled())
++                                    cctx.cache().metrics0().onRead(false);
+                             }
+ 
+                             if (log.isDebugEnabled())
+                                 log.debug("Processed response for entry 
[res=" + res + ", entry=" + entry + ']');
+ 
+                             break; // Inner while loop.
+                         }
+                         catch (GridCacheEntryRemovedException ignored) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Failed to add candidates because 
entry was removed (will renew).");
+ 
+                             // Replace old entry with new one.
+                             entries.set(i, (GridDistributedCacheEntry<K, V>)cctx.cache().entryEx(entry.key()));
+                         }
+                         catch (IgniteCheckedException e) {
+                             onDone(e);
+ 
+                             return;
+                         }
+                     }
+ 
+                     i++;
+                 }
+ 
+                 try {
+                     proceedMapping(mappings);
+                 }
+                 catch (IgniteCheckedException e) {
+                     onDone(e);
+                 }
+ 
+                 onDone(true);
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
+         }
+     }
+ }
