http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
new file mode 100644
index 0000000..2b00d24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -0,0 +1,1492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+
+/**
+ * Base class for transactional DHT caches.
+ */
+public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCacheAdapter<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    protected GridDhtTransactionalCacheAdapter() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Context.
+     */
+    protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx) {
+        super(ctx);
+    }
+
+    /**
+     * Constructor used for near-only cache.
+     *
+     * @param ctx Cache context.
+     * @param map Cache map.
+     */
+    protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) {
+        super(ctx, map);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        super.start();
+
+        preldr = new GridDhtPreloader<>(ctx);
+
+        preldr.start();
+
+        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridNearGetRequest<K, V> req) {
+                processNearGetRequest(nodeId, req);
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridNearLockRequest<K, V> req) {
+                processNearLockRequest(nodeId, req);
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridDhtLockRequest<K, V> req) {
+                processDhtLockRequest(nodeId, req);
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse<K, V>>() {
+            @Override public void apply(UUID nodeId, GridDhtLockResponse<K, V> req) {
+                processDhtLockResponse(nodeId, req);
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridNearUnlockRequest<K, V> req) {
+                processNearUnlockRequest(nodeId, req);
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridDhtUnlockRequest<K, V> req) {
+                processDhtUnlockRequest(nodeId, req);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public abstract GridNearTransactionalCache<K, V> near();
+
+    /**
+     * @param nodeId Primary node ID.
+     * @param req Request.
+     * @param res Response.
+     * @return Remote transaction.
+     * @throws IgniteCheckedException If failed.
+     * @throws GridDistributedLockCancelledException If lock has been cancelled.
+     */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
+        GridDhtLockRequest<K, V> req,
+        GridDhtLockResponse<K, V> res)
+        throws IgniteCheckedException, GridDistributedLockCancelledException {
+        List<K> keys = req.keys();
+        List<IgniteTxEntry<K, V>> writes = req.writeEntries();
+
+        GridDhtTxRemote<K, V> tx = null;
+
+        int size = F.size(keys);
+
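+        // Iterate over the requested keys; each key is retried below if its entry is concurrently removed.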
+        for (int i = 0; i < size; i++) {
+            K key = keys.get(i);
+
+            if (key == null)
+                continue;
+
+            IgniteTxKey<K> txKey = ctx.txKey(key);
+
+            IgniteTxEntry<K, V> writeEntry = writes == null ? null : writes.get(i);
+
+            assert F.isEmpty(req.candidatesByIndex(i));
+
+            GridCacheVersion drVer = req.drVersionByIndex(i);
+
+            if (log.isDebugEnabled())
+                log.debug("Unmarshalled key: " + key);
+
+            GridDistributedCacheEntry<K, V> entry = null;
+
+            while (true) {
+                try {
+                    int part = ctx.affinity().partition(key);
+
+                    GridDhtLocalPartition<K, V> locPart = ctx.topology().localPartition(part, req.topologyVersion(),
+                        false);
+
+                    if (locPart == null || !locPart.reserve()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Local partition for given key is already evicted (will add to invalid " +
+                                "partition list) [key=" + key + ", part=" + part + ", locPart=" + locPart + ']');
+
+                        res.addInvalidPartition(part);
+
+                        // Invalidate key in near cache, if any.
+                        if (isNearEnabled(cacheCfg))
+                            obsoleteNearEntry(key, req.version());
+
+                        break;
+                    }
+
+                    try {
+                        // Handle implicit locks for pessimistic transactions.
+                        if (req.inTx()) {
+                            if (tx == null)
+                                tx = ctx.tm().tx(req.version());
+
+                            if (tx == null) {
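+                                // No remote tx is registered for this version yet: create one on the fly.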
+                                tx = new GridDhtTxRemote<>(
+                                    ctx.shared(),
+                                    req.nodeId(),
+                                    req.futureId(),
+                                    nodeId,
+                                    req.nearXidVersion(),
+                                    req.threadId(),
+                                    req.topologyVersion(),
+                                    req.version(),
+                                    /*commitVer*/null,
+                                    ctx.system(),
+                                    PESSIMISTIC,
+                                    req.isolation(),
+                                    req.isInvalidate(),
+                                    req.timeout(),
+                                    req.txSize(),
+                                    req.groupLockKey(),
+                                    req.subjectId(),
+                                    req.taskNameHash());
+
+                                tx = ctx.tm().onCreated(tx);
+
+                                if (tx == null || !ctx.tm().onStarted(tx))
+                                    throw new IgniteTxRollbackException("Failed to acquire lock (transaction " +
+                                        "has been completed) [ver=" + req.version() + ", tx=" + tx + ']');
+                            }
+
+                            tx.addWrite(
+                                ctx,
+                                writeEntry == null ? NOOP : writeEntry.op(),
+                                txKey,
+                                req.keyBytes() != null ? req.keyBytes().get(i) : null,
+                                writeEntry == null ? null : writeEntry.value(),
+                                writeEntry == null ? null : writeEntry.valueBytes(),
+                                writeEntry == null ? null : writeEntry.entryProcessors(),
+                                drVer,
+                                req.accessTtl());
+
+                            if (req.groupLock())
+                                tx.groupLockKey(txKey);
+                        }
+
+                        entry = entryExx(key, req.topologyVersion());
+
+                        // Add remote candidate before reordering.
+                        entry.addRemote(
+                            req.nodeId(),
+                            nodeId,
+                            req.threadId(),
+                            req.version(),
+                            req.timeout(),
+                            tx != null,
+                            tx != null && tx.implicitSingle(),
+                            null
+                        );
+
+                        // Invalidate key in near cache, if any.
+                        if (isNearEnabled(cacheCfg) && req.invalidateNearEntry(i))
+                            invalidateNearEntry(key, req.version());
+
+                        // Get entry info after candidate is added.
+                        if (req.needPreloadKey(i)) {
+                            entry.unswap();
+
+                            GridCacheEntryInfo<K, V> info = entry.info();
+
+                            if (info != null && !info.isNew() && !info.isDeleted())
+                                res.addPreloadEntry(info);
+                        }
+
+                        // Double-check in case if sender node left the grid.
+                        if (ctx.discovery().node(req.nodeId()) == null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node requesting lock left grid 
(lock request will be ignored): " + req);
+
+                            entry.removeLock(req.version());
+
+                            if (tx != null) {
+                                tx.clearEntry(txKey);
+
+                                // If there is a concurrent salvage, there 
could be a case when tx is moved to
+                                // COMMITTING state, but this lock is never 
acquired.
+                                if (tx.state() == COMMITTING)
+                                    tx.forceCommit();
+                                else
+                                    tx.rollback();
+                            }
+
+                            return null;
+                        }
+
+                        // Entry is legit.
+                        break;
+                    }
+                    finally {
+                        locPart.release();
+                    }
+                }
+                catch (GridDhtInvalidPartitionException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received invalid partition exception [e=" + 
e + ", req=" + req + ']');
+
+                    res.addInvalidPartition(e.partition());
+
+                    // Invalidate key in near cache, if any.
+                    if (isNearEnabled(cacheCfg))
+                        obsoleteNearEntry(key, req.version());
+
+                    if (tx != null) {
+                        tx.clearEntry(txKey);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Cleared invalid entry from remote 
transaction (will skip) [entry=" +
+                                entry + ", tx=" + tx + ']');
+                    }
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+                        entry;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Received entry removed exception (will 
retry on renewed entry): " + entry);
+
+                    if (tx != null) {
+                        tx.clearEntry(txKey);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Cleared removed entry from remote 
transaction (will retry) [entry=" +
+                                entry + ", tx=" + tx + ']');
+                    }
+                }
+            }
+        }
+
+        if (tx != null && tx.empty()) {
+            if (log.isDebugEnabled())
+                log.debug("Rolling back remote DHT transaction because it is 
empty [req=" + req + ", res=" + res + ']');
+
+            tx.rollback();
+
+            tx = null;
+        }
+
+        return tx;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest<K, V> req) {
+        IgniteFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
+            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+
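+        // If the keys are already preloaded, process immediately; otherwise process once the preloader future completes.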
+        if (keyFut == null || keyFut.isDone())
+            processDhtLockRequest0(nodeId, req);
+        else {
+            keyFut.listenAsync(new CI1<IgniteFuture<Object>>() {
+                @Override public void apply(IgniteFuture<Object> t) {
+                    processDhtLockRequest0(nodeId, req);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest<K, V> req) {
+        assert nodeId != null;
+        assert req != null;
+        assert !nodeId.equals(locNodeId);
+
+        if (log.isDebugEnabled())
+            log.debug("Processing dht lock request [locNodeId=" + locNodeId + ", nodeId=" + nodeId + ", req=" + req +
+                ']');
+
+        int cnt = F.size(req.keys());
+
+        GridDhtLockResponse<K, V> res;
+
+        GridDhtTxRemote<K, V> dhtTx = null;
+        GridNearTxRemote<K, V> nearTx = null;
+
+        boolean fail = false;
+        boolean cancelled = false;
+
+        try {
+            res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), cnt);
+
+            dhtTx = startRemoteTx(nodeId, req, res);
+            nearTx = isNearEnabled(cacheCfg) ? near().startRemoteTx(nodeId, req) : null;
+
+            if (nearTx != null && !nearTx.empty())
+                res.nearEvicted(nearTx.evicted());
+            else {
+                if (!F.isEmpty(req.nearKeys())) {
+                    Collection<IgniteTxKey<K>> nearEvicted = new ArrayList<>(req.nearKeys().size());
+
+                    nearEvicted.addAll(F.viewReadOnly(req.nearKeys(), new C1<K, IgniteTxKey<K>>() {
+                        @Override public IgniteTxKey<K> apply(K k) {
+                            return ctx.txKey(k);
+                        }
+                    }));
+
+                    res.nearEvicted(nearEvicted);
+                }
+            }
+        }
+        catch (IgniteTxRollbackException e) {
+            String err = "Failed processing DHT lock request (transaction has 
been completed): " + req;
+
+            U.error(log, err, e);
+
+            res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), 
req.futureId(), req.miniId(),
+                new IgniteTxRollbackException(err, e));
+
+            fail = true;
+        }
+        catch (IgniteCheckedException e) {
+            String err = "Failed processing DHT lock request: " + req;
+
+            U.error(log, err, e);
+
+            res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(),
+                new IgniteCheckedException(err, e));
+
+            fail = true;
+        }
+        catch (GridDistributedLockCancelledException ignored) {
+            // Received lock request for cancelled lock.
+            if (log.isDebugEnabled())
+                log.debug("Received lock request for canceled lock (will ignore): " + req);
+
+            res = null;
+
+            fail = true;
+            cancelled = true;
+        }
+
+        boolean releaseAll = false;
+
+        if (res != null) {
+            try {
+                // Reply back to sender.
+                ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+            }
+            catch (ClusterTopologyException ignored) {
+                U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId);
+
+                fail = true;
+                releaseAll = true;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send lock reply to node (lock will not be acquired): " + nodeId, e);
+
+                fail = true;
+            }
+
+                fail = true;
+            }
+        }
+
+        if (fail) {
+            if (dhtTx != null)
+                dhtTx.rollback();
+
+            if (nearTx != null) // Even though this should never happen, we leave this check for consistency.
+                nearTx.rollback();
+
+            List<K> keys = req.keys();
+
+            if (keys != null) {
+                for (K key : keys) {
+                    while (true) {
+                        GridDistributedCacheEntry<K, V> entry = peekExx(key);
+
+                        try {
+                            if (entry != null) {
+                                // Release all locks because sender node left grid.
+                                if (releaseAll)
+                                    entry.removeExplicitNodeLocks(req.nodeId());
+                                else
+                                    entry.removeLock(req.version());
+                            }
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            if (log.isDebugEnabled())
+                                log.debug("Attempted to remove lock on removed 
entity during during failure " +
+                                    "handling for dht lock request (will 
retry): " + entry);
+                        }
+                    }
+                }
+            }
+
+            if (releaseAll && !cancelled)
+                U.warn(log, "Sender node left grid in the midst of lock 
acquisition (locks have been released).");
+        }
+    }
+
+    /** {@inheritDoc} */
+    protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest<K, V> req) {
+        clearLocks(nodeId, req);
+
+        if (isNearEnabled(cacheCfg))
+            near().clearLocks(nodeId, req);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processNearLockRequest(UUID nodeId, GridNearLockRequest<K, V> req) {
+        assert isAffinityNode(cacheCfg);
+        assert nodeId != null;
+        assert req != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Processing near lock request [locNodeId=" + locNodeId + ", nodeId=" + nodeId + ", req=" + req +
+                ']');
+
+        ClusterNode nearNode = ctx.discovery().node(nodeId);
+
+        if (nearNode == null) {
+            U.warn(log, "Received lock request from unknown node (will ignore): " + nodeId);
+
+            return;
+        }
+
+            return;
+        }
+
+        // Group lock can only be started from the local node,
+        // so we never start a group lock transaction on a remote node.
+        IgniteFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
+
+        // Register listener just so we print out errors.
+        // Exclude lock timeout exception since it's not a fatal exception.
+        f.listenAsync(CU.errorLogger(log, GridCacheLockTimeoutException.class,
+            GridDistributedLockCancelledException.class));
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse<K, V> res) {
+        assert nodeId != null;
+        assert res != null;
+
+        GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
+            res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received response for unknown future (will ignore): " + res);
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys,
+        long timeout,
+        IgniteTxLocalEx<K, V> txx,
+        boolean isInvalidate,
+        boolean isRead,
+        boolean retval,
+        IgniteTxIsolation isolation,
+        long accessTtl,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        return lockAllAsyncInternal(keys,
+            timeout,
+            txx,
+            isInvalidate,
+            isRead,
+            retval,
+            isolation,
+            accessTtl,
+            filter);
+    }
+
+    /**
+     * Acquires locks in partitioned cache.
+     *
+     * @param keys Keys to lock.
+     * @param timeout Lock timeout.
+     * @param txx Transaction.
+     * @param isInvalidate Invalidate flag.
+     * @param isRead Read flag.
+     * @param retval Return value flag.
+     * @param isolation Transaction isolation.
+     * @param accessTtl TTL for read operation.
+     * @param filter Optional filter.
+     * @return Lock future.
+     */
+    public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<? extends K> keys,
+        long timeout,
+        IgniteTxLocalEx<K, V> txx,
+        boolean isInvalidate,
+        boolean isRead,
+        boolean retval,
+        IgniteTxIsolation isolation,
+        long accessTtl,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        if (keys == null || keys.isEmpty())
+            return new GridDhtFinishedFuture<>(ctx.kernalContext(), true);
+
+        GridDhtTxLocalAdapter<K, V> tx = (GridDhtTxLocalAdapter<K, V>)txx;
+
+        assert tx != null;
+
+        GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+            ctx,
+            tx.nearNodeId(),
+            tx.nearXidVersion(),
+            tx.topologyVersion(),
+            keys.size(),
+            isRead,
+            timeout,
+            tx,
+            tx.threadId(),
+            accessTtl,
+            filter);
+
+        for (K key : keys) {
+            if (key == null)
+                continue;
+
+            try {
+                while (true) {
+                    GridDhtCacheEntry<K, V> entry = entryExx(key, tx.topologyVersion());
+
+                    try {
+                        fut.addEntry(entry);
+
+                        // Possible in case of cancellation or time out.
+                        if (fut.isDone())
+                            return fut;
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry when adding lock 
(will retry): " + entry);
+                    }
+                    catch (GridDistributedLockCancelledException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got lock request for cancelled lock 
(will fail): " + entry);
+
+                        return new 
GridDhtFinishedFuture<>(ctx.kernalContext(), e);
+                    }
+                }
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                fut.addInvalidPartition(ctx, e.partition());
+
+                if (log.isDebugEnabled())
+                    log.debug("Added invalid partition to DHT lock future 
[part=" + e.partition() + ", fut=" +
+                        fut + ']');
+            }
+        }
+
+        ctx.mvcc().addFuture(fut);
+
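+        // Map the future to entries and nodes; this may send lock requests to remote nodes.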
+        fut.map();
+
+        return fut;
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param nearNode Near node.
+     * @param req Request.
+     * @param filter0 Filter.
+     * @return Future.
+     */
+    public IgniteFuture<GridNearLockResponse<K, V>> lockAllAsync(
+        final GridCacheContext<K, V> cacheCtx,
+        final ClusterNode nearNode,
+        final GridNearLockRequest<K, V> req,
+        @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter0) {
+        final List<K> keys = req.keys();
+
+        IgniteFuture<Object> keyFut = null;
+
+        if (req.onePhaseCommit()) {
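+            // Entry values must be available locally for one-phase commit when filters,
+            // transforms or return-value flags are involved, so such keys are force-preloaded first.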
+            boolean forceKeys = req.hasTransforms() || req.filter() != null;
+
+            if (!forceKeys) {
+                for (int i = 0; i < req.keysCount() && !forceKeys; i++)
+                    forceKeys |= req.returnValue(i);
+            }
+
+            if (forceKeys)
+                keyFut = ctx.dht().dhtPreloader().request(keys, req.topologyVersion());
+        }
+
+        if (keyFut == null)
+            keyFut = new GridFinishedFutureEx<>();
+
+        return new GridEmbeddedFuture<>(true, keyFut,
+            new C2<Object, Exception, IgniteFuture<GridNearLockResponse<K,V>>>() {
+                @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) {
+                    if (exx != null)
+                        return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
+
+                    IgnitePredicate<GridCacheEntry<K, V>>[] filter = filter0;
+
+                    // Set message into thread context.
+                    GridDhtTxLocal<K, V> tx = null;
+
+                    try {
+                        int cnt = keys.size();
+
+                        if (req.inTx()) {
+                            GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+
+                            if (dhtVer != null)
+                                tx = ctx.tm().tx(dhtVer);
+                        }
+
+                        final List<GridCacheEntryEx<K, V>> entries = new ArrayList<>(cnt);
+
+                        // Unmarshal filter first.
+                        if (filter == null)
+                            filter = req.filter();
+
+                        GridDhtLockFuture<K, V> fut = null;
+
+                        if (!req.inTx()) {
+                            fut = new GridDhtLockFuture<>(ctx,
+                                nearNode.id(),
+                                req.version(),
+                                req.topologyVersion(),
+                                cnt,
+                                req.txRead(),
+                                req.timeout(),
+                                tx,
+                                req.threadId(),
+                                req.accessTtl(),
+                                filter);
+
+                            // Add before mapping.
+                            if (!ctx.mvcc().addFuture(fut))
+                                throw new IllegalStateException("Duplicate 
future ID: " + fut);
+                        }
+
+                        boolean timedout = false;
+
+                        for (K key : keys) {
+                            if (timedout)
+                                break;
+
+                            while (true) {
+                                // Specify topology version to make sure containment is checked
+                                // based on the requested version, not the latest.
+                                GridDhtCacheEntry<K, V> entry = entryExx(key, req.topologyVersion());
+
+                                try {
+                                    if (fut != null) {
+                                        // This method will add local candidate.
+                                        // Entry cannot become obsolete after this method succeeded.
+                                        fut.addEntry(key == null ? null : entry);
+
+                                        if (fut.isDone()) {
+                                            timedout = true;
+
+                                            break;
+                                        }
+                                    }
+
+                                    entries.add(entry);
+
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got removed entry when 
adding lock (will retry): " + entry);
+                                }
+                                catch (GridDistributedLockCancelledException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got lock request for cancelled lock (will ignore): " +
+                                            entry);
+
+                                    fut.onError(e);
+
+                                    return new GridDhtFinishedFuture<>(ctx.kernalContext(), e);
+                                }
+                            }
+                        }
+
+                        // Handle implicit locks for pessimistic transactions.
+                        if (req.inTx()) {
+                            if (tx == null) {
+                                tx = new GridDhtTxLocal<>(
+                                    ctx.shared(),
+                                    nearNode.id(),
+                                    req.version(),
+                                    req.futureId(),
+                                    req.miniId(),
+                                    req.threadId(),
+                                    req.implicitTx(),
+                                    req.implicitSingleTx(),
+                                    ctx.system(),
+                                    PESSIMISTIC,
+                                    req.isolation(),
+                                    req.timeout(),
+                                    req.isInvalidate(),
+                                    false,
+                                    req.txSize(),
+                                    req.groupLockKey(),
+                                    req.partitionLock(),
+                                    null,
+                                    req.subjectId(),
+                                    req.taskNameHash());
+
+                                tx.syncCommit(req.syncCommit());
+
+                                tx = ctx.tm().onCreated(tx);
+
+                                if (tx == null || !tx.init()) {
+                                    String msg = "Failed to acquire lock 
(transaction has been completed): " +
+                                        req.version();
+
+                                    U.warn(log, msg);
+
+                                    if (tx != null)
+                                        tx.rollback();
+
+                                    return new GridDhtFinishedFuture<>(ctx.kernalContext(), new IgniteCheckedException(msg));
+                                }
+
+                                tx.topologyVersion(req.topologyVersion());
+                            }
+
+                            ctx.tm().txContext(tx);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Performing DHT lock [tx=" + tx + ", 
entries=" + entries + ']');
+
+                            assert req.writeEntries() == null || req.writeEntries().size() == entries.size();
+
+                            IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(
+                                cacheCtx,
+                                entries,
+                                req.writeEntries(),
+                                req.onePhaseCommit(),
+                                req.drVersions(),
+                                req.messageId(),
+                                req.implicitTx(),
+                                req.txRead(),
+                                req.accessTtl());
+
+                            final GridDhtTxLocal<K, V> t = tx;
+
+                            return new GridDhtEmbeddedFuture<>(
+                                txFut,
+                                new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() {
+                                    @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(
+                                        GridCacheReturn<V> o, Exception e) {
+                                        if (e != null)
+                                            e = U.unwrap(e);
+
+                                        assert !t.empty();
+
+                                        // Create response while holding locks.
+                                        final GridNearLockResponse<K, V> resp = createLockReply(nearNode,
+                                            entries,
+                                            req,
+                                            t,
+                                            t.xidVersion(),
+                                            e);
+
+                                        if (resp.error() == null && t.onePhaseCommit()) {
+                                            assert t.implicit();
+
+                                            return t.commitAsync().chain(
+                                                new C1<IgniteFuture<IgniteTx>, GridNearLockResponse<K, V>>() {
+                                                    @Override public GridNearLockResponse<K, V> apply(IgniteFuture<IgniteTx> f) {
+                                                        try {
+                                                            // Check for error.
+                                                            f.get();
+                                                        }
+                                                        catch (IgniteCheckedException e1) {
+                                                            resp.error(e1);
+                                                        }
+
+                                                        sendLockReply(nearNode, t, req, resp);
+
+                                                        return resp;
+                                                    }
+                                                });
+                                        }
+                                        else {
+                                            sendLockReply(nearNode, t, req, resp);
+
+                                            return new GridFinishedFutureEx<>(resp);
+                                        }
+                                    }
+                                },
+                                ctx.kernalContext());
+                        }
+                        else {
+                            assert fut != null;
+
+                            // This will send remote messages.
+                            fut.map();
+
+                            final GridCacheVersion mappedVer = fut.version();
+
+                            return new GridDhtEmbeddedFuture<>(
+                                ctx.kernalContext(),
+                                fut,
+                                new C2<Boolean, Exception, GridNearLockResponse<K, V>>() {
+                                    @Override public GridNearLockResponse<K, V> apply(Boolean b, Exception e) {
+                                        if (e != null)
+                                            e = U.unwrap(e);
+                                        else if (!b)
+                                            e = new GridCacheLockTimeoutException(req.version());
+
+                                        GridNearLockResponse<K, V> res = createLockReply(nearNode,
+                                            entries,
+                                            req,
+                                            null,
+                                            mappedVer,
+                                            e);
+
+                                        sendLockReply(nearNode, null, req, res);
+
+                                        return res;
+                                    }
+                                });
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        String err = "Failed to unmarshal at least one of the 
keys for lock request message: " + req;
+
+                        U.error(log, err, e);
+
+                        if (tx != null) {
+                            try {
+                                tx.rollback();
+                            }
+                            catch (IgniteCheckedException ex) {
+                                U.error(log, "Failed to rollback the 
transaction: " + tx, ex);
+                            }
+                        }
+
+                        return new GridDhtFinishedFuture<>(ctx.kernalContext(),
+                            new IgniteCheckedException(err, e));
+                    }
+                }
+            },
+            ctx.kernalContext());
+    }
+
+    /**
+     * @param nearNode Near node.
+     * @param entries Entries.
+     * @param req Lock request.
+     * @param tx Transaction.
+     * @param mappedVer Mapped version.
+     * @param err Error.
+     * @return Response.
+     */
+    private GridNearLockResponse<K, V> createLockReply(
+        ClusterNode nearNode,
+        List<GridCacheEntryEx<K, V>> entries,
+        GridNearLockRequest<K, V> req,
+        @Nullable GridDhtTxLocalAdapter<K,V> tx,
+        GridCacheVersion mappedVer,
+        Throwable err) {
+        assert mappedVer != null;
+        assert tx == null || tx.xidVersion().equals(mappedVer);
+
+        try {
+            // Send reply back to originating near node.
+            GridNearLockResponse<K, V> res = new GridNearLockResponse<>(ctx.cacheId(),
+                req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+
+            if (err == null) {
+                res.pending(localDhtPendingVersions(entries, mappedVer));
+
+                // We have to add completed versions for cases when nearLocal and remote transactions
+                // execute concurrently.
+                res.completedVersions(ctx.tm().committedVersions(req.version()),
+                    ctx.tm().rolledbackVersions(req.version()));
+
+                int i = 0;
+
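+                // Add one value slot per requested key, preserving the request order.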
+                for (ListIterator<GridCacheEntryEx<K, V>> it = entries.listIterator(); it.hasNext();) {
+                    GridCacheEntryEx<K, V> e = it.next();
+
+                    assert e != null;
+
+                    while (true) {
+                        try {
+                            // Don't return anything for invalid partitions.
+                            if (tx == null || !tx.isRollbackOnly()) {
+                                GridCacheVersion dhtVer = req.dhtVersion(i);
+
+                                try {
+                                    GridCacheVersion ver = e.version();
+
+                                    boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+
+                                    V val = null;
+
+                                    if (ret)
+                                        val = e.innerGet(tx,
+                                            /*swap*/true,
+                                            /*read-through*/ctx.loadPreviousValue(),
+                                            /*fail-fast*/false,
+                                            /*unmarshal*/false,
+                                            /*update-metrics*/true,
+                                            /*event notification*/req.returnValue(i),
+                                            /*temporary*/false,
+                                            CU.subjectId(tx, ctx.shared()),
+                                            null,
+                                            tx != null ? tx.resolveTaskName() : null,
+                                            CU.<K, V>empty(),
+                                            null);
+
+                                    assert e.lockedBy(mappedVer) ||
+                                        (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
+                                        "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
+                                            ", entry=" + e +
+                                            ", mappedVer=" + mappedVer + ", ver=" + ver +
+                                            ", tx=" + tx + ", req=" + req +
+                                            ", err=" + err + ']';
+
+                                    boolean filterPassed = false;
+
+                                    if (tx != null && tx.onePhaseCommit()) {
+                                        IgniteTxEntry<K, V> writeEntry = tx.entry(ctx.txKey(e.key()));
+
+                                        assert writeEntry != null :
+                                            "Missing tx entry for locked cache 
entry: " + e;
+
+                                        filterPassed = writeEntry.filtersPassed();
+                                    }
+
+                                    GridCacheValueBytes valBytes = ret ? e.valueBytes(null) : GridCacheValueBytes.nil();
+
+                                    // We include values into response since they are required for local
+                                    // calls and won't be serialized. We are also including DHT version.
+                                    res.addValueBytes(
+                                        val != null ? val : (V)valBytes.getIfPlain(),
+                                        ret ? valBytes.getIfMarshaled() : null,
+                                        filterPassed,
+                                        ver,
+                                        mappedVer,
+                                        ctx);
+                                }
+                                catch (GridCacheFilterFailedException ex) {
+                                    assert false : "Filter should never fail 
if fail-fast is false.";
+
+                                    ex.printStackTrace();
+                                }
+                            }
+                            else {
+                                // We include values into response since they are required for local
+                                // calls and won't be serialized. We are also including DHT version.
+                                res.addValueBytes(null, null, false, e.version(), mappedVer, ctx);
+                            }
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got removed entry when sending 
reply to DHT lock request " +
+                                    "(will retry): " + e);
+
+                            e = entryExx(e.key());
+
+                            it.set(e);
+                        }
+                    }
+
+                    i++;
+                }
+            }
+
+            return res;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to get value for lock reply message for node 
[node=" +
+                U.toShortString(nearNode) + ", req=" + req + ']', e);
+
+            return new GridNearLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
+                entries.size(), e);
+        }
+    }
+
+    /**
+     * Send lock reply back to near node.
+     *
+     * @param nearNode Near node.
+     * @param tx Transaction.
+     * @param req Lock request.
+     * @param res Lock response.
+     */
+    private void sendLockReply(
+        ClusterNode nearNode,
+        @Nullable IgniteTxEx<K,V> tx,
+        GridNearLockRequest<K, V> req,
+        GridNearLockResponse<K, V> res
+    ) {
+        Throwable err = res.error();
+
+        // Log error before sending reply.
+        if (err != null && !(err instanceof GridCacheLockTimeoutException))
+            U.error(log, "Failed to acquire lock for request: " + req, err);
+
+        try {
+            // Don't send reply message to this node or if lock was cancelled.
+            if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class))
+                ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send lock reply to originating node (will 
rollback transaction) [node=" +
+                U.toShortString(nearNode) + ", req=" + req + ']', e);
+
+            if (tx != null)
+                tx.rollbackAsync();
+
+            // Convert to closure exception as this method is only called from closures.
+            throw new GridClosureException(e);
+        }
+    }
+
+    /**
+     * Collects versions of pending candidates that are less than the base version.
+     *
+     * @param entries Tx entries to process.
+     * @param baseVer Base version.
+     * @return Collection of pending candidates versions.
+     */
+    private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<GridCacheEntryEx<K, V>> entries,
+        GridCacheVersion baseVer) {
+        Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
+
+        for (GridCacheEntryEx<K, V> entry : entries) {
+            // Since entries were collected before locks are added, some of them may become obsolete.
+            while (true) {
+                try {
+                    for (GridCacheMvccCandidate cand : entry.localCandidates()) {
+                        if (cand.version().isLess(baseVer))
+                            lessPending.add(cand.version());
+                    }
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry is 
localDhtPendingVersions (will retry): " + entry);
+
+                    entry = entryExx(entry.key());
+                }
+            }
+        }
+
+        return lessPending;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    private void clearLocks(UUID nodeId, GridDistributedUnlockRequest<K, V> req) {
+        assert nodeId != null;
+
+        List<K> keys = req.keys();
+
+        if (keys != null) {
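+            // Complete and remove the lock candidate for each key, retrying if the entry is concurrently removed.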
+            for (K key : keys) {
+                while (true) {
+                    GridDistributedCacheEntry<K, V> entry = peekExx(key);
+
+                    boolean created = false;
+
+                    if (entry == null) {
+                        entry = entryExx(key);
+
+                        created = true;
+                    }
+
+                    try {
+                        entry.doneRemote(
+                            req.version(),
+                            req.version(),
+                            null,
+                            null,
+                            null,
+                            /*system invalidate*/false);
+
+                        // Note that we don't reorder completed versions here,
+                        // as there is no point to reorder relative to the 
version
+                        // we are about to remove.
+                        if (entry.removeLock(req.version())) {
+                            if (log.isDebugEnabled())
+                                log.debug("Removed lock [lockId=" + 
req.version() + ", key=" + key + ']');
+                        }
+                        else {
+                            if (log.isDebugEnabled())
+                                log.debug("Received unlock request for unknown 
candidate " +
+                                    "(added to cancelled locks set): " + req);
+                        }
+
+                        if (created && entry.markObsolete(req.version()))
+                            removeEntry(entry);
+
+                        ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received remove lock request for 
removed entry (will retry) [entry=" +
+                                entry + ", req=" + req + ']');
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param req Request.
+     */
+    @SuppressWarnings({"RedundantTypeArguments", "TypeMayBeWeakened"})
+    private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest<K, V> req) {
+        assert isAffinityNode(cacheCfg);
+        assert nodeId != null;
+
+        removeLocks(nodeId, req.version(), req.keys(), true);
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param topVer Topology version.
+     * @param cached Entry.
+     * @param readers Readers for this entry.
+     * @param dhtMap DHT map.
+     * @param nearMap Near map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void map(UUID nodeId,
+        long topVer,
+        GridCacheEntryEx<K,V> cached,
+        Collection<UUID> readers,
+        Map<ClusterNode, List<T2<K, byte[]>>> dhtMap,
+        Map<ClusterNode, List<T2<K, byte[]>>> nearMap)
+        throws IgniteCheckedException {
+        Collection<ClusterNode> dhtNodes = ctx.dht().topology().nodes(cached.partition(), topVer);
+
+        ClusterNode primary = F.first(dhtNodes);
+
+        assert primary != null;
+
+        if (!primary.id().equals(ctx.nodeId())) {
+            if (log.isDebugEnabled())
+                log.debug("Primary node mismatch for unlock [entry=" + cached 
+ ", expected=" + ctx.nodeId() +
+                    ", actual=" + U.toShortString(primary) + ']');
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Mapping entry to DHT nodes [nodes=" + 
U.toShortString(dhtNodes) + ", entry=" + cached + ']');
+
+        Collection<ClusterNode> nearNodes = null;
+
+        if (!F.isEmpty(readers)) {
+            nearNodes = ctx.discovery().nodes(readers, F0.not(F.idForNodeId(nodeId)));
+
+            if (log.isDebugEnabled())
+                log.debug("Mapping entry to near nodes [nodes=" + 
U.toShortString(nearNodes) + ", entry=" + cached +
+                    ']');
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Entry has no near readers: " + cached);
+        }
+
+        map(cached, F.view(dhtNodes, F.remoteNodes(ctx.nodeId())), dhtMap); // Exclude local node.
+        map(cached, nearNodes, nearMap);
+    }
+
+    /**
+     * @param entry Entry.
+     * @param nodes Nodes.
+     * @param map Map.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings( {"MismatchedQueryAndUpdateOfCollection"})
+    private void map(GridCacheEntryEx<K, V> entry,
+        @Nullable Iterable<? extends ClusterNode> nodes,
+        Map<ClusterNode, List<T2<K, byte[]>>> map) throws 
IgniteCheckedException {
+        if (nodes != null) {
+            for (ClusterNode n : nodes) {
+                List<T2<K, byte[]>> keys = map.get(n);
+
+                if (keys == null)
+                    map.put(n, keys = new LinkedList<>());
+
+                keys.add(new T2<>(entry.key(), entry.getOrMarshalKeyBytes()));
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param ver Version.
+     * @param keys Keys.
+     * @param unmap Flag for un-mapping version.
+     */
+    public void removeLocks(UUID nodeId, GridCacheVersion ver, Iterable<? extends K> keys, boolean unmap) {
+        assert nodeId != null;
+        assert ver != null;
+
+        if (F.isEmpty(keys))
+            return;
+
+        // Remove mapped versions.
+        GridCacheVersion dhtVer = unmap ? ctx.mvcc().unmapVersion(ver) : ver;
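+        // Note: dhtVer may be null if no mapping exists; it is then resolved per-entry from the near version below.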
+
+        Map<ClusterNode, List<T2<K, byte[]>>> dhtMap = new HashMap<>();
+        Map<ClusterNode, List<T2<K, byte[]>>> nearMap = new HashMap<>();
+
+        GridCacheVersion obsoleteVer = null;
+
+        for (K key : keys) {
+            while (true) {
+                boolean created = false;
+
+                GridDhtCacheEntry<K, V> entry = peekExx(key);
+
+                if (entry == null) {
+                    entry = entryExx(key);
+
+                    created = true;
+                }
+
+                try {
+                    GridCacheMvccCandidate<K> cand = null;
+
+                    if (dhtVer == null) {
+                        cand = entry.localCandidateByNearVersion(ver, true);
+
+                        if (cand != null)
+                            dhtVer = cand.version();
+                        else {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to locate lock candidate 
based on dht or near versions [nodeId=" +
+                                    nodeId + ", ver=" + ver + ", unmap=" + 
unmap + ", keys=" + keys + ']');
+
+                            entry.removeLock(ver);
+
+                            if (created) {
+                                if (obsoleteVer == null)
+                                    obsoleteVer = ctx.versions().next();
+
+                                if (entry.markObsolete(obsoleteVer))
+                                    removeEntry(entry);
+                            }
+
+                            break;
+                        }
+                    }
+
+                    if (cand == null)
+                        cand = entry.candidate(dhtVer);
+
+                    long topVer = cand == null ? -1 : cand.topologyVersion();
+
+                    // Note that we obtain readers before lock is removed.
+                    // Even if the entry is removed just after the lock is removed,
+                    // we must send release messages to backups and readers.
+                    Collection<UUID> readers = entry.readers();
+
+                    // Note that we don't reorder completed versions here,
+                    // as there is no point to reorder relative to the version
+                    // we are about to remove.
+                    if (entry.removeLock(dhtVer)) {
+                        // Map to backups and near readers.
+                        map(nodeId, topVer, entry, readers, dhtMap, nearMap);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Removed lock [lockId=" + ver + ", key=" + key + ']');
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Received unlock request for unknown candidate " +
+                            "(added to cancelled locks set) [ver=" + ver + ", entry=" + entry + ']');
+
+                    if (created && entry.markObsolete(dhtVer))
+                        removeEntry(entry);
+
+                    ctx.evicts().touch(entry, topVer);
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received remove lock request for removed entry (will retry): " + entry);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to remove locks for keys: " + keys, e);
+                }
+            }
+        }
+
+        Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
+        Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+
+        // Backups.
+        for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : dhtMap.entrySet()) {
+            ClusterNode n = entry.getKey();
+
+            List<T2<K, byte[]>> keyBytes = entry.getValue();
+
+            GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size());
+
+            req.version(dhtVer);
+
+            try {
+                for (T2<K, byte[]> key : keyBytes)
+                    req.addKey(key.get1(), key.get2(), ctx);
+
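+                // Piggy-back near reader keys for the same node on this request.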
+                keyBytes = nearMap.get(n);
+
+                if (keyBytes != null)
+                    for (T2<K, byte[]> key : keyBytes)
+                        req.addNearKey(key.get1(), key.get2(), ctx.shared());
+
+                req.completedVersions(committed, rolledback);
+
+                ctx.io().send(n, req);
+            }
+            catch (ClusterTopologyException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("Node left while sending unlock request: " + n);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send unlock request to node (will make best effort to complete): " + n, e);
+            }
+        }
+
+        // Readers.
+        for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : nearMap.entrySet()) {
+            ClusterNode n = entry.getKey();
+
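+            // Nodes present in the DHT map already got their near keys piggy-backed above.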
+            if (!dhtMap.containsKey(n)) {
+                List<T2<K, byte[]>> keyBytes = entry.getValue();
+
+                GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size());
+
+                req.version(dhtVer);
+
+                try {
+                    for (T2<K, byte[]> key : keyBytes)
+                        req.addNearKey(key.get1(), key.get2(), ctx.shared());
+
+                    req.completedVersions(committed, rolledback);
+
+                    ctx.io().send(n, req);
+                }
+                catch (ClusterTopologyException ignore) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left while sending unlock request: " + n);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send unlock request to node (will make best effort to complete): " + n, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param ver Version.
+     * @throws IgniteCheckedException If invalidate failed.
+     */
+    private void invalidateNearEntry(K key, GridCacheVersion ver) throws IgniteCheckedException {
+        GridCacheEntryEx<K, V> nearEntry = near().peekEx(key);
+
+        if (nearEntry != null)
+            nearEntry.invalidate(null, ver);
+    }
+
+    /**
+     * @param key Key.
+     * @param ver Version.
+     */
+    private void obsoleteNearEntry(K key, GridCacheVersion ver) {
+        GridCacheEntryEx<K, V> nearEntry = near().peekEx(key);
+
+        if (nearEntry != null)
+            nearEntry.markObsolete(ver);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
new file mode 100644
index 0000000..cb09fbf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ *
+ */
+public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteTx>
+    implements GridCacheFuture<IgniteTx> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Context. */
+    private GridCacheSharedContext<K, V> cctx;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Transaction. */
+    @GridToStringExclude
+    private GridDhtTxLocalAdapter<K, V> tx;
+
+    /** Commit flag. */
+    private boolean commit;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Error. */
+    @GridToStringExclude
+    private AtomicReference<Throwable> err = new AtomicReference<>(null);
+
+    /** DHT mappings. */
+    private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap;
+
+    /** Near mappings. */
+    private Map<UUID, GridDistributedTxMapping<K, V>> nearMap;
+
+    /** Trackable flag. */
+    private boolean trackable = true;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtTxFinishFuture() {
+        // No-op.
+    }
+
+    /**
+     * @param cctx Context.
+     * @param tx Transaction.
+     * @param commit Commit flag.
+     */
+    public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter<K, V> tx, boolean commit) {
+        super(cctx.kernalContext(), F.<IgniteTx>identityReducer(tx));
+
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.tx = tx;
+        this.commit = commit;
+
+        dhtMap = tx.dhtMap();
+        nearMap = tx.nearMap();
+
+        futId = IgniteUuid.randomUuid();
+
+        log = U.logger(ctx, logRef, GridDhtTxFinishFuture.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return tx.xidVersion();
+    }
+
+    /**
+     * @return Involved nodes.
+     */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        return
+            F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
+                @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
+                    if (isMini(f))
+                        return ((MiniFuture)f).node();
+
+                    return cctx.discovery().localNode();
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        for (IgniteFuture<?> fut : futures())
+            if (isMini(fut)) {
+                MiniFuture f = (MiniFuture)fut;
+
+                if (f.node().id().equals(nodeId)) {
+                    f.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId));
+
+                    return true;
+                }
+            }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        trackable = false;
+    }
+
+    /**
+     * @param e Error.
+     */
+    public void onError(Throwable e) {
+        if (err.compareAndSet(null, e)) {
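+            // Only the first error is recorded; subsequent errors are ignored.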
+            boolean marked = tx.setRollbackOnly();
+
+            if (e instanceof IgniteTxRollbackException) {
+                if (marked) {
+                    try {
+                        tx.rollback();
+                    }
+                    catch (IgniteCheckedException ex) {
+                        U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
+                    }
+                }
+            }
+            else if (tx.isSystemInvalidate()) { // Invalidate remote transactions on heuristic error.
+                finish();
+
+                try {
+                    get();
+                }
+                catch (IgniteTxHeuristicException ignore) {
+                    // Future should complete with IgniteTxHeuristicException.
+                }
+                catch (IgniteCheckedException err) {
+                    U.error(log, "Failed to invalidate transaction: " + tx, 
err);
+                }
+            }
+
+            onComplete();
+        }
+    }
+
+    /**
+     * @param nodeId Sender.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
+        if (!isDone()) {
+            for (IgniteFuture<IgniteTx> fut : futures()) {
+                if (isMini(fut)) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId())) {
+                        assert f.node().id().equals(nodeId);
+
+                        f.onResult(res);
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(IgniteTx tx, Throwable err) {
+        if (initialized() || err != null) {
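+            // For one-phase commit, complete the commit on the transaction manager before finishing the future.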
+            if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING))
+                this.tx.tmCommit();
+
+            Throwable e = this.err.get();
+
+            if (super.onDone(tx, e != null ? e : err)) {
+                // Always send finish reply.
+                this.tx.sendFinishReply(commit, error());
+
+                // Don't forget to clean up.
+                cctx.mvcc().removeFuture(this);
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    private boolean isMini(IgniteFuture<?> f) {
+        return f.getClass().equals(MiniFuture.class);
+    }
+
+    /**
+     * Completeness callback.
+     */
+    private void onComplete() {
+        onDone(tx, err.get());
+    }
+
+    /**
+     * Completes this future.
+     */
+    void complete() {
+        onComplete();
+    }
+
+    /**
+     * Initializes future.
+     */
+    public void finish() {
+        if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap)) {
+            boolean sync = finish(dhtMap, nearMap);
+
+            markInitialized();
+
+            if (!sync)
+                onComplete();
+        }
+        else {
+            markInitialized();
+
+            // No backup or near nodes to send commit message to (just complete then).
+            onComplete();
+        }
+    }
+
+    /**
+     * @param dhtMap DHT map.
+     * @param nearMap Near map.
+     * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
+     */
+    private boolean finish(Map<UUID, GridDistributedTxMapping<K, V>> dhtMap,
+        Map<UUID, GridDistributedTxMapping<K, V>> nearMap) {
+        boolean res = false;
+
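+        // In synchronous mode we wait for replies; otherwise each mini-future completes right after send.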
+        boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
+
+        // Create mini futures.
+        for (GridDistributedTxMapping<K, V> dhtMapping : dhtMap.values()) {
+            ClusterNode n = dhtMapping.node();
+
+            assert !n.isLocal();
+
+            GridDistributedTxMapping<K, V> nearMapping = nearMap.get(n.id());
+
+            if (dhtMapping.empty() && nearMapping != null && nearMapping.empty())
+                // Nothing to send.
+                continue;
+
+            MiniFuture fut = new MiniFuture(dhtMapping, nearMapping);
+
+            add(fut); // Append new future.
+
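+            // Build the finish request for the backup node, carrying the commit decision and completed-version info.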
+            GridDhtTxFinishRequest<K, V> req = new GridDhtTxFinishRequest<>(
+                tx.nearNodeId(),
+                futId,
+                fut.futureId(),
+                tx.topologyVersion(),
+                tx.xidVersion(),
+                tx.commitVersion(),
+                tx.threadId(),
+                tx.isolation(),
+                commit,
+                tx.isInvalidate(),
+                tx.system(),
+                tx.isSystemInvalidate(),
+                tx.syncCommit(),
+                tx.syncRollback(),
+                tx.completedBase(),
+                tx.committedVersions(),
+                tx.rolledbackVersions(),
+                tx.pendingVersions(),
+                tx.size(),
+                tx.pessimistic() ? dhtMapping.writes() : null,
+                tx.pessimistic() && nearMapping != null ? nearMapping.writes() : null,
+                tx.recoveryWrites(),
+                tx.onePhaseCommit(),
+                tx.groupLockKey(),
+                tx.subjectId(),
+                tx.taskNameHash());
+
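+            // For optimistic transactions, piggy-back per-entry TTLs on the finish request.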
+            if (!tx.pessimistic()) {
+                int idx = 0;
+
+                for (IgniteTxEntry<K, V> e : dhtMapping.writes())
+                    req.ttl(idx++, e.ttl());
+
+                if (nearMapping != null) {
+                    idx = 0;
+
+                    for (IgniteTxEntry<K, V> e : nearMapping.writes())
+                        req.nearTtl(idx++, e.ttl());
+                }
+            }
+
+            if (tx.onePhaseCommit())
+                req.writeVersion(tx.writeVersion());
+
+            try {
+                cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+
+                if (sync)
+                    res = true;
+                else
+                    fut.onDone();
+            }
+            catch (IgniteCheckedException e) {
+                // Fail the whole thing.
+                if (e instanceof ClusterTopologyException)
+                    fut.onResult((ClusterTopologyException)e);
+                else
+                    fut.onResult(e);
+            }
+        }
+
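+        // Handle nodes that have near mappings only (not covered by the DHT loop above).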
+        for (GridDistributedTxMapping<K, V> nearMapping : nearMap.values()) {
+            if (!dhtMap.containsKey(nearMapping.node().id())) {
+                if (nearMapping.empty())
+                    // Nothing to send.
+                    continue;
+
+                MiniFuture fut = new MiniFuture(null, nearMapping);
+
+                add(fut); // Append new future.
+
+                GridDhtTxFinishRequest<K, V> req = new GridDhtTxFinishRequest<>(
+                    tx.nearNodeId(),
+                    futId,
+                    fut.futureId(),
+                    tx.topologyVersion(),
+                    tx.xidVersion(),
+                    tx.commitVersion(),
+                    tx.threadId(),
+                    tx.isolation(),
+                    commit,
+                    tx.isInvalidate(),
+                    tx.system(),
+                    tx.isSystemInvalidate(),
+                    tx.syncCommit(),
+                    tx.syncRollback(),
+                    tx.completedBase(),
+                    tx.committedVersions(),
+                    tx.rolledbackVersions(),
+                    tx.pendingVersions(),
+                    tx.size(),
+                    null,
+                    tx.pessimistic() ? nearMapping.writes() : null,
+                    tx.recoveryWrites(),
+                    tx.onePhaseCommit(),
+                    tx.groupLockKey(),
+                    tx.subjectId(),
+                    tx.taskNameHash());
+
+                if (!tx.pessimistic()) {
+                    int idx = 0;
+
+                    for (IgniteTxEntry<K, V> e : nearMapping.writes())
+                        req.nearTtl(idx++, e.ttl());
+                }
+
+                if (tx.onePhaseCommit())
+                    req.writeVersion(tx.writeVersion());
+
+                try {
+                    cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+
+                    if (sync)
+                        res = true;
+                    else
+                        fut.onDone();
+                }
+                catch (IgniteCheckedException e) {
+                    // Fail the whole thing.
+                    if (e instanceof ClusterTopologyException)
+                        fut.onResult((ClusterTopologyException)e);
+                    else
+                        fut.onResult(e);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxFinishFuture.class, this, super.toString());
+    }
+
+    /**
+     * Mini-future for finish operations. Mini-futures are only waiting on a single
+     * node as opposed to multiple nodes.
+     */
+    private class MiniFuture extends GridFutureAdapter<IgniteTx> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** DHT mapping. */
+        @GridToStringInclude
+        private GridDistributedTxMapping<K, V> dhtMapping;
+
+        /** Near mapping. */
+        @GridToStringInclude
+        private GridDistributedTxMapping<K, V> nearMapping;
+
+        /**
+         * Empty constructor required for {@link Externalizable}.
+         */
+        public MiniFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param dhtMapping Mapping.
+         * @param nearMapping Near mapping.
+         */
+        MiniFuture(GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) {
+            super(cctx.kernalContext());
+
+            assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node();
+
+            this.dhtMapping = dhtMapping;
+            this.nearMapping = nearMapping;
+        }
+
+        /**
+         * @return Future ID.
+         */
+        IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * @return Node.
+         */
+        public ClusterNode node() {
+            return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
+        }
+
+        /**
+         * @param e Error.
+         */
+        void onResult(Throwable e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
+
+            // Fail.
+            onDone(e);
+        }
+
+        /**
+         * @param e Node failure.
+         */
+        void onResult(ClusterTopologyException e) {
+            if (log.isDebugEnabled())
+                log.debug("Remote node left grid while sending or waiting for 
reply (will ignore): " + this);
+
+            // If node left, then there is nothing to commit on it.
+            onDone(tx);
+        }
+
+        /**
+         * @param res Result.
+         */
+        void onResult(GridDhtTxFinishResponse<K, V> res) {
+            if (log.isDebugEnabled())
+                log.debug("Transaction synchronously completed on node [node=" 
+ node() + ", res=" + res + ']');
+
+            onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+        }
+    }
+}
