http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0000000,683b7b9..9b63363
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -1,0 -1,3005 +1,3007 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.cache.distributed.near.*;
+ import org.apache.ignite.internal.processors.cache.dr.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ import sun.misc.*;
+ 
+ import javax.cache.expiry.*;
+ import javax.cache.processor.*;
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+ 
+ import static org.apache.ignite.IgniteSystemProperties.*;
+ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
+ import static org.apache.ignite.cache.GridCachePeekMode.*;
+ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+ import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
+ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+ 
+ /**
+  * Non-transactional partitioned cache.
+  */
+ @GridToStringExclude
+ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Deferred update response buffer size. */
+     private static final int DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE =
+         Integer.getInteger(GG_ATOMIC_DEFERRED_ACK_BUFFER_SIZE, 256);
+ 
+     /** Deferred update response timeout. */
+     private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
+         Integer.getInteger(GG_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
+ 
+     /** Unsafe instance. */
+     private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+ 
+     /** Update reply closure. */
+     private CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>> updateReplyClos;
+ 
+     /** Pending  */
+     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = 
new ConcurrentHashMap8<>();
+ 
+     /** */
+     private GridNearAtomicCache<K, V> near;
+ 
+     /**
+      * Empty constructor required by {@link Externalizable}.
+      */
+     public GridDhtAtomicCache() {
+         // No-op.
+     }
+ 
+     /**
+      * @param ctx Cache context.
+      */
+     public GridDhtAtomicCache(GridCacheContext<K, V> ctx) {
+         super(ctx);
+     }
+ 
+     /**
+      * @param ctx Cache context.
+      * @param map Cache concurrent map.
+      */
+     public GridDhtAtomicCache(GridCacheContext<K, V> ctx, 
GridCacheConcurrentMap<K, V> map) {
+         super(ctx, map);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isDhtAtomic() {
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void init() {
+         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+             /** {@inheritDoc} */
+             @Override public GridCacheMapEntry<K, V> 
create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+                 return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, 
val, next, ttl, hdrId);
+             }
+         });
+ 
+         updateReplyClos = new CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>>() {
+             @Override public void apply(GridNearAtomicUpdateRequest<K, V> 
req, GridNearAtomicUpdateResponse<K, V> res) {
+                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                     // Always send reply in CLOCK ordering mode.
+                     sendNearUpdateReply(res.nodeId(), res);
+ 
+                     return;
+                 }
+ 
+                 // Request should be for primary keys only in PRIMARY 
ordering mode.
+                 assert req.hasPrimary();
+ 
+                 if (req.writeSynchronizationMode() != FULL_ASYNC)
+                     sendNearUpdateReply(res.nodeId(), res);
+                 else {
+                     if (!F.isEmpty(res.remapKeys()))
+                         // Remap keys on primary node in FULL_ASYNC mode.
+                         remapToNewPrimary(req);
+                     else if (res.error() != null) {
+                         U.error(log, "Failed to process write update request 
in FULL_ASYNC mode for keys: " +
+                             res.failedKeys(), res.error());
+                     }
+                 }
+             }
+         };
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"IfMayBeConditional", "SimplifiableIfStatement"})
+     @Override public void start() throws IgniteCheckedException {
+         super.start();
+ 
 -        resetMetrics();
++        CacheMetricsAdapter m = new CacheMetricsAdapter(ctx);
++
++        if (ctx.dht().near() != null)
++            m.delegate(ctx.dht().near().metrics0());
++
++        metrics = m;
+ 
+         preldr = new GridDhtPreloader<>(ctx);
+ 
+         preldr.start();
+ 
+         ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new 
CI2<UUID, GridNearGetRequest<K, V>>() {
+             @Override public void apply(UUID nodeId, GridNearGetRequest<K, V> 
req) {
+                 processNearGetRequest(nodeId, req);
+             }
+         });
+ 
+         ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, 
new CI2<UUID, GridNearAtomicUpdateRequest<K, V>>() {
+             @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateRequest<K, V> req) {
+                 processNearAtomicUpdateRequest(nodeId, req);
+             }
+         });
+ 
+         ctx.io().addHandler(ctx.cacheId(), 
GridNearAtomicUpdateResponse.class, new CI2<UUID, 
GridNearAtomicUpdateResponse<K, V>>() {
+             @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateResponse<K, V> res) {
+                 processNearAtomicUpdateResponse(nodeId, res);
+             }
+         });
+ 
+         ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, 
new CI2<UUID, GridDhtAtomicUpdateRequest<K, V>>() {
+             @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateRequest<K, V> req) {
+                 processDhtAtomicUpdateRequest(nodeId, req);
+             }
+         });
+ 
+         ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, 
new CI2<UUID, GridDhtAtomicUpdateResponse<K, V>>() {
+             @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateResponse<K, V> res) {
+                 processDhtAtomicUpdateResponse(nodeId, res);
+             }
+         });
+ 
+         ctx.io().addHandler(ctx.cacheId(), 
GridDhtAtomicDeferredUpdateResponse.class,
+             new CI2<UUID, GridDhtAtomicDeferredUpdateResponse<K, V>>() {
+                 @Override public void apply(UUID nodeId, 
GridDhtAtomicDeferredUpdateResponse<K, V> res) {
+                     processDhtAtomicDeferredUpdateResponse(nodeId, res);
+                 }
+             });
+ 
+         if (near == null) {
+             ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse<K, V>>() {
+                 @Override public void apply(UUID nodeId, 
GridNearGetResponse<K, V> res) {
+                     processNearGetResponse(nodeId, res);
+                 }
+             });
+         }
+ 
+         ctx.io().addDisconnectListener(new GridDisconnectListener() {
+             @Override public void onNodeDisconnected(UUID nodeId) {
+                 scheduleAtomicFutureRecheck();
+             }
+         });
+     }
+ 
 -    /** {@inheritDoc} */
 -    @Override public void resetMetrics() {
 -        GridCacheMetricsAdapter m = new GridCacheMetricsAdapter();
 -
 -        if (ctx.dht().near() != null)
 -            m.delegate(ctx.dht().near().metrics0());
 -
 -        metrics = m;
 -
 -        ctx.dr().resetMetrics();
 -    }
 -
+     /**
+      * @param near Near cache.
+      */
+     public void near(GridNearAtomicCache<K, V> near) {
+         this.near = near;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridNearCacheAdapter<K, V> near() {
+         return near;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public CacheEntry<K, V> entry(K key) {
+         return new GridDhtCacheEntryImpl<>(ctx.projectionPerCall(), ctx, key, 
null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> 
modes) throws IgniteCheckedException {
+         GridTuple<V> val = null;
+ 
+         if (ctx.isReplicated() || !modes.contains(NEAR_ONLY)) {
+             try {
+                 val = peek0(true, key, modes, ctx.tm().txx());
+             }
+             catch (GridCacheFilterFailedException ignored) {
+                 if (log.isDebugEnabled())
+                     log.debug("Filter validation failed for key: " + key);
+ 
+                 return null;
+             }
+         }
+ 
+         return val != null ? val.get() : null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Map<K, V>> getAllAsync(
+         @Nullable final Collection<? extends K> keys,
+         final boolean forcePrimary,
+         boolean skipTx,
+         @Nullable final GridCacheEntryEx<K, V> entry,
+         @Nullable UUID subjId,
+         final String taskName,
+         final boolean deserializePortable,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+ 
+         subjId = ctx.subjectIdPerCall(null, prj);
+ 
+         final UUID subjId0 = subjId;
+ 
+         final ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+ 
+         return asyncOp(new CO<IgniteFuture<Map<K, V>>>() {
+             @Override public IgniteFuture<Map<K, V>> apply() {
+                 return getAllAsync0(keys,
+                     false,
+                     forcePrimary,
+                     filter,
+                     subjId0,
+                     taskName,
+                     deserializePortable,
+                     expiryPlc);
+             }
+         });
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V put(K key, V val, @Nullable GridCacheEntryEx<K, V> 
cached, long ttl,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) throws 
IgniteCheckedException {
+         return putAsync(key, val, cached, ttl, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx<K, 
V> cached,
+         long ttl, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) 
throws IgniteCheckedException {
+         return putxAsync(key, val, cached, ttl, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean putx(K key, V val,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) throws 
IgniteCheckedException {
+         return putxAsync(key, val, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<V> putAsync(K key, V val, @Nullable 
GridCacheEntryEx<K, V> entry,
+         long ttl, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return updateAllAsync0(F0.asMap(key, val),
+             null,
+             null,
+             null,
+             null,
+             true,
+             false,
+             entry,
+             filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable 
GridCacheEntryEx<K, V> entry, long ttl,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return updateAllAsync0(F0.asMap(key, val),
+             null,
+             null,
+             null,
+             null,
+             false,
+             false,
+             entry,
+             filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V putIfAbsent(K key, V val) throws 
IgniteCheckedException {
+         return putIfAbsentAsync(key, val).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<V> putIfAbsentAsync(K key, V val) {
+         return putAsync(key, val, ctx.noPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean putxIfAbsent(K key, V val) throws 
IgniteCheckedException {
+         return putxIfAbsentAsync(key, val).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> putxIfAbsentAsync(K key, V val) {
+         return putxAsync(key, val, ctx.noPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V replace(K key, V val) throws IgniteCheckedException {
+         return replaceAsync(key, val).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<V> replaceAsync(K key, V val) {
+         return putAsync(key, val, ctx.hasPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean replacex(K key, V val) throws 
IgniteCheckedException {
+         return replacexAsync(key, val).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> replacexAsync(K key, V val) {
+         return putxAsync(key, val, ctx.hasPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean replace(K key, V oldVal, V newVal) throws 
IgniteCheckedException {
+         return replaceAsync(key, oldVal, newVal).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V 
newVal) {
+         return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheReturn<V> removex(K key, V val) throws 
IgniteCheckedException {
+         return removexAsync(key, val).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) 
throws IgniteCheckedException {
+         return replacexAsync(key, oldVal, newVal).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<GridCacheReturn<V>> removexAsync(K key, V 
val) {
+         return removeAllAsync0(F.asList(key), null, null, true, true, 
ctx.equalsPeekArray(val));
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V 
oldVal, V newVal) {
+         return updateAllAsync0(F.asMap(key, newVal),
+             null,
+             null,
+             null,
+             null,
+             true,
+             true,
+             null,
+             ctx.equalsPeekArray(oldVal));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void putAll(Map<? extends K, ? extends V> m,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) throws 
IgniteCheckedException {
+         putAllAsync(m, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends 
V> m,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return updateAllAsync0(m,
+             null,
+             null,
+             null,
+             null,
+             false,
+             false,
+             null,
+             filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> 
drMap) throws IgniteCheckedException {
+         putAllDrAsync(drMap).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> putAllDrAsync(Map<? extends K, 
GridCacheDrInfo<V>> drMap) {
+         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
+ 
+         return updateAllAsync0(null,
+             null,
+             null,
+             drMap,
+             null,
+             false,
+             false,
+             null,
+             null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public V remove(K key, @Nullable GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws 
IgniteCheckedException {
+         return removeAsync(key, entry, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<V> removeAsync(K key, @Nullable 
GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return removeAllAsync0(Collections.singletonList(key), null, entry, 
true, false, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void removeAll(Collection<? extends K> keys,
+         IgnitePredicate<CacheEntry<K, V>>... filter) throws 
IgniteCheckedException {
+         removeAllAsync(keys, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> removeAllAsync(Collection<? extends K> 
keys,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         A.notNull(keys, "keys");
+ 
+         return removeAllAsync0(keys, null, null, false, false, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean removex(K key, @Nullable GridCacheEntryEx<K, V> 
entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws 
IgniteCheckedException {
+         return removexAsync(key, entry, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<Boolean> removexAsync(K key, @Nullable 
GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return removeAllAsync0(Collections.singletonList(key), null, entry, 
false, false, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean remove(K key, V val) throws 
IgniteCheckedException {
+         return removeAsync(key, val).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> removeAsync(K key, V val) {
+         return removexAsync(key, ctx.equalsPeekArray(val));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>[] 
filter) throws IgniteCheckedException {
+         removeAllAsync(filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> 
removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return removeAllAsync(keySet(filter), filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void removeAllDr(Map<? extends K, GridCacheVersion> 
drMap) throws IgniteCheckedException {
+         removeAllDrAsync(drMap).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> removeAllDrAsync(Map<? extends K, 
GridCacheVersion> drMap) {
+         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
+ 
+         return removeAllAsync0(null, drMap, null, false, false, null);
+     }
+ 
+     /**
+      * @return {@code True} if store write-through enabled.
+      */
+     private boolean writeThrough() {
+         return ctx.writeThrough() && ctx.store().configured();
+     }
+ 
+     /**
+      * @param op Operation closure.
+      * @return Future.
+      */
+     @SuppressWarnings("unchecked")
+     protected <T> IgniteFuture<T> asyncOp(final CO<IgniteFuture<T>> op) {
+         IgniteFuture<T> fail = asyncOpAcquire();
+ 
+         if (fail != null)
+             return fail;
+ 
+         FutureHolder holder = lastFut.get();
+ 
+         holder.lock();
+ 
+         try {
+             IgniteFuture fut = holder.future();
+ 
+             if (fut != null && !fut.isDone()) {
+                 IgniteFuture<T> f = new GridEmbeddedFuture<>(fut,
+                     new C2<T, Exception, IgniteFuture<T>>() {
+                         @Override public IgniteFuture<T> apply(T t, Exception 
e) {
+                             return op.apply();
+                         }
+                     }, ctx.kernalContext());
+ 
+                 saveFuture(holder, f);
+ 
+                 return f;
+             }
+ 
+             IgniteFuture<T> f = op.apply();
+ 
+             saveFuture(holder, f);
+ 
+             return f;
+         }
+         finally {
+             holder.unlock();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? 
extends K> keys,
+         long timeout,
+         @Nullable IgniteTxLocalEx<K, V> tx,
+         boolean isInvalidate,
+         boolean isRead,
+         boolean retval,
+         @Nullable IgniteTxIsolation isolation,
+         long accessTtl,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return new FinishedLockFuture(new 
UnsupportedOperationException("Locks are not supported for " +
+             "CacheAtomicityMode.ATOMIC mode (use 
CacheAtomicityMode.TRANSACTIONAL instead)"));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> EntryProcessorResult<T> invoke(K key, 
EntryProcessor<K, V, T> entryProcessor, Object... args)
+         throws IgniteCheckedException {
+         return invokeAsync(key, entryProcessor, args).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
+         EntryProcessor<K, V, T> entryProcessor,
+         Object... args)
+         throws IgniteCheckedException {
+         return invokeAllAsync(keys, entryProcessor, args).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K 
key,
+         EntryProcessor<K, V, T> entryProcessor,
+         Object... args) {
+         A.notNull(key, "key", entryProcessor, "entryProcessor");
+ 
+         if (keyCheck)
+             validateCacheKey(key);
+ 
+         ctx.denyOnLocalRead();
+ 
+         Map<? extends K, EntryProcessor> invokeMap =
+             Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+ 
+         IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = 
updateAllAsync0(null,
+             invokeMap,
+             args,
+             null,
+             null,
+             true,
+             false,
+             null,
+             null);
+ 
+         return fut.chain(new CX1<IgniteFuture<Map<K, 
EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+             @Override public EntryProcessorResult<T> 
applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+                 throws IgniteCheckedException {
+                 Map<K, EntryProcessorResult<T>> resMap = fut.get();
+ 
+                 if (resMap != null) {
+                     assert resMap.isEmpty() || resMap.size() == 1 : 
resMap.size();
+ 
+                     return resMap.isEmpty() ? null : 
resMap.values().iterator().next();
+                 }
+ 
+                 return null;
+             }
+         });
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> 
invokeAllAsync(Set<? extends K> keys,
+         final EntryProcessor<K, V, T> entryProcessor,
+         Object... args) {
+         A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         ctx.denyOnLocalRead();
+ 
+         Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new 
C1<K, EntryProcessor>() {
+             @Override public EntryProcessor apply(K k) {
+                 return entryProcessor;
+             }
+         });
+ 
+         return updateAllAsync0(null,
+             invokeMap,
+             args,
+             null,
+             null,
+             true,
+             false,
+             null,
+             null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+         Object... args) throws IgniteCheckedException {
+         return invokeAllAsync(map, args).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> 
invokeAllAsync(
+         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+         Object... args) {
+         A.notNull(map, "map");
+ 
+         if (keyCheck)
+             validateCacheKeys(map.keySet());
+ 
+         ctx.denyOnLocalRead();
+ 
+         return updateAllAsync0(null,
+             map,
+             args,
+             null,
+             null,
+             true,
+             false,
+             null,
+             null);
+     }
+ 
+     /**
+      * Entry point for all public API put/transform methods.
+      *
+      * @param map Put map. Either {@code map}, {@code invokeMap} or {@code 
drMap} should be passed.
+      * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or 
{@code drMap} should be passed.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param drPutMap DR put map.
+      * @param drRmvMap DR remove map.
+      * @param retval Return value required flag.
+      * @param rawRetval Return {@code GridCacheReturn} instance.
+      * @param cached Cached cache entry for key. May be passed if and only if 
map size is {@code 1}.
+      * @param filter Cache entry filter for atomic updates.
+      * @return Completion future.
+      */
+     private IgniteFuture updateAllAsync0(
+         @Nullable final Map<? extends K, ? extends V> map,
+         @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+         @Nullable Object[] invokeArgs,
+         @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap,
+         @Nullable final Map<? extends K, GridCacheVersion> drRmvMap,
+         final boolean retval,
+         final boolean rawRetval,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         if (map != null && keyCheck)
+             validateCacheKeys(map.keySet());
+ 
+         ctx.checkSecurity(GridSecurityPermission.CACHE_PUT);
+ 
+         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+ 
+         UUID subjId = ctx.subjectIdPerCall(null, prj);
+ 
+         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+ 
+         final GridNearAtomicUpdateFuture<K, V> updateFut = new 
GridNearAtomicUpdateFuture<>(
+             ctx,
+             this,
+             ctx.config().getWriteSynchronizationMode(),
+             invokeMap != null ? TRANSFORM : UPDATE,
+             map != null ? map.keySet() : invokeMap != null ? 
invokeMap.keySet() : drPutMap != null ?
+                 drPutMap.keySet() : drRmvMap.keySet(),
+             map != null ? map.values() : invokeMap != null ? 
invokeMap.values() : null,
+             invokeArgs,
+             drPutMap != null ? drPutMap.values() : null,
+             drRmvMap != null ? drRmvMap.values() : null,
+             retval,
+             rawRetval,
+             cached,
+             prj != null ? prj.expiry() : null,
+             filter,
+             subjId,
+             taskNameHash);
+ 
+         return asyncOp(new CO<IgniteFuture<Object>>() {
+             @Override public IgniteFuture<Object> apply() {
+                 updateFut.map();
+ 
+                 return updateFut;
+             }
+         });
+     }
+ 
+     /**
+      * Entry point for all public API remove methods.
+      *
+      * @param keys Keys to remove.
+      * @param drMap DR map.
+      * @param cached Cached cache entry for key. May be passed if and only if 
keys size is {@code 1}.
+      * @param retval Return value required flag.
+      * @param rawRetval Return {@code GridCacheReturn} instance.
+      * @param filter Cache entry filter for atomic removes.
+      * @return Completion future.
+      */
+     private IgniteFuture removeAllAsync0(
+         @Nullable final Collection<? extends K> keys,
+         @Nullable final Map<? extends K, GridCacheVersion> drMap,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         final boolean retval,
+         boolean rawRetval,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
++        final boolean statsEnabled = ctx.config().isStatisticsEnabled();
++
++        final long start = statsEnabled ? System.nanoTime() : 0L;
++
+         assert keys != null || drMap != null;
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE);
+ 
+         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+ 
+         UUID subjId = ctx.subjectIdPerCall(null, prj);
+ 
+         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+ 
+         final GridNearAtomicUpdateFuture<K, V> updateFut = new 
GridNearAtomicUpdateFuture<>(
+             ctx,
+             this,
+             ctx.config().getWriteSynchronizationMode(),
+             DELETE,
+             keys != null ? keys : drMap.keySet(),
+             null,
+             null,
+             null,
+             keys != null ? null : drMap.values(),
+             retval,
+             rawRetval,
+             cached,
+             (filter != null && prj != null) ? prj.expiry() : null,
+             filter,
+             subjId,
+             taskNameHash);
+ 
++        if (statsEnabled)
++            updateFut.listenAsync(new 
UpdateRemoveTimeStatClosure<>(metrics0(), start));
++
+         return asyncOp(new CO<IgniteFuture<Object>>() {
+             @Override public IgniteFuture<Object> apply() {
+                 updateFut.map();
+ 
+                 return updateFut;
+             }
+         });
+     }
+ 
+     /**
+      * Entry point to all public API get methods.
+      *
+      * @param keys Keys to remove.
+      * @param reload Reload flag.
+      * @param forcePrimary Force primary flag.
+      * @param filter Filter.
+      * @param subjId Subject ID.
+      * @param taskName Task name.
+      * @param deserializePortable Deserialize portable flag.
+      * @param expiryPlc Expiry policy.
+      * @return Get future.
+      */
+     private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? 
extends K> keys,
+         boolean reload,
+         boolean forcePrimary,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+         UUID subjId,
+         String taskName,
+         boolean deserializePortable,
+         @Nullable ExpiryPolicy expiryPlc) {
+         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+ 
+         if (F.isEmpty(keys))
+             return new GridFinishedFuture<>(ctx.kernalContext(), 
Collections.<K, V>emptyMap());
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         long topVer = ctx.affinity().affinityTopologyVersion();
+ 
+         final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
+ 
+         // Optimisation: try to resolve value locally and escape 'get future' 
creation.
+         if (!reload && !forcePrimary) {
+             Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);
+ 
+             boolean success = true;
+ 
+             // Optimistically expect that all keys are available locally 
(avoid creation of get future).
+             for (K key : keys) {
+                 GridCacheEntryEx<K, V> entry = null;
+ 
+                 while (true) {
+                     try {
+                         entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : 
peekEx(key);
+ 
+                         // If our DHT cache do has value, then we peek it.
+                         if (entry != null) {
+                             boolean isNew = entry.isNewLocked();
+ 
+                             V v = entry.innerGet(null,
+                                 /*swap*/true,
+                                 /*read-through*/false,
+                                 /*fail-fast*/true,
+                                 /*unmarshal*/true,
 -                                /**update-metrics*/true,
++                                /**update-metrics*/false,
+                                 /*event*/true,
+                                 /*temporary*/false,
+                                 subjId,
+                                 null,
+                                 taskName,
+                                 filter,
+                                 expiry);
+ 
+                             // Entry was not in memory or in swap, so we 
remove it from cache.
+                             if (v == null) {
+                                 GridCacheVersion obsoleteVer = 
context().versions().next();
+ 
+                                 if (isNew && 
entry.markObsoleteIfEmpty(obsoleteVer))
+                                     removeIfObsolete(key);
+ 
+                                 success = false;
+                             }
+                             else {
+                                 if (ctx.portableEnabled() && 
deserializePortable) {
+                                     key = (K)ctx.unwrapPortableIfNeeded(key, 
false);
+                                     v = (V)ctx.unwrapPortableIfNeeded(v, 
false);
+                                 }
+ 
+                                 locVals.put(key, v);
+                             }
+                         }
+                         else
+                             success = false;
+ 
+                         break; // While.
+                     }
+                     catch (GridCacheEntryRemovedException ignored) {
+                         // No-op, retry.
+                     }
+                     catch (GridCacheFilterFailedException ignored) {
+                         // No-op, skip the key.
+                         break;
+                     }
+                     catch (GridDhtInvalidPartitionException ignored) {
+                         success = false;
+ 
+                         break; // While.
+                     }
+                     catch (IgniteCheckedException e) {
+                         return new GridFinishedFuture<>(ctx.kernalContext(), 
e);
+                     }
+                     finally {
+                         if (entry != null)
+                             ctx.evicts().touch(entry, topVer);
+                     }
+                 }
+ 
+                 if (!success)
+                     break;
++                else
++                    metrics0().onRead(true);
+             }
+ 
+             if (success) {
+                 sendTtlUpdateRequest(expiry);
+ 
+                 return ctx.wrapCloneMap(new 
GridFinishedFuture<>(ctx.kernalContext(), locVals));
+             }
+         }
+ 
+         if (expiry != null)
+             expiry.reset();
+ 
+         // Either reload or not all values are available locally.
+         GridPartitionedGetFuture<K, V> fut = new 
GridPartitionedGetFuture<>(ctx,
+             keys,
+             topVer,
+             true,
+             reload,
+             forcePrimary,
+             filter,
+             subjId,
+             taskName,
+             deserializePortable,
+             expiry);
+ 
+         fut.init();
+ 
+         return ctx.wrapCloneMap(fut);
+     }
+ 
+     /**
+      * Executes local update.
+      *
+      * @param nodeId Node ID.
+      * @param req Update request.
+      * @param cached Cached entry if updating single local entry.
+      * @param completionCb Completion callback.
+      */
+     public void updateAllAsyncInternal(
+         final UUID nodeId,
+         final GridNearAtomicUpdateRequest<K, V> req,
+         @Nullable final GridCacheEntryEx<K, V> cached,
+         final CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>> completionCb
+     ) {
+         IgniteFuture<Object> forceFut = preldr.request(req.keys(), 
req.topologyVersion());
+ 
+         if (forceFut.isDone())
+             updateAllAsyncInternal0(nodeId, req, completionCb);
+         else {
+             forceFut.listenAsync(new CI1<IgniteFuture<Object>>() {
+                 @Override public void apply(IgniteFuture<Object> t) {
+                     updateAllAsyncInternal0(nodeId, req, completionCb);
+                 }
+             });
+         }
+     }
+ 
+     /**
+      * Executes local update after preloader fetched values.
+      *
+      * @param nodeId Node ID.
+      * @param req Update request.
+      * @param completionCb Completion callback.
+      */
+     public void updateAllAsyncInternal0(
+         UUID nodeId,
+         GridNearAtomicUpdateRequest<K, V> req,
+         CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>> completionCb
+     ) {
+         GridNearAtomicUpdateResponse<K, V> res = new 
GridNearAtomicUpdateResponse<>(ctx.cacheId(), nodeId,
+             req.futureVersion());
+ 
+         List<K> keys = req.keys();
+ 
+         assert !req.returnValue() || (req.operation() == TRANSFORM || 
keys.size() == 1);
+ 
+         GridDhtAtomicUpdateFuture<K, V> dhtFut = null;
+ 
+         boolean remap = false;
+ 
+         String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ 
+         IgniteCacheExpiryPolicy expiry = null;
+ 
+         try {
+             // If batch store update is enabled, we need to lock all entries.
+             // First, need to acquire locks on cache entries, then check 
filter.
+             List<GridDhtCacheEntry<K, V>> locked = lockEntries(keys, 
req.topologyVersion());
+             Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, 
GridCacheVersion>> deleted = null;
+ 
+             try {
+                 topology().readLock();
+ 
+                 try {
+                     // Do not check topology version for CLOCK versioning 
since
+                     // partition exchange will wait for near update future.
+                     if (topology().topologyVersion() == req.topologyVersion() 
||
+                         ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                         ClusterNode node = ctx.discovery().node(nodeId);
+ 
+                         if (node == null) {
+                             U.warn(log, "Node originated update request left 
grid: " + nodeId);
+ 
+                             return;
+                         }
+ 
+                         checkClearForceTransformBackups(req, locked);
+ 
+                         boolean hasNear = U.hasNearCache(node, name());
+ 
+                         GridCacheVersion ver = req.updateVersion();
+ 
+                         if (ver == null) {
+                             // Assign next version for update inside entries 
lock.
+                             ver = ctx.versions().next(req.topologyVersion());
+ 
+                             if (hasNear)
+                                 res.nearVersion(ver);
+                         }
+ 
+                         assert ver != null : "Got null version for update 
request: " + req;
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Using cache version for update request 
on primary node [ver=" + ver +
+                                 ", req=" + req + ']');
+ 
+                         dhtFut = createDhtFuture(ver, req, res, completionCb, 
false);
+ 
+                         GridCacheReturn<Object> retVal = null;
+ 
+                         boolean replicate = ctx.isDrEnabled();
+ 
+                         ExpiryPolicy plc = req.expiry() != null ? 
req.expiry() : ctx.expiry();
+ 
+                         if (plc != null)
+                             expiry = new UpdateExpiryPolicy(plc);
+ 
+                         if (writeThrough() && keys.size() > 1 && 
!ctx.dr().receiveEnabled()) {
+                             // This method can only be used when there are no 
replicated entries in the batch.
+                             UpdateBatchResult<K, V> updRes = 
updateWithBatch(node,
+                                 hasNear,
+                                 req,
+                                 res,
+                                 locked,
+                                 ver,
+                                 dhtFut,
+                                 completionCb,
+                                 replicate,
+                                 taskName,
+                                 expiry);
+ 
+                             deleted = updRes.deleted();
+                             dhtFut = updRes.dhtFuture();
+ 
+                             if (req.operation() == TRANSFORM)
+                                 retVal = new 
GridCacheReturn<>((Object)updRes.invokeResults(), true);
+                         }
+                         else {
+                             UpdateSingleResult<K, V> updRes = 
updateSingle(node,
+                                 hasNear,
+                                 req,
+                                 res,
+                                 locked,
+                                 ver,
+                                 dhtFut,
+                                 completionCb,
+                                 replicate,
+                                 taskName,
+                                 expiry);
+ 
+                             retVal = updRes.returnValue();
+                             deleted = updRes.deleted();
+                             dhtFut = updRes.dhtFuture();
+                         }
+ 
+                         if (retVal == null)
+                             retVal = new GridCacheReturn<>(null, true);
+ 
+                         res.returnValue(retVal);
+                     }
+                     else
+                         // Should remap all keys.
+                         remap = true;
+                 }
+                 finally {
+                     topology().readUnlock();
+                 }
+             }
+             catch (GridCacheEntryRemovedException e) {
+                 assert false : "Entry should not become obsolete while 
holding lock.";
+ 
+                 e.printStackTrace();
+             }
+             finally {
+                 unlockEntries(locked, req.topologyVersion());
+ 
+                 // Enqueue if necessary after locks release.
+                 if (deleted != null) {
+                     assert !deleted.isEmpty();
+                     assert ctx.deferredDelete();
+ 
+                     for (IgniteBiTuple<GridDhtCacheEntry<K, V>, 
GridCacheVersion> e : deleted)
+                         ctx.onDeferredDelete(e.get1(), e.get2());
+                 }
+             }
+         }
+         catch (GridDhtInvalidPartitionException ignore) {
+             assert ctx.config().getAtomicWriteOrderMode() == PRIMARY;
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Caught invalid partition exception for cache entry 
(will remap update request): " + req);
+ 
+             remap = true;
+         }
+ 
+         if (remap) {
+             assert dhtFut == null;
+ 
+             res.remapKeys(req.keys());
+ 
+             completionCb.apply(req, res);
+         }
+         else {
+             // If there are backups, map backup update future.
+             if (dhtFut != null)
+                 dhtFut.map();
+                 // Otherwise, complete the call.
+             else
+                 completionCb.apply(req, res);
+         }
+ 
+         sendTtlUpdateRequest(expiry);
+     }
+ 
+     /**
+      * Updates locked entries using batched write-through.
+      *
+      * @param node Sender node.
+      * @param hasNear {@code True} if originating node has near cache.
+      * @param req Update request.
+      * @param res Update response.
+      * @param locked Locked entries.
+      * @param ver Assigned version.
+      * @param dhtFut Optional DHT future.
+      * @param completionCb Completion callback to invoke when DHT future is 
completed.
+      * @param replicate Whether replication is enabled.
+      * @param taskName Task name.
+      * @param expiry Expiry policy.
+      * @return Deleted entries.
+      * @throws GridCacheEntryRemovedException Should not be thrown.
+      */
+     @SuppressWarnings("unchecked")
+     private UpdateBatchResult<K, V> updateWithBatch(
+         ClusterNode node,
+         boolean hasNear,
+         GridNearAtomicUpdateRequest<K, V> req,
+         GridNearAtomicUpdateResponse<K, V> res,
+         List<GridDhtCacheEntry<K, V>> locked,
+         GridCacheVersion ver,
+         @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
+         CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>> completionCb,
+         boolean replicate,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiry
+     ) throws GridCacheEntryRemovedException {
+         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during 
DR due to possible conflicts.
+         assert !req.returnValue() || req.operation() == TRANSFORM; // Should 
not request return values for putAll.
+ 
+         if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) {
+             try {
+                 reloadIfNeeded(locked);
+             }
+             catch (IgniteCheckedException e) {
+                 res.addFailedKeys(req.keys(), e);
+ 
+                 return new UpdateBatchResult<>();
+             }
+         }
+ 
+         int size = req.keys().size();
+ 
+         Map<K, V> putMap = null;
+ 
+         Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null;
+ 
+         Collection<K> rmvKeys = null;
+ 
+         UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>();
+ 
+         List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size);
+ 
+         GridCacheOperation op = req.operation();
+ 
+         Map<K, EntryProcessorResult> invokeResMap =
+             op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : 
null;
+ 
+         int firstEntryIdx = 0;
+ 
+         boolean intercept = ctx.config().getInterceptor() != null;
+ 
+         for (int i = 0; i < locked.size(); i++) {
+             GridDhtCacheEntry<K, V> entry = locked.get(i);
+ 
+             if (entry == null)
+                 continue;
+ 
+             try {
+                 if (!checkFilter(entry, req, res)) {
+                     if (expiry != null && entry.hasValue()) {
+                         long ttl = expiry.forAccess();
+ 
+                         if (ttl != -1L) {
+                             entry.updateTtl(null, ttl);
+ 
+                             expiry.ttlUpdated(entry.key(),
+                                 entry.getOrMarshalKeyBytes(),
+                                 entry.version(),
+                                 entry.readers());
+                         }
+                     }
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Entry did not pass the filter (will skip 
write) [entry=" + entry +
+                             ", filter=" + Arrays.toString(req.filter()) + ", 
res=" + res + ']');
+ 
+                     if (hasNear)
+                         res.addSkippedIndex(i);
+ 
+                     firstEntryIdx++;
+ 
+                     continue;
+                 }
+ 
+                 if (op == TRANSFORM) {
+                     EntryProcessor<K, V, ?> entryProcessor = 
req.entryProcessor(i);
+ 
+                     V old = entry.innerGet(
+                         null,
+                         /*read swap*/true,
+                         /*read through*/true,
+                         /*fail fast*/false,
+                         /*unmarshal*/true,
+                         /*metrics*/true,
+                         /*event*/true,
+                         /*temporary*/true,
+                         req.subjectId(),
+                         entryProcessor,
+                         taskName,
+                         CU.<K, V>empty(),
+                         null);
+ 
+                     CacheInvokeEntry<K, V> invokeEntry = new 
CacheInvokeEntry<>(entry.key(), old);
+ 
+                     V updated;
+                     CacheInvokeResult invokeRes = null;
+ 
+                     try {
+                         Object computed = entryProcessor.process(invokeEntry, 
req.invokeArguments());
+ 
+                         updated = ctx.unwrapTemporary(invokeEntry.getValue());
+ 
+                         if (computed != null)
+                             invokeRes = new 
CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+                     }
+                     catch (Exception e) {
+                         invokeRes = new CacheInvokeResult<>(e);
+ 
+                         updated = old;
+                     }
+ 
+                     if (invokeRes != null)
+                         invokeResMap.put(entry.key(), invokeRes);
+ 
+                     if (updated == null) {
+                         if (intercept) {
+                             IgniteBiTuple<Boolean, ?> interceptorRes = 
ctx.config().getInterceptor().onBeforeRemove(
+                                 entry.key(), old);
+ 
+                             if (ctx.cancelRemove(interceptorRes))
+                                 continue;
+                         }
+ 
+                         // Update previous batch.
+                         if (putMap != null) {
+                             dhtFut = updatePartialBatch(
+                                 hasNear,
+                                 firstEntryIdx,
+                                 filtered,
+                                 ver,
+                                 node,
+                                 putMap,
+                                 null,
+                                 entryProcessorMap,
+                                 dhtFut,
+                                 completionCb,
+                                 req,
+                                 res,
+                                 replicate,
+                                 updRes,
+                                 taskName,
+                                 expiry);
+ 
+                             firstEntryIdx = i + 1;
+ 
+                             putMap = null;
+                             entryProcessorMap = null;
+ 
+                             filtered = new ArrayList<>();
+                         }
+ 
+                         // Start collecting new batch.
+                         if (rmvKeys == null)
+                             rmvKeys = new ArrayList<>(size);
+ 
+                         rmvKeys.add(entry.key());
+                     }
+                     else {
+                         if (intercept) {
+                             updated = 
(V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
+ 
+                             if (updated == null)
+                                 continue;
+                         }
+ 
+                         // Update previous batch.
+                         if (rmvKeys != null) {
+                             dhtFut = updatePartialBatch(
+                                 hasNear,
+                                 firstEntryIdx,
+                                 filtered,
+                                 ver,
+                                 node,
+                                 null,
+                                 rmvKeys,
+                                 entryProcessorMap,
+                                 dhtFut,
+                                 completionCb,
+                                 req,
+                                 res,
+                                 replicate,
+                                 updRes,
+                                 taskName,
+                                 expiry);
+ 
+                             firstEntryIdx = i + 1;
+ 
+                             rmvKeys = null;
+                             entryProcessorMap = null;
+ 
+                             filtered = new ArrayList<>();
+                         }
+ 
+                         if (putMap == null)
+                             putMap = new LinkedHashMap<>(size, 1.0f);
+ 
+                         putMap.put(entry.key(), 
ctx.<V>unwrapTemporary(updated));
+                     }
+ 
+                     if (entryProcessorMap == null)
+                         entryProcessorMap = new HashMap<>();
+ 
+                     entryProcessorMap.put(entry.key(), entryProcessor);
+                 }
+                 else if (op == UPDATE) {
+                     V updated = req.value(i);
+ 
+                     if (intercept) {
+                         V old = entry.innerGet(
+                              null,
+                             /*read swap*/true,
+                             /*read through*/ctx.loadPreviousValue(),
+                             /*fail fast*/false,
+                             /*unmarshal*/true,
+                             /*metrics*/true,
+                             /*event*/true,
+                             /*temporary*/true,
+                             req.subjectId(),
+                             null,
+                             taskName,
+                             CU.<K, V>empty(),
+                             null);
+ 
+                         updated = 
(V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
+ 
+                         if (updated == null)
+                             continue;
+ 
+                         updated = ctx.unwrapTemporary(updated);
+                     }
+ 
+                     assert updated != null;
+ 
+                     if (putMap == null)
+                         putMap = new LinkedHashMap<>(size, 1.0f);
+ 
+                     putMap.put(entry.key(), updated);
+                 }
+                 else {
+                     assert op == DELETE;
+ 
+                     if (intercept) {
+                         V old = entry.innerGet(
+                             null,
+                             /*read swap*/true,
+                             /*read through*/ctx.loadPreviousValue(),
+                             /*fail fast*/false,
+                             /*unmarshal*/true,
+                             /*metrics*/true,
+                             /*event*/true,
+                             /*temporary*/true,
+                             req.subjectId(),
+                             null,
+                             taskName,
+                             CU.<K, V>empty(),
+                             null);
+ 
+                         IgniteBiTuple<Boolean, ?> interceptorRes = 
ctx.config().getInterceptor().onBeforeRemove(
+                             entry.key(), old);
+ 
+                         if (ctx.cancelRemove(interceptorRes))
+                             continue;
+                     }
+ 
+                     if (rmvKeys == null)
+                         rmvKeys = new ArrayList<>(size);
+ 
+                     rmvKeys.add(entry.key());
+                 }
+ 
+                 filtered.add(entry);
+             }
+             catch (IgniteCheckedException e) {
+                 res.addFailedKey(entry.key(), e);
+             }
+             catch (GridCacheFilterFailedException ignore) {
+                 assert false : "Filter should never fail with failFast=false 
and empty filter.";
+             }
+         }
+ 
+         // Store final batch.
+         if (putMap != null || rmvKeys != null) {
+             dhtFut = updatePartialBatch(
+                 hasNear,
+                 firstEntryIdx,
+                 filtered,
+                 ver,
+                 node,
+                 putMap,
+                 rmvKeys,
+                 entryProcessorMap,
+                 dhtFut,
+                 completionCb,
+                 req,
+                 res,
+                 replicate,
+                 updRes,
+                 taskName,
+                 expiry);
+         }
+         else
+             assert filtered.isEmpty();
+ 
+         updRes.dhtFuture(dhtFut);
+ 
+         updRes.invokeResult(invokeResMap);
+ 
+         return updRes;
+     }
+ 
+     /**
+      * @param entries Entries.
+      * @throws IgniteCheckedException If failed.
+      */
+     private void reloadIfNeeded(final List<GridDhtCacheEntry<K, V>> entries) 
throws IgniteCheckedException {
+         Map<K, Integer> needReload = null;
+ 
+         for (int i = 0; i < entries.size(); i++) {
+             GridDhtCacheEntry<K, V> entry = entries.get(i);
+ 
+             if (entry == null)
+                 continue;
+ 
+             V val = entry.rawGetOrUnmarshal(false);
+ 
+             if (val == null) {
+                 if (needReload == null)
+                     needReload = new HashMap<>(entries.size(), 1.0f);
+ 
+                 needReload.put(entry.key(), i);
+             }
+         }
+ 
+         if (needReload != null) {
+             final Map<K, Integer> idxMap = needReload;
+ 
+             ctx.store().loadAllFromStore(null, needReload.keySet(), new 
CI2<K, V>() {
+                 @Override public void apply(K k, V v) {
+                     Integer idx = idxMap.get(k);
+ 
+                     if (idx != null) {
+                         GridDhtCacheEntry<K, V> entry = entries.get(idx);
+                         try {
+                             GridCacheVersion ver = entry.version();
+ 
+                             entry.versionedValue(v, null, ver);
+                         }
+                         catch (GridCacheEntryRemovedException e) {
+                             assert false : "Entry should not get obsolete 
while holding lock [entry=" + entry +
+                                 ", e=" + e + ']';
+                         }
+                         catch (IgniteCheckedException e) {
+                             throw new IgniteException(e);
+                         }
+                     }
+                 }
+             });
+         }
+     }
+ 
+     /**
+      * Updates locked entries one-by-one.
+      *
+      * @param node Originating node.
+      * @param hasNear {@code True} if originating node has near cache.
+      * @param req Update request.
+      * @param res Update response.
+      * @param locked Locked entries.
+      * @param ver Assigned update version.
+      * @param dhtFut Optional DHT future.
+      * @param completionCb Completion callback to invoke when DHT future is 
completed.
+      * @param replicate Whether DR is enabled for that cache.
+      * @param taskName Task name.
+      * @param expiry Expiry policy.
+      * @return Return value.
+      * @throws GridCacheEntryRemovedException Should be never thrown.
+      */
+     private UpdateSingleResult<K, V> updateSingle(
+         ClusterNode node,
+         boolean hasNear,
+         GridNearAtomicUpdateRequest<K, V> req,
+         GridNearAtomicUpdateResponse<K, V> res,
+         List<GridDhtCacheEntry<K, V>> locked,
+         GridCacheVersion ver,
+         @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
+         CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>> completionCb,
+         boolean replicate,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiry
+     ) throws GridCacheEntryRemovedException {
+         GridCacheReturn<Object> retVal = null;
+         Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> 
deleted = null;
+ 
+         List<K> keys = req.keys();
+ 
+         long topVer = req.topologyVersion();
+ 
+         boolean checkReaders = hasNear || 
ctx.discovery().hasNearCache(name(), topVer);
+ 
+         boolean readersOnly = false;
+ 
+         boolean intercept = ctx.config().getInterceptor() != null;
+ 
+         Map<K, EntryProcessorResult<?>> computedMap = null;
+ 
+         // Avoid iterator creation.
+         for (int i = 0; i < keys.size(); i++) {
+             K k = keys.get(i);
+ 
+             GridCacheOperation op = req.operation();
+ 
+             // We are holding java-level locks on entries at this point.
+             // No GridCacheEntryRemovedException can be thrown.
+             try {
+                 GridDhtCacheEntry<K, V> entry = locked.get(i);
+ 
+                 if (entry == null)
+                     continue;
+ 
+                 GridCacheVersion newDrVer = req.drVersion(i);
+                 long newDrTtl = req.drTtl(i);
+                 long newDrExpireTime = req.drExpireTime(i);
+ 
+                 assert !(newDrVer instanceof GridCacheVersionEx) : newDrVer; 
// Plain version is expected here.
+ 
+                 if (newDrVer == null)
+                     newDrVer = ver;
+ 
+                 boolean primary = !req.fastMap() || 
ctx.affinity().primary(ctx.localNode(), entry.key(),
+                     req.topologyVersion());
+ 
+                 byte[] newValBytes = req.valueBytes(i);
+ 
+                 Object writeVal = req.writeValue(i);
+ 
+                 Collection<UUID> readers = null;
+                 Collection<UUID> filteredReaders = null;
+ 
+                 if (checkReaders) {
+                     readers = entry.readers();
+                     filteredReaders = F.view(entry.readers(), 
F.notEqualTo(node.id()));
+                 }
+ 
+                 GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
+                     ver,
+                     node.id(),
+                     locNodeId,
+                     op,
+                     writeVal,
+                     newValBytes,
+                     req.invokeArguments(),
+                     primary && writeThrough(),
+                     req.returnValue(),
+                     expiry,
+                     true,
+                     true,
+                     primary,
+                     ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check 
version in CLOCK mode on primary node.
+                     req.filter(),
+                     replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                     newDrTtl,
+                     newDrExpireTime,
+                     newDrVer,
+                     true,
+                     intercept,
+                     req.subjectId(),
+                     taskName);
+ 
+                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                     dhtFut = createDhtFuture(ver, req, res, completionCb, 
true);
+ 
+                     readersOnly = true;
+                 }
+ 
+                 if (dhtFut != null) {
+                     if (updRes.sendToDht()) { // Send to backups even in case 
of remove-remove scenarios.
+                         GridDrResolveResult<V> ctx = updRes.drResolveResult();
+ 
+                         long ttl = updRes.newTtl();
+                         long expireTime = updRes.drExpireTime();
+ 
+                         if (ctx == null)
+                             newDrVer = null;
+                         else if (ctx.isMerge()) {
+                             newDrVer = null; // DR version is discarded in 
case of merge.
+                             newValBytes = null; // Value has been changed.
+                         }
+ 
+                         EntryProcessor<K, V, ?> entryProcessor = null;
+ 
+                         if (req.forceTransformBackups() && op == TRANSFORM)
+                             entryProcessor = (EntryProcessor<K, V, 
?>)writeVal;
+ 
+                         if (!readersOnly) {
+                             dhtFut.addWriteEntry(entry,
+                                 updRes.newValue(),
+                                 newValBytes,
+                                 entryProcessor,
+                                 updRes.newTtl(),
+                                 expireTime,
+                                 newDrVer);
+                         }
+ 
+                         if (!F.isEmpty(filteredReaders))
+                             dhtFut.addNearWriteEntries(filteredReaders,
+                                 entry,
+                                 updRes.newValue(),
+                                 newValBytes,
+                                 entryProcessor,
+                                 ttl,
+                                 expireTime);
+                     }
+                     else {
+                         if (log.isDebugEnabled())
+                             log.debug("Entry did not pass the filter or 
conflict resolution (will skip write) " +
+                                 "[entry=" + entry + ", filter=" + 
Arrays.toString(req.filter()) + ']');
+                     }
+                 }
+ 
+                 if (hasNear) {
+                     if (primary && updRes.sendToDht()) {
+                         if (!ctx.affinity().belongs(node, entry.partition(), 
topVer)) {
+                             GridDrResolveResult<V> ctx = 
updRes.drResolveResult();
+ 
+                             long ttl = updRes.newTtl();
+                             long expireTime = updRes.drExpireTime();
+ 
+                             if (ctx != null && ctx.isMerge())
+                                 newValBytes = null;
+ 
+                             // If put the same value as in request then do 
not need to send it back.
+                             if (op == TRANSFORM || writeVal != 
updRes.newValue()) {
+                                 res.addNearValue(i,
+                                     updRes.newValue(),
+                                     newValBytes,
+                                     ttl,
+                                     expireTime);
+                             }
+                             else
+                                 res.addNearTtl(i, ttl, expireTime);
+ 
+                             if (updRes.newValue() != null || newValBytes != 
null) {
+                                 IgniteFuture<Boolean> f = 
entry.addReader(node.id(), req.messageId(), topVer);
+ 
+                                 assert f == null : f;
+                             }
+                         }
+                         else if (F.contains(readers, node.id())) // Reader 
became primary or backup.
+                             entry.removeReader(node.id(), req.messageId());
+                         else
+                             res.addSkippedIndex(i);
+                     }
+                     else
+                         res.addSkippedIndex(i);
+                 }
+ 
+                 if (updRes.removeVersion() != null) {
+                     if (deleted == null)
+                         deleted = new ArrayList<>(keys.size());
+ 
+                     deleted.add(F.t(entry, updRes.removeVersion()));
+                 }
+ 
+                 if (op == TRANSFORM) {
+                     assert req.returnValue();
+ 
+                     if (updRes.computedResult() != null) {
+                         if (retVal == null) {
+                             computedMap = U.newHashMap(keys.size());
+ 
+                             retVal = new 
GridCacheReturn<>((Object)computedMap, updRes.success());
+                         }
+ 
+                         computedMap.put(k, updRes.computedResult());
+                     }
+                 }
+                 else {
+                     // Create only once.
+                     if (retVal == null) {
+                         Object ret = updRes.oldValue();
+ 
+                         retVal = new GridCacheReturn<>(req.returnValue() ? 
ret : null, updRes.success());
+                     }
+                 }
+             }
+             catch (IgniteCheckedException e) {
+                 res.addFailedKey(k, e);
+             }
+         }
+ 
+         return new UpdateSingleResult<>(retVal, deleted, dhtFut);
+     }
+ 
+     /**
+      * @param hasNear {@code True} if originating node has near cache.
+      * @param firstEntryIdx Index of the first entry in the request keys 
collection.
+      * @param entries Entries to update.
+      * @param ver Version to set.
+      * @param node Originating node.
+      * @param putMap Values to put.
+      * @param rmvKeys Keys to remove.
+      * @param entryProcessorMap Entry processors.
+      * @param dhtFut DHT update future if has backups.
+      * @param completionCb Completion callback to invoke when DHT future is 
completed.
+      * @param req Request.
+      * @param res Response.
+      * @param replicate Whether replication is enabled.
+      * @param batchRes Batch update result.
+      * @param taskName Task name.
+      * @param expiry Expiry policy.
+      * @return Deleted entries.
+      */
+     @SuppressWarnings("ForLoopReplaceableByForEach")
+     @Nullable private GridDhtAtomicUpdateFuture<K, V> updatePartialBatch(
+         boolean hasNear,
+         int firstEntryIdx,
+         List<GridDhtCacheEntry<K, V>> entries,
+         final GridCacheVersion ver,
+         ClusterNode node,
+         @Nullable Map<K, V> putMap,
+         @Nullable Collection<K> rmvKeys,
+         @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap,
+         @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
+         CI2<GridNearAtomicUpdateRequest<K, V>, 
GridNearAtomicUpdateResponse<K, V>> completionCb,
+         final GridNearAtomicUpdateRequest<K, V> req,
+         final GridNearAtomicUpdateResponse<K, V> res,
+         boolean replicate,
+         UpdateBatchResult<K, V> batchRes,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiry
+     ) {
+         assert putMap == null ^ rmvKeys == null;
+ 
+         assert req.drVersions() == null : "updatePartialBatch cannot be 
called when there are DR entries in the batch.";
+ 
+         long topVer = req.topologyVersion();
+ 
+         boolean checkReaders = hasNear || 
ctx.discovery().hasNearCache(name(), topVer);
+ 
+         CacheStorePartialUpdateException storeErr = null;
+ 
+         try {
+             GridCacheOperation op;
+ 
+             if (putMap != null) {
+                 // If fast mapping, filter primary keys for write to store.
+                 Map<K, V> storeMap = req.fastMap() ?
+                     F.view(putMap, new P1<K>() {
+                         @Override public boolean apply(K key) {
+                             return ctx.affinity().primary(ctx.localNode(), 
key, req.topologyVersion());
+                         }
+                     }) :
+                     putMap;
+ 
+                 try {
+                     ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, 
new C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
+                         @Override public IgniteBiTuple<V, GridCacheVersion> 
apply(V v) {
+                             return F.t(v, ver);
+                         }
+                     }));
+                 }
+                 catch (CacheStorePartialUpdateException e) {
+                     storeErr = e;
+                 }
+ 
+                 op = UPDATE;
+             }
+             else {
+                 // If fast mapping, filter primary keys for write to store.
+                 Collection<K> storeKeys = req.fastMap() ?
+                     F.view(rmvKeys, new P1<K>() {
+                         @Override public boolean apply(K key) {
+                             return ctx.affinity().primary(ctx.localNode(), 
key, req.topologyVersion());
+                         }
+                     }) :
+                     rmvKeys;
+ 
+                 try {
+                     ctx.store().removeAllFromStore(null, storeKeys);
+                 }
+                 catch (CacheStorePartialUpdateException e) {
+                     storeErr = e;
+                 }
+ 
+                 op = DELETE;
+             }
+ 
+             boolean intercept = ctx.config().getInterceptor() != null;
+ 
+             // Avoid iterator creation.
+             for (int i = 0; i < entries.size(); i++) {
+                 GridDhtCacheEntry<K, V> entry = entries.get(i);
+ 
+                 assert Thread.holdsLock(entry);
+ 
+                 if (entry.obsolete()) {
+                     assert req.operation() == DELETE : "Entry can become 
obsolete only after remove: " + entry;
+ 
+                     continue;
+                 }
+ 
+                 if (storeErr != null && 
storeErr.failedKeys().contains(entry.key()))
+                     continue;
+ 
+                 try {
+                     // We are holding java-level locks on entries at this 
point.
+                     V writeVal = op == UPDATE ? putMap.get(entry.key()) : 
null;
+ 
+                     assert writeVal != null || op == DELETE : "null write 
value found.";
+ 
+                     boolean primary = !req.fastMap() || 
ctx.affinity().primary(ctx.localNode(), entry.key(),
+                         req.topologyVersion());
+ 
+                     Collection<UUID> readers = null;
+                     Collection<UUID> filteredReaders = null;
+ 
+                     if (checkReaders) {
+                         readers = entry.readers();
+                         filteredReaders = F.view(entry.readers(), 
F.notEqualTo(node.id()));
+                     }
+ 
+                     GridCacheUpdateAtomicResult<K, V> updRes = 
entry.innerUpdate(
+                         ver,
+                         node.id(),
+                         locNodeId,
+                         op,
+                         writeVal,
+                         null,
+                         null,
+                         false,
+                         false,
+                         expiry,
+                         true,
+                         true,
+                         primary,
+                         ctx.config().getAtomicWriteOrderMode() == CLOCK, // 
Check version in CLOCK mode on primary node.
+                         null,
+                         replicate ? primary ? DR_PRIMARY : DR_BACKUP : 
DR_NONE,
+                         -1L,
+                         -1L,
+                         null,
+                         false,
+                         false,
+                         req.subjectId(),
+                         taskName);
+ 
+                     assert updRes.newTtl() == -1L || expiry != null;
+ 
+                     if (intercept) {
+                         if (op == UPDATE)
+                             
ctx.config().getInterceptor().onAfterPut(entry.key(), updRes.newValue());
+                         else {
+                             assert op == DELETE : op;
+ 
+                             // Old value should be already loaded for 
'CacheInterceptor.onBeforeRemove'.
+                             ctx.config().<K, 
V>getInterceptor().onAfterRemove(entry.key(), updRes.oldValue());
+                         }
+                     }
+ 
+                     batchRes.addDeleted(entry, updRes, entries);
+ 
+                     if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                         dhtFut = createDhtFuture(ver, req, res, completionCb, 
true);
+ 
+                         batchRes.readersOnly(true);
+                     }
+ 
+                     if (dhtFut != null) {
+                         GridCacheValueBytes valBytesTuple = op == DELETE ? 
GridCacheValueBytes.nil():
+                             entry.valueBytes();
+ 
+                         byte[] valBytes = valBytesTuple.getIfMarshaled();
+ 
+                         EntryProcessor<K, V, ?> entryProcessor =
+                             entryProcessorMap == null ? null : 
entryProcessorMap.get(entry.key());
+ 
+                         if (!batchRes.readersOnly())
+                             dhtFut.addWriteEntry(entry,
+                                 writeVal,
+                                 valBytes,
+                                 entryProcessor,
+                                 updRes.newTtl(),
+                                 -1,
+                                 null);
+ 
+                         if (!F.isEmpty(filteredReaders))
+                             dhtFut.addNearWriteEntries(filteredReaders,
+                                 entry,
+                                 writeVal,
+                                 valBytes,
+                                 entryProcessor,
+                                 updRes.newTtl(),
+                                 -1);
+                     }
+ 
+                     if (hasNear) {
+                         if (primary) {
+                             if (!ctx.affinity().belongs(node, 
entry.partition(), topVer)) {
+                                 int idx = firstEntryIdx + i;
+ 
+                                 if (req.operation() == TRANSFORM) {
+                                     GridCacheValueBytes valBytesTuple = 
entry.valueBytes();
+ 
+                                     byte[] valBytes = 
valBytesTuple.getIfMarshaled();
+ 
+                                     res.addNearValue(idx,
+                                         writeVal,
+                                         valBytes,
+                                         updRes.newTtl(),
+                                         -1);
+                                 }
+                                 else
+                                     res.addNearTtl(idx, updRes.newTtl(), -1);
+ 
+                                 if (writeVal != null || 
!entry.valueBytes().isNull()) {
+                                     IgniteFuture<Boolean> f = 
entry.addReader(node.id(), req.messageId(), topVer);
+ 
+                                     assert f == null : f;
+                                 }
+                             } else if (readers.contains(node.id())) // Reader 
became primary or backup.
+                                 entry.removeReader(node.id(), 
req.messageId());
+                             else
+                                 res.addSkippedIndex(firstEntryIdx + i);
+                         }
+                         else
+                             res.addSkippedIndex(firstEntryIdx + i);
+                     }
+                 }
+                 catch (GridCacheEntryRemovedException e) {
+                     assert false : "Entry cannot become obsolete while 
holding lock.";
+ 
+                     e.printStackTrace();
+                 }
+             }
+         }
+         catch (IgniteCheckedException e) {
+             res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
+         }
+ 
+         if (storeErr != null)
+             res.addFailedKeys((Collection<K>)storeErr.failedKeys(), 
storeErr.getCause());
+ 
+         return dhtFut;
+     }
+ 
+     /**
+      * Acquires java-level locks on cache entries. Returns collection of 
locked entries.
+      *
+      * @param keys Keys to lock.
+      * @param topVer Topology version to lock on.
+      * @return Collection of locked entries.
+      * @throws GridDhtInvalidPartitionException If entry does not belong to 
local node. If exception is thrown,
+      *      locks are released.
+      */
+     @SuppressWarnings("ForLoopReplaceableByForEach")
+     private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, long 
topVer)
+         throws GridDhtInvalidPartitionException {
+         if (keys.size() == 1) {
+             K key = keys.get(0);
+ 
+             while (true) {
+                 try {
+                     GridDhtCacheEntry<K, V> entry = entryExx(key, topVer);
+ 
+                     UNSAFE.monitorEnter(entry);
+ 
+                     if (entry.obsolete())
+                         UNSAFE.monitorExit(entry);
+                     else
+                         return Collections.singletonList(entry);
+                 }
+                 catch (GridDhtInvalidPartitionException e) {
+                     // Ignore invalid partition exception in CLOCK ordering 
mode.
+                     if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                         return Collections.singletonList(null);
+                     else
+                         throw e;
+                 }
+             }
+         }
+         else {
+             List<GridDhtCacheEntry<K, V>> locked = new 
ArrayList<>(keys.size());
+ 
+             while (true) {
+                 for (K key : keys) {
+                     try {
+                         GridDhtCacheEntry<K, V> entry = entryExx(key, topVer);
+ 
+                         locked.add(entry);
+                     }
+                     catch (GridDhtInvalidPartitionException e) {
+                         // Ignore invalid partition exception in CLOCK 
ordering mode.
+                         if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                             locked.add(null);
+                         else
+                             throw e;
+                     }
+                 }
+ 
+                 boolean retry = false;
+ 
+                 for (int i = 0; i < locked.size(); i++) {
+                     GridCacheMapEntry<K, V> entry = locked.get(i);
+ 
+                     if (entry == null)
+                         continue;
+ 
+                     UNSAFE.monitorEnter(entry);
+ 
+                     if (entry.obsolete()) {
+                         // Unlock all locked.
+                         for (int j = 0; j <= i; j++) {
+                             if (locked.get(j) != null)
+                                 UNSAFE.monitorExit(locked.get(j));
+                         }
+ 
+                         // Clear entries.
+                         locked.clear();
+ 
+                         // Retry.
+                         retry = true;
+ 
+                         break;
+                     }
+                 }
+ 
+                 if (!retry)
+                     return locked;
+             }
+         }
+     }
+ 
+     /**
+      * Releases java-level locks on cache entries.
+      *
+      * @param locked Locked entries.
+      * @param topVer Topology version.
+      */
+     private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, 
long topVer) {
+         // Process deleted entries before locks release.
+         assert ctx.deferredDelete();
+ 
+         // Entries to skip eviction manager notification for.
+         // Enqueue entries while holding locks.
+         Collection<K> skip = null;
+ 
+         for (GridCacheMapEntry<K, V> entry : locked) {
+             if (entry != null && entry.deleted()) {
+                 if (skip == null)
+                     skip = new HashSet<>(locked.size(), 1.0f);
+ 
+                 skip.add(entry.key());
+             }
+         }
+ 
+         // Release locks.
+         for (GridCacheMapEntry<K, V> entry : locked) {
+             if (entry != null)
+                 UNSAFE.monitorExit(entry);
+         }
+ 
+         // Try evict partitions.
+         for (GridDhtCacheEntry<K, V> entry : locked) {
+             if (entry != null)
+                 entry.onUnlock();
+         }
+ 
+         if (skip != null && skip.size() == locked.size())
+             // Optimization.
+             return;
+ 
+         // Must touch all entries since update may have deleted entries.
+         // Eviction manager will remove empty entries.
+         for (GridCacheMapEntry<K, V> entry : locked) {
+             if (entry != null && (skip == null || 
!skip.contains(entry.key())))
+                 ctx.evicts().touch(entry, topVer);
+         }
+     }
+ 
+     /**
+      * Checks if future timeout happened.
+      */
+     private void scheduleAtomicFutureRecheck() {
+         final long timeout = ctx.kernalContext().config().getNetworkTimeout();
+ 
+         ctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout * 2) 
{
+             @Override public void onTimeout() {
+                 boolean leave = false;
+ 
+                 try {
+                     ctx.gate().enter();
+ 
+                     leave = true;
+ 
+                     for (GridCacheAtomicFuture fut : 
ctx.mvcc().atomicFutures())
+                         fut.checkTimeout(timeout);
+                 }
+                 catch (IllegalStateException ignored) {
+                     if (log.isDebugEnabled())
+                         log.debug("Will not check pending atomic update 
futures for timeout (Grid is stopping).");
+                 }
+                 finally {
+                     if (leave)
+                         ctx.gate().leave();
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * @param entry Entry to check.
+      * @param req Update request.
+      * @param res Update response. If filter evaluation failed, key will be 
added to failed keys and method
+      *      will return false.
+      * @return {@code True} if filter evaluation succeeded.
+      */
+     private boolean checkFilter(GridCacheEntryEx<K, V> entry, 
GridNearAtomicUpdateRequest<K, V> req,
+         GridNearAtomicUpdateResponse<K, V> res) {
+         try {
+             return ctx.isAll(entry.wrapFilterLocked(), req.filter());
+         }
+         catch (IgniteCheckedException e) {
+             res.addFailedKey(entry.key(), e);
+ 
+             return false;
+         }
+     }
+ 
+     /**
+      * @param req Request to remap.
+      */
+     private void remapToNewPrimary(GridNearAtomicUpdateRequest<K, V> req) {
+         if (log.isDebugEnabled())
+             log.debug("Remapping near update request locally: " + req);
+ 
+         Collection<?> vals;
+         Collection<GridCacheDrInfo<V>> drPutVals;
+         Collection<GridCacheVersion> drRmvVals;
+ 
+         if (req.drVersions() == null) {
+             vals = req.values();
+ 
+             drPutVals = null;
+             drRmvVals = null;
+         }
+         else if (req.operation() == UPDATE) {
+             int size = req.keys().size();
+ 
+             drPutVals = new ArrayList<>(size);
+ 
+             for (int i = 0; i < size; i++) {
+                 long ttl = req.drTtl(i);
+ 
+                 if (ttl == -1L)
+                     drPutVals.add(new GridCacheDrInfo<>(req.value(i), 
req.drVersion(i)));
+                 else
+                     drPutVals.add(new 
GridCacheDrExpirationInfo<>(req.value(i), req.drVersion(i), ttl,
+                         req.drExpireTime(i)));
+             }
+ 
+             vals = null;
+             drRmvVals = null;
+         }
+         else {
+             assert req.operation() == DELETE;
+ 
+             drRmvVals = req.drVersions();
+ 
+             vals = null;
+             drPutVals = null;
+         }
+ 
+         final GridNearAtomicUpdateFuture<K, V> updateFut = new 
GridNearAtomicUpdateFuture<>(
+             ctx,
+             this,
+             ctx.config().getWriteSynchronizationMode(),
+             req.operation(),
+             req.keys(),
+             vals,
+             req.invokeArguments(),
+             drPutVals,
+             drRmvVals,
+             req.returnValue(),
+             false,
+             null,
+             req.expiry(),
+             req.filter(),
+             req.subjectId(),
+             req.taskNameHash());
+ 
+         updateFut.

<TRUNCATED>

Reply via email to