http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
new file mode 100644
index 0000000..8669598
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
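+/**
+ * DHT-side future for the prepare phase of a transaction.
+ * <p>
+ * Once the transaction's local locks are owned, the future sends {@code GridDhtTxPrepareRequest}s to the
+ * remote DHT and near-reader nodes, tracks one mini-future per remote node and, on completion, replies to
+ * the originating near node with a {@code GridNearTxPrepareResponse} (unless that node is the local one).
+ */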
+public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFuture<IgniteTxEx<K, V>>
+    implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Static logger reference handed to {@code U.logger(...)} when the instance logger is obtained. */
+    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+
+    /** Shared cache context. */
+    private GridCacheSharedContext<K, V> cctx;
+
+    /** Future ID, randomly generated by the main constructor. */
+    private IgniteUuid futId;
+
+    /** Transaction being prepared. */
+    @GridToStringExclude
+    private GridDhtTxLocalAdapter<K, V> tx;
+
+    /** Near mappings, taken from the transaction ({@code tx.nearMap()}). */
+    private Map<UUID, GridDistributedTxMapping<K, V>> nearMap;
+
+    /** DHT mappings, taken from the transaction ({@code tx.dhtMap()}). */
+    private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** First error recorded for this future, or {@code null} if none was recorded. */
+    private AtomicReference<Throwable> err = new AtomicReference<>(null);
+
+    /** Replied flag: flipped by the single thread that handles the reply in {@code onDone(...)}. */
+    private AtomicBoolean replied = new AtomicBoolean(false);
+
+    /** Mapped flag: flipped when {@code prepare0()} starts, so that mapping is performed at most once. */
+    private AtomicBoolean mapped = new AtomicBoolean(false);
+
+    /** Prepare reads, set by {@code prepare(...)}. */
+    private Iterable<IgniteTxEntry<K, V>> reads;
+
+    /** Prepare writes, set by {@code prepare(...)}. */
+    private Iterable<IgniteTxEntry<K, V>> writes;
+
+    /** Tx nodes, set by {@code prepare(...)} and forwarded in DHT prepare requests. */
+    private Map<UUID, Collection<UUID>> txNodes;
+
+    /** Trackable flag, cleared by {@link #markNotTrackable()}. */
+    private boolean trackable = true;
+
+    /** Near mini future id, echoed back in the response to the near node. */
+    private IgniteUuid nearMiniId;
+
+    /** DHT versions map; entries whose current version differs are returned to the near node. */
+    private Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap;
+
+    /** {@code True} if this is last prepare operation for node. */
+    private boolean last;
+
+    /** IDs of backup nodes receiving last prepare request during this prepare (may be {@code null}). */
+    private Collection<UUID> lastBackups;
+
+    /**
+     * Empty constructor required for {@link Externalizable}. Performs no initialization: context,
+     * transaction and mappings are left unset.
+     */
+    public GridDhtTxPrepareFuture() {
+        // No-op.
+    }
+
+    /**
+     * Creates a prepare future for the given DHT-local transaction.
+     *
+     * @param cctx Shared cache context.
+     * @param tx Transaction being prepared; its DHT and near mappings are captured here.
+     * @param nearMiniId Near mini future id.
+     * @param dhtVerMap DHT versions map.
+     * @param last {@code True} if this is last prepare operation for node.
+     * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
+     */
+    public GridDhtTxPrepareFuture(
+        GridCacheSharedContext<K, V> cctx,
+        final GridDhtTxLocalAdapter<K, V> tx,
+        IgniteUuid nearMiniId,
+        Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap,
+        boolean last,
+        Collection<UUID> lastBackups
+    ) {
+        super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() {
+            /** Accepts every collected result; there is nothing to accumulate. */
+            @Override public boolean collect(IgniteTxEx<K, V> ignored) {
+                return true;
+            }
+
+            /** The reduced value is always the transaction itself. */
+            @Override public IgniteTxEx<K, V> reduce() {
+                return tx;
+            }
+        });
+
+        assert cctx != null;
+
+        // Identity of this future.
+        this.futId = IgniteUuid.randomUuid();
+        this.nearMiniId = nearMiniId;
+
+        // Collaborators and prepare parameters.
+        this.cctx = cctx;
+        this.tx = tx;
+        this.dhtVerMap = dhtVerMap;
+        this.last = last;
+        this.lastBackups = lastBackups;
+
+        this.log = U.logger(ctx, logRef, GridDhtTxPrepareFuture.class);
+
+        // The mappings belong to the transaction; mapping performed by this future adds entries to them.
+        this.dhtMap = tx.dhtMap();
+        this.nearMap = tx.nearMap();
+
+        assert dhtMap != null;
+        assert nearMap != null;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return ID assigned to this future by the main constructor.
+     */
+    @Override public IgniteUuid futureId() {
+        return this.futId;
+    }
+
+    /**
+     * Gets the ID of the near-node mini future this prepare was started for.
+     *
+     * @return Near mini future id.
+     */
+    public IgniteUuid nearMiniId() {
+        return this.nearMiniId;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return XID version of the transaction being prepared.
+     */
+    @Override public GridCacheVersion version() {
+        return this.tx.xidVersion();
+    }
+
+    /**
+     * Gets a read-only view of the nodes behind the child futures: a mini-future maps to its remote node,
+     * any other child future maps to the local node.
+     *
+     * @return Involved nodes.
+     */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        IgniteClosure<IgniteFuture<?>, ClusterNode> toNode = new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
+            @Nullable @Override public ClusterNode apply(IgniteFuture<?> fut) {
+                return isMini(fut) ? ((MiniFuture)fut).node() : cctx.discovery().localNode();
+            }
+        };
+
+        return F.viewReadOnly(futures(), toNode);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only entries written by this transaction are of interest; for those the future tries to proceed
+     * with mapping if all locks are now owned.
+     *
+     * @return {@code True} if the entry is a write key of the transaction and mapping was started.
+     */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+        if (log.isDebugEnabled())
+            log.debug("Transaction future received owner changed callback: " + entry);
+
+        if (!tx.hasWriteKey(entry.txKey()))
+            return false;
+
+        return mapIfLocked();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code True} until {@link #markNotTrackable()} has been called.
+     */
+    @Override public boolean trackable() {
+        return this.trackable;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Clears the trackable flag of this future.
+     */
+    @Override public void markNotTrackable() {
+        this.trackable = false;
+    }
+
+    /**
+     * Gets the transaction this future prepares.
+     *
+     * @return Transaction.
+     */
+    GridDhtTxLocalAdapter<K, V> tx() {
+        return this.tx;
+    }
+
+    /**
+     * Verifies that every optimistic lock entry of the transaction is locked locally by this transaction.
+     * If a cached entry turns out to be removed, it is re-read from the cache and the check is repeated.
+     *
+     * @return {@code True} if all locks are owned.
+     */
+    private boolean checkLocks() {
+        for (IgniteTxEntry<K, V> e : tx.optimisticLockEntries()) {
+            boolean checked = false;
+
+            while (!checked) {
+                GridCacheEntryEx<K, V> cur = e.cached();
+
+                try {
+                    // Don't compare entry against itself.
+                    if (!cur.lockedLocally(tx.xidVersion())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cur +
+                                ", tx=" + tx + ']');
+
+                        return false;
+                    }
+
+                    checked = true;
+                }
+                // Possible if entry cached within transaction is obsolete.
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in future onAllReplies method (will retry): " + e);
+
+                    // Swap in a fresh entry and check again.
+                    e.cached(e.context().cache().entryEx(e.key()), e.keyBytes());
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Fails the first mini-future bound to the departed node with a {@code ClusterTopologyException}.
+     *
+     * @return {@code True} if a mini-future for the given node was found.
+     */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        for (IgniteFuture<?> fut : futures()) {
+            if (!isMini(fut))
+                continue;
+
+            MiniFuture mini = (MiniFuture)fut;
+
+            if (!mini.node().id().equals(nodeId))
+                continue;
+
+            mini.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId));
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param t Error.
+     */
+    public void onError(Throwable t) {
+        if (err.compareAndSet(null, t)) {
+            tx.setRollbackOnly();
+
+            // TODO: GG-4005:
+            // TODO: as an improvement, at some point we must rollback right 
away.
+            // TODO: However, in this case need to make sure that reply is 
sent back
+            // TODO: even for non-existing transactions whenever finish 
request comes in.
+//            try {
+//                tx.rollback();
+//            }
+//            catch (IgniteCheckedException ex) {
+//                U.error(log, "Failed to automatically rollback transaction: 
" + tx, ex);
+//            }
+//
+            // If not local node.
+            if (!tx.nearNodeId().equals(cctx.localNodeId())) {
+                // Send reply back to near node.
+                GridCacheMessage<K, V> res = new 
GridNearTxPrepareResponse<>(tx.nearXidVersion(), tx.nearFutureId(),
+                    nearMiniId, tx.xidVersion(), 
Collections.<Integer>emptySet(), t);
+
+                try {
+                    cctx.io().send(tx.nearNodeId(), res, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send reply to originating near 
node (will rollback): " + tx.nearNodeId(), e);
+
+                    tx.rollbackAsync();
+                }
+            }
+
+            onComplete();
+        }
+    }
+
+    /**
+     * Routes a DHT prepare response to the pending mini-future whose ID matches the response's mini ID.
+     * Does nothing if this future is already done or no such mini-future is pending.
+     *
+     * @param nodeId Sender.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
+        if (isDone())
+            return;
+
+        for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) {
+            if (!isMini(fut))
+                continue;
+
+            MiniFuture mini = (MiniFuture)fut;
+
+            if (!mini.futureId().equals(res.miniId()))
+                continue;
+
+            assert mini.node().id().equals(nodeId);
+
+            mini.onResult(res);
+
+            return;
+        }
+    }
+
+    /**
+     * Marks the transaction's lock candidate as ready on every non-local entry to check: the group lock
+     * entry for group-lock transactions, the prepare writes otherwise. Removed entries are re-read from
+     * the cache and retried.
+     */
+    private void readyLocks() {
+        if (log.isDebugEnabled())
+            log.debug("Marking all local candidates as ready: " + this);
+
+        Iterable<IgniteTxEntry<K, V>> toReady;
+
+        if (tx.groupLock())
+            toReady = Collections.singletonList(tx.groupLockEntry());
+        else
+            toReady = writes;
+
+        for (IgniteTxEntry<K, V> e : toReady) {
+            if (e.cached().isLocal())
+                continue;
+
+            boolean ready = false;
+
+            while (!ready) {
+                GridDistributedCacheEntry<K, V> distEntry = (GridDistributedCacheEntry<K, V>)e.cached();
+
+                try {
+                    GridCacheMvccCandidate<K> owner = distEntry.readyLock(tx.xidVersion());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Current lock owner for entry [owner=" + owner + ", entry=" + distEntry + ']');
+
+                    ready = true;
+                }
+                // Possible if entry cached within transaction is obsolete.
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in future onAllReplies method (will retry): " + e);
+
+                    // Swap in a fresh entry and try again.
+                    e.cached(e.context().cache().entryEx(e.key()), e.keyBytes());
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts mapping (which sends the prepare requests to remote nodes) provided that all locks are
+     * already owned by the transaction.
+     *
+     * @return {@code True} if all locks are acquired, {@code false} otherwise.
+     */
+    private boolean mapIfLocked() {
+        if (!checkLocks())
+            return false;
+
+        prepare0();
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The first caller to flip the {@code replied} flag clears the prepare future on optimistic
+     * transactions, sends the prepare response to the originating near node (unless that node is local)
+     * and completes the future. Any concurrent caller only waits for that completion.
+     *
+     * @return {@code True} if this call handled the reply, {@code false} if another thread was already
+     *      completing the future.
+     */
+    @Override public boolean onDone(IgniteTxEx<K, V> tx0, Throwable err) {
+        assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
+            "pending mini futures: " + this;
+
+        this.err.compareAndSet(null, err);
+
+        if (!replied.compareAndSet(false, true)) {
+            // Other thread is completing future. Wait for it to complete.
+            try {
+                get();
+            }
+            catch (IgniteInterruptedException e) {
+                onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e));
+            }
+            catch (IgniteCheckedException ignored) {
+                // No-op, get() was just synchronization.
+            }
+
+            return false;
+        }
+
+        try {
+            // Must clear prepare future before response is sent or listeners are notified.
+            if (tx.optimistic())
+                tx.clearPrepareFuture(this);
+
+            boolean nearIsLoc = tx.nearNodeId().equals(cctx.localNodeId());
+
+            if (!nearIsLoc) {
+                // Send reply back to originating near node.
+                GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(),
+                    tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get());
+
+                addDhtValues(res);
+
+                GridCacheVersion min = tx.minVersion();
+
+                res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));
+
+                res.pending(localDhtPendingVersions(tx.writeEntries(), min));
+
+                cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            onError(e);
+        }
+        finally {
+            // Will call super.onDone().
+            onComplete();
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds current DHT versions and values to the response sent to the near node.
+     * <p>
+     * First, for every prepare write that currently has a value, the value is added (the interceptor on
+     * the near node needs old values). Then, for every key in the DHT versions map that was not added
+     * yet, the value is added if the version known to the caller is {@code null} or differs from the
+     * current DHT version.
+     * <p>
+     * If a cached entry turns out to be removed, it is re-read from the cache before retrying, the same
+     * way {@code checkLocks()} and {@code readyLocks()} do; retrying against the same obsolete entry
+     * would never terminate.
+     *
+     * @param res Response being sent.
+     */
+    private void addDhtValues(GridNearTxPrepareResponse<K, V> res) {
+        // Interceptor on near node needs old values to execute callbacks.
+        if (!F.isEmpty(writes)) {
+            for (IgniteTxEntry<K, V> e : writes) {
+                IgniteTxEntry<K, V> txEntry = tx.entry(e.txKey());
+
+                assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
+
+                while (true) {
+                    try {
+                        GridCacheEntryEx<K, V> entry = txEntry.cached();
+
+                        GridCacheVersion dhtVer = entry.version();
+
+                        V val0 = null;
+                        byte[] valBytes0 = null;
+
+                        GridCacheValueBytes valBytesTuple = entry.valueBytes();
+
+                        if (!valBytesTuple.isNull()) {
+                            // Plain bytes are the value itself, otherwise they are its marshalled form.
+                            if (valBytesTuple.isPlain())
+                                val0 = (V)valBytesTuple.get();
+                            else
+                                valBytes0 = valBytesTuple.get();
+                        }
+                        else
+                            val0 = entry.rawGet();
+
+                        // Entries without any value are not reported.
+                        if (val0 != null || valBytes0 != null)
+                            res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0);
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        // Entry is obsolete: swap in a fresh one, then retry.
+                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
+                    }
+                }
+            }
+        }
+
+        for (Map.Entry<IgniteTxKey<K>, GridCacheVersion> ver : dhtVerMap.entrySet()) {
+            IgniteTxEntry<K, V> txEntry = tx.entry(ver.getKey());
+
+            // Already reported by the writes pass above.
+            if (res.hasOwnedValue(ver.getKey()))
+                continue;
+
+            assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + ver.getKey() + ']';
+
+            while (true) {
+                try {
+                    GridCacheEntryEx<K, V> entry = txEntry.cached();
+
+                    GridCacheVersion dhtVer = entry.version();
+
+                    // Report the value only if the caller's version is unknown or differs from the current one.
+                    if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) {
+                        V val0 = null;
+                        byte[] valBytes0 = null;
+
+                        GridCacheValueBytes valBytesTuple = entry.valueBytes();
+
+                        if (!valBytesTuple.isNull()) {
+                            if (valBytesTuple.isPlain())
+                                val0 = (V)valBytesTuple.get();
+                            else
+                                valBytes0 = valBytesTuple.get();
+                        }
+                        else
+                            val0 = entry.rawGet();
+
+                        res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0);
+                    }
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    // Entry is obsolete: swap in a fresh one, then retry.
+                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
+                }
+            }
+        }
+    }
+
+    /**
+     * Tells whether the given future is one of this future's mini-futures (exact class match).
+     *
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    private boolean isMini(IgniteFuture<?> f) {
+        Class<?> futCls = f.getClass();
+
+        return MiniFuture.class.equals(futCls);
+    }
+
+    /**
+     * Completeness callback. Moves the transaction to {@code PREPARED} if this is the last prepare for the
+     * node or the transaction is a system invalidate, then completes the future through the superclass
+     * with the recorded error (if any) and unregisters it from the MVCC manager.
+     *
+     * @return {@code True} if {@code done} flag was changed as a result of this call.
+     */
+    private boolean onComplete() {
+        if (last || tx.isSystemInvalidate())
+            tx.state(PREPARED);
+
+        boolean changed = super.onDone(tx, err.get());
+
+        // Don't forget to clean up.
+        if (changed)
+            cctx.mvcc().removeFuture(this);
+
+        return changed;
+    }
+
+    /**
+     * Completes this future by running the completeness callback; its result (whether the {@code done}
+     * flag actually changed) is discarded.
+     */
+    public void complete() {
+        onComplete();
+    }
+
+    /**
+     * Initializes future: stores the entries to prepare, marks local lock candidates as ready and starts
+     * mapping if all locks are already owned.
+     * <p>
+     * For an empty transaction the transaction is marked rollback-only and {@code onDone(tx)} is invoked
+     * before the arguments are stored; execution then still continues with the steps above.
+     * NOTE(review): there is no early return after the empty-transaction branch - confirm this
+     * fall-through is intended.
+     *
+     * @param reads Read entries.
+     * @param writes Write entries.
+     * @param txNodes Transaction nodes mapping.
+     */
+    public void prepare(Iterable<IgniteTxEntry<K, V>> reads, 
Iterable<IgniteTxEntry<K, V>> writes,
+        Map<UUID, Collection<UUID>> txNodes) {
+        if (tx.empty()) {
+            tx.setRollbackOnly();
+
+            onDone(tx); // Runs while reads/writes/txNodes fields are still unset.
+        }
+
+        this.reads = reads;
+        this.writes = writes;
+        this.txNodes = txNodes;
+
+        readyLocks();
+
+        mapIfLocked(); // Result ignored: if locks are not owned yet, onOwnerChanged() retries later.
+    }
+
+    /**
+     * Checks whether the given node is among the backups that receive their last prepare request during
+     * this prepare. With no backups collection configured the answer is always {@code false}.
+     *
+     * @param backupId Backup node ID.
+     * @return {@code True} if backup node receives last prepare request for this transaction.
+     */
+    private boolean lastBackup(UUID backupId) {
+        if (lastBackups == null)
+            return false;
+
+        return lastBackups.contains(backupId);
+    }
+
+    /**
+     * Maps all prepare reads and writes to remote DHT and near-reader nodes, creates one mini-future per
+     * remote node and sends a {@code GridDhtTxPrepareRequest} to each of them.
+     * <p>
+     * Runs at most once per future (guarded by the {@code mapped} flag). Send failures are reported to the
+     * corresponding mini-future. The compound future is always marked initialized on exit.
+     */
+    private void prepare0() {
+        if (!mapped.compareAndSet(false, true))
+            return;
+
+        try {
+            // Mappings produced by this prepare only (tx-wide mappings live in dhtMap/nearMap).
+            Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>();
+            Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>();
+
+            boolean hasRemoteNodes = false;
+
+            // Assign keys to primary nodes.
+            if (!F.isEmpty(reads)) {
+                for (IgniteTxEntry<K, V> read : reads)
+                    hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+            }
+
+            if (!F.isEmpty(writes)) {
+                for (IgniteTxEntry<K, V> write : writes)
+                    hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
+            }
+
+            if (isDone())
+                return;
+
+            tx.needsCompletedVersions(hasRemoteNodes);
+
+            // Create mini futures.
+            for (GridDistributedTxMapping<K, V> dhtMapping : futDhtMap.values()) {
+                assert !dhtMapping.empty();
+
+                ClusterNode n = dhtMapping.node();
+
+                assert !n.isLocal();
+
+                GridDistributedTxMapping<K, V> nearMapping = futNearMap.get(n.id());
+
+                MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), nearMap.get(n.id()));
+
+                add(fut); // Append new future.
+
+                Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == null ? null : nearMapping.writes();
+
+                GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>(
+                    futId,
+                    fut.futureId(),
+                    tx.topologyVersion(),
+                    tx,
+                    dhtMapping.writes(),
+                    nearWrites,
+                    tx.groupLockKey(),
+                    tx.partitionLock(),
+                    txNodes,
+                    tx.nearXidVersion(),
+                    lastBackup(n.id()),
+                    tx.subjectId(),
+                    tx.taskNameHash());
+
+                int idx = 0;
+
+                // NOTE(review): the 'break' below leaves this loop after the first successfully processed
+                // entry, so 'idx' only advances after a removed-entry exception. Confirm this is intended.
+                for (IgniteTxEntry<K, V> entry : dhtMapping.writes()) {
+                    try {
+                        GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached();
+
+                        GridCacheMvccCandidate<K> added = cached.candidate(version());
+
+                        assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
+                            "[added=" + added + ", entry=" + entry + ']';
+                        assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+                            "[added=" + added + ", entry=" + entry + ']';
+
+                        // Candidate may legitimately be null for group-lock entries (see the assertion above).
+                        if (added != null && added.ownerVersion() != null)
+                            req.owned(entry.txKey(), added.ownerVersion());
+
+                        req.invalidateNearEntry(idx, cached.readerId(n.id()) != null);
+
+                        if (cached.isNewLocked())
+                            req.markKeyForPreload(idx);
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                    }
+
+                    idx++;
+                }
+
+                if (!F.isEmpty(nearWrites)) {
+                    // NOTE(review): same early 'break' as in the DHT writes loop above.
+                    for (IgniteTxEntry<K, V> entry : nearWrites) {
+                        try {
+                            GridCacheMvccCandidate<K> added = entry.cached().candidate(version());
+
+                            assert added != null;
+                            assert added.dhtLocal();
+
+                            if (added.ownerVersion() != null)
+                                req.owned(entry.txKey(), added.ownerVersion());
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                        }
+                    }
+                }
+
+                // Any send failure completes the mini-future with the error.
+                //noinspection TryWithIdenticalCatches
+                try {
+                    cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                }
+                catch (ClusterTopologyException e) {
+                    fut.onResult(e);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onResult(e);
+                }
+            }
+
+            // Near-reader nodes that received no DHT mapping get a near-only request.
+            for (GridDistributedTxMapping<K, V> nearMapping : futNearMap.values()) {
+                if (!futDhtMap.containsKey(nearMapping.node().id())) {
+                    assert nearMapping.writes() != null;
+
+                    MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping);
+
+                    add(fut); // Append new future.
+
+                    GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>(
+                        futId,
+                        fut.futureId(),
+                        tx.topologyVersion(),
+                        tx,
+                        null,
+                        nearMapping.writes(),
+                        tx.groupLockKey(),
+                        tx.partitionLock(),
+                        null,
+                        tx.nearXidVersion(),
+                        false,
+                        tx.subjectId(),
+                        tx.taskNameHash());
+
+                    // NOTE(review): same early 'break' as in the DHT writes loop above.
+                    for (IgniteTxEntry<K, V> entry : nearMapping.writes()) {
+                        try {
+                            GridCacheMvccCandidate<K> added = entry.cached().candidate(version());
+
+                            assert added != null || entry.groupLockEntry() :
+                                "Null candidate for non-group-lock entry " +
+                                "[added=" + added + ", entry=" + entry + ']';
+                            assert added == null || added.dhtLocal() :
+                                "Got non-dht-local candidate for prepare future" +
+                                "[added=" + added + ", entry=" + entry + ']';
+
+                            if (added != null && added.ownerVersion() != null)
+                                req.owned(entry.txKey(), added.ownerVersion());
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                        }
+                    }
+
+                    //noinspection TryWithIdenticalCatches
+                    try {
+                        cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    }
+                    catch (ClusterTopologyException e) {
+                        fut.onResult(e);
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onResult(e);
+                    }
+                }
+            }
+        }
+        finally {
+            markInitialized();
+        }
+    }
+
+    /**
+     * Maps a transaction entry to the remote DHT nodes of its partition and to its near readers, excluding
+     * the local node, the originating near node and (for near mappings) nodes already covered as DHT
+     * nodes. Local cache entries are never mapped. If the cached entry is removed meanwhile, a fresh entry
+     * is obtained and mapping is retried.
+     *
+     * @param entry Transaction entry.
+     * @param futDhtMap DHT mapping.
+     * @param futNearMap Near mapping.
+     * @return {@code True} if mapped.
+     */
+    private boolean map(
+        IgniteTxEntry<K, V> entry,
+        Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap,
+        Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) {
+        if (entry.cached().isLocal())
+            return false;
+
+        GridDhtCacheEntry<K, V> dhtEntry = (GridDhtCacheEntry<K, V>)entry.cached();
+
+        GridCacheContext<K, V> cacheCtx = entry.context();
+
+        GridDhtCacheAdapter<K, V> dht;
+
+        if (cacheCtx.isNear())
+            dht = cacheCtx.near().dht();
+        else
+            dht = cacheCtx.dht();
+
+        for (;;) {
+            try {
+                Collection<ClusterNode> dhtNodes = dht.topology().nodes(dhtEntry.partition(), tx.topologyVersion());
+
+                if (log.isDebugEnabled())
+                    log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
+                        ", entry=" + entry + ']');
+
+                Collection<UUID> readers = dhtEntry.readers();
+
+                Collection<ClusterNode> nearNodes = null;
+
+                if (!F.isEmpty(readers)) {
+                    nearNodes = cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
+                            ", entry=" + entry + ']');
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Entry has no near readers: " + entry);
+
+                // Exclude local node.
+                boolean res = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
+
+                // Exclude DHT nodes.
+                res |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
+
+                return res;
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                // Entry is obsolete: obtain a fresh one and map again.
+                dhtEntry = dht.entryExx(entry.key());
+
+                entry.cached(dhtEntry, dhtEntry.keyBytes());
+            }
+        }
+    }
+
+    /**
+     * Adds the entry to the per-node mappings of every given node, in both maps, creating mappings that
+     * do not exist yet.
+     *
+     * @param entry Entry.
+     * @param nodes Nodes (may be {@code null}, in which case nothing is mapped).
+     * @param globalMap Map.
+     * @param locMap Exclude map.
+     * @return {@code True} if mapped.
+     */
+    private boolean map(IgniteTxEntry<K, V> entry, Iterable<ClusterNode> nodes,
+        Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) {
+        if (nodes == null)
+            return false;
+
+        boolean mappedAny = false;
+
+        for (ClusterNode node : nodes) {
+            GridDistributedTxMapping<K, V> globalMapping = globalMap.get(node.id());
+
+            if (globalMapping == null) {
+                globalMapping = new GridDistributedTxMapping<>(node);
+
+                globalMap.put(node.id(), globalMapping);
+            }
+
+            globalMapping.add(entry);
+
+            GridDistributedTxMapping<K, V> locMapping = locMap.get(node.id());
+
+            if (locMapping == null) {
+                locMapping = new GridDistributedTxMapping<>(node);
+
+                locMap.put(node.id(), locMapping);
+            }
+
+            locMapping.add(entry);
+
+            mappedAny = true;
+        }
+
+        return mappedAny;
+    }
+
+    /**
+     * Collects the versions of local lock candidates, over all given entries, that are less than the base
+     * version. Entries that have been removed contribute nothing.
+     *
+     * @param entries Tx entries to process.
+     * @param baseVer Base version.
+     * @return Collection of pending candidates versions.
+     */
+    private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEntry<K, V>> entries,
+        GridCacheVersion baseVer) {
+        Collection<GridCacheVersion> pending = new GridLeanSet<>(5);
+
+        for (IgniteTxEntry<K, V> txEntry : entries) {
+            try {
+                for (GridCacheMvccCandidate cand : txEntry.cached().localCandidates()) {
+                    GridCacheVersion candVer = cand.version();
+
+                    if (candVer.isLess(baseVer))
+                        pending.add(candVer);
+                }
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, no candidates.
+            }
+        }
+
+        return pending;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxPrepareFuture.class, this, "super", 
super.toString());
+    }
+
+    /**
+     * Mini-future for a prepare request sent to a single remote node. Mini-futures are only waiting on a
+     * single node as opposed to multiple nodes.
+     */
+    private class MiniFuture extends GridFutureAdapter<IgniteTxEx<K, V>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Future ID. */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /** DHT mapping. */
+        @GridToStringInclude
+        private GridDistributedTxMapping<K, V> dhtMapping;
+
+        /** Near mapping. */
+        @GridToStringInclude
+        private GridDistributedTxMapping<K, V> nearMapping;
+
+        /**
+         * Empty constructor required for {@link Externalizable}.
+         */
+        public MiniFuture() {
+            super(cctx.kernalContext());
+        }
+
+        /**
+         * Creates mini-future for the given node. When both mappings are provided
+         * they must refer to the same node.
+         *
+         * @param nodeId Node ID.
+         * @param dhtMapping DHT mapping (may be {@code null}).
+         * @param nearMapping Near mapping (may be {@code null}).
+         */
+        MiniFuture(UUID nodeId, GridDistributedTxMapping<K, V> dhtMapping, 
GridDistributedTxMapping<K, V> nearMapping) {
+            super(cctx.kernalContext());
+
+            assert dhtMapping == null || nearMapping == null || 
dhtMapping.node() == nearMapping.node();
+
+            this.nodeId = nodeId;
+            this.dhtMapping = dhtMapping;
+            this.nearMapping = nearMapping;
+        }
+
+        /**
+         * @return Future ID.
+         */
+        IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * @return Node of the DHT mapping if it is set, otherwise node of the near mapping.
+         */
+        public ClusterNode node() {
+            return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
+        }
+
+        /**
+         * Completes this mini-future with the given error.
+         *
+         * @param e Error.
+         */
+        void onResult(Throwable e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
+
+            // Fail.
+            onDone(e);
+        }
+
+        /**
+         * Handles the remote node leaving the grid. The failure is ignored: the mapping
+         * for the node is removed from the transaction (if there is one) and this
+         * mini-future is completed with the transaction as its result.
+         *
+         * @param e Node failure.
+         */
+        void onResult(ClusterTopologyException e) {
+            if (log.isDebugEnabled())
+                log.debug("Remote node left grid while sending or waiting for 
reply (will ignore): " + this);
+
+            if (tx != null)
+                tx.removeMapping(nodeId);
+
+            onDone(tx);
+        }
+
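+        /**
+         * Processes prepare response. If the response carries an error, it is passed to
+         * {@code onError(...)}. Otherwise readers reported as evicted are removed from the near
+         * mapping, entries of invalid partitions are removed from the DHT mapping, preload entries
+         * are applied as initial values and this mini-future is completed with the transaction.
+         * <p>
+         * NOTE(review): invalid partitions are processed through {@code dhtMapping} without a
+         * {@code null} check although {@link #node()} allows it to be {@code null} - confirm that
+         * invalid partitions are never reported for a near-only mapping.
+         *
+         * @param res Prepare response.
+         */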
+        void onResult(GridDhtTxPrepareResponse<K, V> res) {
+            if (res.error() != null)
+                // Fail the whole compound future.
+                onError(res.error());
+            else {
+                // Process evicted readers (no need to remap).
+                if (nearMapping != null && !F.isEmpty(res.nearEvicted())) {
+                    nearMapping.evictReaders(res.nearEvicted());
+
+                    for (IgniteTxEntry<K, V> entry : nearMapping.entries()) {
+                        if (res.nearEvicted().contains(entry.txKey())) {
+                            while (true) {
+                                try {
+                                    GridDhtCacheEntry<K, V> cached = 
(GridDhtCacheEntry<K, V>)entry.cached();
+
+                                    
cached.removeReader(nearMapping.node().id(), res.messageId());
+
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    GridCacheEntryEx<K, V> e = 
entry.context().cache().peekEx(entry.key());
+
+                                    if (e == null)
+                                        break;
+
+                                    entry.cached(e, entry.keyBytes());
+                                }
+                            }
+                        }
+                    }
+                }
+
+                // Process invalid partitions (no need to remap).
+                if (!F.isEmpty(res.invalidPartitions())) {
+                    for (Iterator<IgniteTxEntry<K, V>> it = 
dhtMapping.entries().iterator(); it.hasNext();) {
+                        IgniteTxEntry<K, V> entry  = it.next();
+
+                        if 
(res.invalidPartitions().contains(entry.cached().partition())) {
+                            it.remove();
+
+                            if (log.isDebugEnabled())
+                                log.debug("Removed mapping for entry from dht 
mapping [key=" + entry.key() +
+                                    ", tx=" + tx + ", dhtMapping=" + 
dhtMapping + ']');
+                        }
+                    }
+
+                    if (dhtMapping.empty()) {
+                        dhtMap.remove(nodeId);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Removed mapping for node entirely 
because all partitions are invalid [nodeId=" +
+                                nodeId + ", tx=" + tx + ']');
+                    }
+                }
+
+                long topVer = tx.topologyVersion();
+
+                boolean rec = 
cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED);
+
+                for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) {
+                    GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(info.cacheId());
+
+                    while (true) {
+                        GridCacheEntryEx<K, V> entry = 
cacheCtx.cache().entryEx(info.key());
+
+                        GridDrType drType = cacheCtx.isDrEnabled() ? 
GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
+
+                        try {
+                            if (entry.initialValue(info.value(), 
info.valueBytes(), info.version(),
+                                info.ttl(), info.expireTime(), true, topVer, 
drType)) {
+                                if (rec && !entry.isInternal())
+                                    
cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
+                                        (IgniteUuid)null, null, 
EVT_CACHE_PRELOAD_OBJECT_LOADED, info.value(), true, null,
+                                        false, null, null, null);
+                            }
+
+                            break;
+                        }
+                        catch (IgniteCheckedException e) {
+                            // Fail the whole thing.
+                            onDone(e);
+
+                            return;
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to set entry initial value 
(entry is obsolete, " +
+                                    "will retry): " + entry);
+                        }
+                    }
+                }
+
+                // Finish mini future.
+                onDone(tx);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MiniFuture.class, this, "done", isDone(), 
"cancelled", isCancelled(), "err", error());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
new file mode 100644
index 0000000..b458b5f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * DHT prepare request.
+ */
+public class GridDhtTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareRequest<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Near node ID. */
+    private UUID nearNodeId;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Topology version. */
+    private long topVer;
+
+    /** Invalidate near entries flags. */
+    private BitSet invalidateNearEntries;
+
+    /** Near writes. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Collection<IgniteTxEntry<K, V>> nearWrites;
+
+    /** Serialized near writes. */
+    @GridDirectCollection(byte[].class)
+    private Collection<byte[]> nearWritesBytes;
+
+    /** Owned versions by key. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<IgniteTxKey<K>, GridCacheVersion> owned;
+
+    /** Owned versions bytes. */
+    private byte[] ownedBytes;
+
+    /** Near transaction ID. */
+    private GridCacheVersion nearXidVer;
+
+    /** {@code True} if this is last prepare request for node. */
+    private boolean last;
+
+    /** Subject ID. */
+    @GridDirectVersion(1)
+    private UUID subjId;
+
+    /** Task name hash. */
+    @GridDirectVersion(2)
+    private int taskNameHash;
+
+    /** Preload keys. */
+    @GridDirectVersion(3)
+    private BitSet preloadKeys;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtTxPrepareRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param topVer Topology version.
+     * @param tx Transaction.
+     * @param dhtWrites DHT writes.
+     * @param nearWrites Near writes.
+     * @param grpLockKey Group lock key if preparing group-lock transaction.
+     * @param partLock {@code True} if group-lock transaction locks partition.
+     * @param txNodes Transaction nodes mapping.
+     * @param nearXidVer Near transaction ID.
+     * @param last {@code True} if this is last prepare request for node.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     */
+    public GridDhtTxPrepareRequest(
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        long topVer,
+        GridDhtTxLocalAdapter<K, V> tx,
+        Collection<IgniteTxEntry<K, V>> dhtWrites,
+        Collection<IgniteTxEntry<K, V>> nearWrites,
+        IgniteTxKey grpLockKey,
+        boolean partLock,
+        Map<UUID, Collection<UUID>> txNodes,
+        GridCacheVersion nearXidVer,
+        boolean last,
+        UUID subjId,
+        int taskNameHash) {
+        super(tx, null, dhtWrites, grpLockKey, partLock, txNodes);
+
+        assert futId != null;
+        assert miniId != null;
+
+        this.topVer = topVer;
+        this.futId = futId;
+        this.nearWrites = nearWrites;
+        this.miniId = miniId;
+        this.nearXidVer = nearXidVer;
+        this.last = last;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+
+        invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : 
dhtWrites.size());
+
+        nearNodeId = tx.nearNodeId();
+    }
+
+    /**
+     * @return {@code True} if this is last prepare request for node.
+     */
+    public boolean last() {
+        return last;
+    }
+
+    /**
+     * @return Near transaction ID.
+     */
+    public GridCacheVersion nearXidVersion() {
+        return nearXidVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowForStartup() {
+        return true;
+    }
+
+    /**
+     * @return Near node ID.
+     */
+    public UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    /**
+     * @return Subject ID.
+     */
+    @Nullable public UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * @return Task name hash.
+     */
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Near writes, or an empty list if there are none.
+     */
+    public Collection<IgniteTxEntry<K, V>> nearWrites() {
+        return nearWrites == null ? Collections.<IgniteTxEntry<K, 
V>>emptyList() : nearWrites;
+    }
+
+    /**
+     * @param idx Entry index to set invalidation flag.
+     * @param invalidate Invalidation flag value.
+     */
+    public void invalidateNearEntry(int idx, boolean invalidate) {
+        invalidateNearEntries.set(idx, invalidate);
+    }
+
+    /**
+     * @param idx Index to get invalidation flag value.
+     * @return Invalidation flag value.
+     */
+    public boolean invalidateNearEntry(int idx) {
+        return invalidateNearEntries.get(idx);
+    }
+
+    /**
+     * Marks key with the given index for preloading.
+     *
+     * @param idx Key index.
+     */
+    public void markKeyForPreload(int idx) {
+        if (preloadKeys == null)
+            preloadKeys = new BitSet();
+
+        preloadKeys.set(idx, true);
+    }
+
+    /**
+     * Checks whether entry info should be sent to primary node from backup.
+     *
+     * @param idx Index.
+     * @return {@code True} if value should be sent, {@code false} otherwise.
+     */
+    public boolean needPreloadKey(int idx) {
+        return preloadKeys != null && preloadKeys.get(idx);
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * Sets owner and its mapped version.
+     *
+     * @param key Key.
+     * @param ownerMapped Owner mapped version.
+     */
+    public void owned(IgniteTxKey<K> key, GridCacheVersion ownerMapped) {
+        if (owned == null)
+            owned = new GridLeanMap<>(3);
+
+        owned.put(key, ownerMapped);
+    }
+
+    /**
+     * @return Owned versions map, or {@code null} if it has not been set.
+     */
+    public Map<IgniteTxKey<K>, GridCacheVersion> owned() {
+        return owned;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (ownedBytes == null && owned != null) {
+            ownedBytes = CU.marshal(ctx, owned);
+
+            if (ctx.deploymentEnabled()) {
+                for (IgniteTxKey<K> k : owned.keySet())
+                    prepareObject(k, ctx);
+            }
+        }
+
+        if (nearWrites != null) {
+            marshalTx(nearWrites, ctx);
+
+            nearWritesBytes = new ArrayList<>(nearWrites.size());
+
+            for (IgniteTxEntry<K, V> e : nearWrites)
+                nearWritesBytes.add(ctx.marshaller().marshal(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (ownedBytes != null && owned == null)
+            owned = ctx.marshaller().unmarshal(ownedBytes, ldr);
+
+        if (nearWritesBytes != null) {
+            nearWrites = new ArrayList<>(nearWritesBytes.size());
+
+            for (byte[] arr : nearWritesBytes)
+                nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
+
+            unmarshalTx(nearWrites, true, ctx, ldr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxPrepareRequest.class, this, "super", 
super.toString());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDhtTxPrepareRequest _clone = new GridDhtTxPrepareRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDhtTxPrepareRequest _clone = (GridDhtTxPrepareRequest)_msg;
+
+        _clone.nearNodeId = nearNodeId;
+        _clone.futId = futId;
+        _clone.miniId = miniId;
+        _clone.topVer = topVer;
+        _clone.invalidateNearEntries = invalidateNearEntries;
+        _clone.nearWrites = nearWrites;
+        _clone.nearWritesBytes = nearWritesBytes;
+        _clone.owned = owned;
+        _clone.ownedBytes = ownedBytes;
+        _clone.nearXidVer = nearXidVer;
+        _clone.last = last;
+        _clone.subjId = subjId;
+        _clone.taskNameHash = taskNameHash;
+        _clone.preloadKeys = preloadKeys;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 22:
+                if (!commState.putGridUuid(futId))
+                    return false;
+
+                commState.idx++;
+
+            case 23:
+                if (!commState.putBitSet(invalidateNearEntries))
+                    return false;
+
+                commState.idx++;
+
+            case 24:
+                if (!commState.putBoolean(last))
+                    return false;
+
+                commState.idx++;
+
+            case 25:
+                if (!commState.putGridUuid(miniId))
+                    return false;
+
+                commState.idx++;
+
+            case 26:
+                if (!commState.putUuid(nearNodeId))
+                    return false;
+
+                commState.idx++;
+
+            case 27:
+                if (nearWritesBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(nearWritesBytes.size()))
+                            return false;
+
+                        commState.it = nearWritesBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 28:
+                if (!commState.putCacheVersion(nearXidVer))
+                    return false;
+
+                commState.idx++;
+
+            case 29:
+                if (!commState.putByteArray(ownedBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 30:
+                if (!commState.putLong(topVer))
+                    return false;
+
+                commState.idx++;
+
+            case 31:
+                if (!commState.putUuid(subjId))
+                    return false;
+
+                commState.idx++;
+
+            case 32:
+                if (!commState.putInt(taskNameHash))
+                    return false;
+
+                commState.idx++;
+
+            case 33:
+                if (!commState.putBitSet(preloadKeys))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 22:
+                IgniteUuid futId0 = commState.getGridUuid();
+
+                if (futId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                futId = futId0;
+
+                commState.idx++;
+
+            case 23:
+                BitSet invalidateNearEntries0 = commState.getBitSet();
+
+                if (invalidateNearEntries0 == BIT_SET_NOT_READ)
+                    return false;
+
+                invalidateNearEntries = invalidateNearEntries0;
+
+                commState.idx++;
+
+            case 24:
+                if (buf.remaining() < 1)
+                    return false;
+
+                last = commState.getBoolean();
+
+                commState.idx++;
+
+            case 25:
+                IgniteUuid miniId0 = commState.getGridUuid();
+
+                if (miniId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                miniId = miniId0;
+
+                commState.idx++;
+
+            case 26:
+                UUID nearNodeId0 = commState.getUuid();
+
+                if (nearNodeId0 == UUID_NOT_READ)
+                    return false;
+
+                nearNodeId = nearNodeId0;
+
+                commState.idx++;
+
+            case 27:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (nearWritesBytes == null)
+                        nearWritesBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        nearWritesBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 28:
+                GridCacheVersion nearXidVer0 = commState.getCacheVersion();
+
+                if (nearXidVer0 == CACHE_VER_NOT_READ)
+                    return false;
+
+                nearXidVer = nearXidVer0;
+
+                commState.idx++;
+
+            case 29:
+                byte[] ownedBytes0 = commState.getByteArray();
+
+                if (ownedBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                ownedBytes = ownedBytes0;
+
+                commState.idx++;
+
+            case 30:
+                if (buf.remaining() < 8)
+                    return false;
+
+                topVer = commState.getLong();
+
+                commState.idx++;
+
+            case 31:
+                UUID subjId0 = commState.getUuid();
+
+                if (subjId0 == UUID_NOT_READ)
+                    return false;
+
+                subjId = subjId0;
+
+                commState.idx++;
+
+            case 32:
+                if (buf.remaining() < 4)
+                    return false;
+
+                taskNameHash = commState.getInt();
+
+                commState.idx++;
+
+            case 33:
+                BitSet preloadKeys0 = commState.getBitSet();
+
+                if (preloadKeys0 == BIT_SET_NOT_READ)
+                    return false;
+
+                preloadKeys = preloadKeys0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 33;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
new file mode 100644
index 0000000..0c2bfd1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * DHT transaction prepare response.
+ */
+public class GridDhtTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareResponse<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Evicted readers. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Collection<IgniteTxKey<K>> nearEvicted;
+
+    /** Serialized evicted readers. */
+    @GridDirectCollection(byte[].class)
+    private Collection<byte[]> nearEvictedBytes;
+
+    /** Future ID.  */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Invalid partitions. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> invalidParts;
+
+    /** Preload entries. */
+    @GridDirectTransient
+    private List<GridCacheEntryInfo<K, V>> preloadEntries;
+
+    /** Serialized preload entries. */
+    @GridDirectCollection(byte[].class)
+    @GridDirectVersion(1)
+    private List<byte[]> preloadEntriesBytes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDhtTxPrepareResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param xid Xid version.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     */
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, 
IgniteUuid miniId) {
+        super(xid);
+
+        assert futId != null;
+        assert miniId != null;
+
+        this.futId = futId;
+        this.miniId = miniId;
+    }
+
+    /**
+     * @param xid Xid version.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param err Error.
+     */
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, 
IgniteUuid miniId, Throwable err) {
+        super(xid, err);
+
+        assert futId != null;
+        assert miniId != null;
+
+        this.futId = futId;
+        this.miniId = miniId;
+    }
+
+    /**
+     * @return Evicted readers.
+     */
+    public Collection<IgniteTxKey<K>> nearEvicted() {
+        return nearEvicted;
+    }
+
+    /**
+     * @param nearEvicted Evicted readers.
+     */
+    public void nearEvicted(Collection<IgniteTxKey<K>> nearEvicted) {
+        this.nearEvicted = nearEvicted;
+    }
+
+    /**
+     * @param nearEvictedBytes Near evicted bytes.
+     */
+    public void nearEvictedBytes(Collection<byte[]> nearEvictedBytes) {
+        this.nearEvictedBytes = nearEvictedBytes;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Invalid partitions.
+     */
+    public Collection<Integer> invalidPartitions() {
+        return invalidParts;
+    }
+
+    /**
+     * @param invalidParts Invalid partitions.
+     */
+    public void invalidPartitions(Collection<Integer> invalidParts) {
+        this.invalidParts = invalidParts;
+    }
+
+    /**
+     * Gets preload entries found on backup node.
+     *
+     * @return Collection of entry infos that need to be preloaded, or an empty list if there are none.
+     */
+    public Collection<GridCacheEntryInfo<K, V>> preloadEntries() {
+        return preloadEntries == null ? Collections.<GridCacheEntryInfo<K, 
V>>emptyList() : preloadEntries;
+    }
+
+    /**
+     * Adds preload entry.
+     *
+     * @param info Info to add.
+     */
+    public void addPreloadEntry(GridCacheEntryInfo<K, V> info) {
+        if (preloadEntries == null)
+            preloadEntries = new ArrayList<>();
+
+        preloadEntries.add(info);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (nearEvictedBytes == null)
+            nearEvictedBytes = marshalCollection(nearEvicted, ctx);
+
+        if (preloadEntriesBytes == null && preloadEntries != null)
+            preloadEntriesBytes = marshalCollection(preloadEntries, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        // Unmarshal even if deployment is disabled, since we could get bytes 
initially.
+        if (nearEvicted == null && nearEvictedBytes != null)
+            nearEvicted = unmarshalCollection(nearEvictedBytes, ctx, ldr);
+
+        if (preloadEntries == null && preloadEntriesBytes != null)
+            preloadEntries = unmarshalCollection(preloadEntriesBytes, ctx, 
ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxPrepareResponse.class, this, "super", 
super.toString());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDhtTxPrepareResponse copy = new GridDhtTxPrepareResponse();
+
+        // The field-by-field copy is delegated to clone0().
+        clone0(copy);
+
+        return copy;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDhtTxPrepareResponse dst = (GridDhtTxPrepareResponse)_msg;
+
+        // Shallow copy: the target shares every reference with this message.
+        dst.futId = futId;
+        dst.miniId = miniId;
+        dst.invalidParts = invalidParts;
+
+        dst.nearEvicted = nearEvicted;
+        dst.nearEvictedBytes = nearEvictedBytes;
+
+        dst.preloadEntries = preloadEntries;
+        dst.preloadEntriesBytes = preloadEntriesBytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes the parent part first, then the type byte (guarded by {@code commState.typeWritten}), then the
+     * fields of this class selected by {@code commState.idx}: 10 - {@code futId}, 11 - {@code invalidParts},
+     * 12 - {@code miniId}, 13 - {@code nearEvictedBytes}, 14 - {@code preloadEntriesBytes}.
+     * <p>
+     * The {@code case} labels fall through on purpose. As soon as a {@code put*} call reports failure the method
+     * returns {@code false} while the position stays recorded in {@code commState} ({@code idx}, {@code it},
+     * {@code cur}); NOTE(review): presumably the caller invokes the method again with more buffer space - confirm.
+     * <p>
+     * A collection is written as an int size followed by its elements; a {@code null} collection is written
+     * as the single int {@code -1}.
+     *
+     * @param buf Buffer handed to {@code commState} and to the parent implementation.
+     * @return {@code true} when every step succeeded, {@code false} when the parent or a {@code put*} call
+     *      reported failure.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) { // The type byte is written at most once per message.
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 10: // futId
+                if (!commState.putGridUuid(futId))
+                    return false;
+
+                commState.idx++;
+
+            case 11: // invalidParts: int size, then one int per element.
+                if (invalidParts != null) {
+                    if (commState.it == null) { // Size prefix and iterator are set up only on the first pass.
+                        if (!commState.putInt(invalidParts.size()))
+                            return false;
+
+                        commState.it = invalidParts.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL) // No pending element: take the next one.
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putInt((int)commState.cur))
+                            return false; // 'cur' stays set, so the same element is tried again later.
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1)) // -1 marks a null collection.
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 12: // miniId
+                if (!commState.putGridUuid(miniId))
+                    return false;
+
+                commState.idx++;
+
+            case 13: // nearEvictedBytes: int size, then one byte array per element.
+                if (nearEvictedBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(nearEvictedBytes.size()))
+                            return false;
+
+                        commState.it = nearEvictedBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 14: // preloadEntriesBytes: int size, then one byte array per element.
+                if (preloadEntriesBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(preloadEntriesBytes.size()))
+                            return false;
+
+                        commState.it = preloadEntriesBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the parent part first, then the fields of this class selected by {@code commState.idx}:
+     * 10 - {@code futId}, 11 - {@code invalidParts}, 12 - {@code miniId}, 13 - {@code nearEvictedBytes},
+     * 14 - {@code preloadEntriesBytes}. The {@code case} labels fall through on purpose.
+     * <p>
+     * The method returns {@code false} when a value cannot be read yet: fewer than 4 bytes remain for an int,
+     * or a getter returns its {@code *_NOT_READ} marker. Progress stays recorded in {@code commState}
+     * ({@code idx}, {@code readSize}, {@code readItems}); NOTE(review): presumably the caller invokes the
+     * method again once more data has arrived - confirm.
+     * <p>
+     * For collections, {@code readSize == -1} means that the size prefix has not been read. A negative size
+     * read from the buffer leaves the target collection untouched.
+     *
+     * @param buf Buffer handed to {@code commState} and to the parent implementation.
+     * @return {@code true} when every step succeeded, {@code false} when the parent returned {@code false}
+     *      or a value could not be read.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 10: // futId
+                IgniteUuid futId0 = commState.getGridUuid();
+
+                if (futId0 == GRID_UUID_NOT_READ) // Identity comparison against the marker instance.
+                    return false;
+
+                futId = futId0;
+
+                commState.idx++;
+
+            case 11: // invalidParts: int size, then one int per element.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (invalidParts == null)
+                        invalidParts = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        if (buf.remaining() < 4)
+                            return false; // readItems keeps the count of elements already stored.
+
+                        int _val = commState.getInt();
+
+                        invalidParts.add((Integer)_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1; // Reset the collection-reading state for the next collection.
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 12: // miniId
+                IgniteUuid miniId0 = commState.getGridUuid();
+
+                if (miniId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                miniId = miniId0;
+
+                commState.idx++;
+
+            case 13: // nearEvictedBytes: int size, then one byte array per element.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (nearEvictedBytes == null)
+                        nearEvictedBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ) // Identity comparison against the marker instance.
+                            return false;
+
+                        nearEvictedBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 14: // preloadEntriesBytes: int size, then one byte array per element.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (preloadEntriesBytes == null)
+                        preloadEntriesBytes = new 
ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        preloadEntriesBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 34; // Type code of this message; writeTo() emits it as the type byte.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
new file mode 100644
index 0000000..524493d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+
+/**
+ * Transaction created by system implicitly on remote nodes.
+ */
+public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, 
V> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Near node ID. */
+    private UUID nearNodeId;
+
+    /** Remote future ID. */
+    private IgniteUuid rmtFutId;
+
+    /** Near transaction ID. */
+    private GridCacheVersion nearXidVer;
+
+    /** Transaction nodes mapping (primary node -> related backup nodes). */
+    private Map<UUID, Collection<UUID>> txNodes;
+
+    /**
+     * Empty constructor required for {@link Externalizable}. Leaves every field of this class at its default.
+     */
+    public GridDhtTxRemote() {
+        // No-op.
+    }
+
+    /**
+     * This constructor is meant for optimistic transactions.
+     * <p>
+     * Starts with an empty immutable read map and a write map sized from {@code txSize}, then applies the
+     * given topology version.
+     *
+     * @param ctx Cache context.
+     * @param nearNodeId Near node ID (must not be {@code null}).
+     * @param rmtFutId Remote future ID (must not be {@code null}).
+     * @param nodeId Node ID.
+     * @param rmtThreadId Remote thread ID.
+     * @param topVer Topology version.
+     * @param xidVer XID version.
+     * @param commitVer Commit version.
+     * @param sys System flag.
+     * @param concurrency Concurrency level (NOTE(review): presumably optimistic, given the purpose of this
+     *      constructor - confirm).
+     * @param isolation Transaction isolation.
+     * @param invalidate Invalidate flag.
+     * @param timeout Timeout.
+     * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param nearXidVer Near transaction ID.
+     * @param txNodes Transaction nodes mapping.
+     * @param subjId Subject ID, forwarded to the parent constructor.
+     * @param taskNameHash Task name hash, forwarded to the parent constructor.
+     */
+    public GridDhtTxRemote(
+        GridCacheSharedContext<K, V> ctx,
+        UUID nearNodeId,
+        IgniteUuid rmtFutId,
+        UUID nodeId,
+        long rmtThreadId,
+        long topVer,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        boolean sys,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation,
+        boolean invalidate,
+        long timeout,
+        int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        GridCacheVersion nearXidVer,
+        Map<UUID, Collection<UUID>> txNodes,
+        @Nullable UUID subjId,
+        int taskNameHash
+    ) {
+        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, 
isolation, invalidate, timeout, txSize,
+            grpLockKey, subjId, taskNameHash);
+
+        assert nearNodeId != null;
+        assert rmtFutId != null;
+
+        this.nearNodeId = nearNodeId;
+        this.rmtFutId = rmtFutId;
+        this.nearXidVer = nearXidVer;
+        this.txNodes = txNodes;
+
+        readMap = Collections.emptyMap(); // Immutable empty map.
+
+        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); // Sized from the expected transaction size.
+
+        topologyVersion(topVer);
+    }
+
+    /**
+     * This constructor is meant for pessimistic transactions.
+     * <p>
+     * Starts with an empty immutable read map and a write map sized from {@code txSize}, then applies the
+     * given topology version. The transaction nodes mapping is not assigned by this constructor, so
+     * {@link #transactionNodes()} returns {@code null} for instances created through it.
+     *
+     * @param ctx Cache context.
+     * @param nearNodeId Near node ID (must not be {@code null}).
+     * @param rmtFutId Remote future ID (must not be {@code null}).
+     * @param nodeId Node ID.
+     * @param nearXidVer Near transaction ID.
+     * @param rmtThreadId Remote thread ID.
+     * @param topVer Topology version.
+     * @param xidVer XID version.
+     * @param commitVer Commit version.
+     * @param sys System flag.
+     * @param concurrency Concurrency level (should be pessimistic).
+     * @param isolation Transaction isolation.
+     * @param invalidate Invalidate flag.
+     * @param timeout Timeout.
+     * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if transaction is group-lock.
+     * @param subjId Subject ID, forwarded to the parent constructor.
+     * @param taskNameHash Task name hash, forwarded to the parent constructor.
+     */
+    public GridDhtTxRemote(
+        GridCacheSharedContext<K, V> ctx,
+        UUID nearNodeId,
+        IgniteUuid rmtFutId,
+        UUID nodeId,
+        GridCacheVersion nearXidVer,
+        long rmtThreadId,
+        long topVer,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        boolean sys,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation,
+        boolean invalidate,
+        long timeout,
+        int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        @Nullable UUID subjId,
+        int taskNameHash
+    ) {
+        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, 
isolation, invalidate, timeout, txSize,
+            grpLockKey, subjId, taskNameHash);
+
+        assert nearNodeId != null;
+        assert rmtFutId != null;
+
+        this.nearXidVer = nearXidVer;
+        this.nearNodeId = nearNodeId;
+        this.rmtFutId = rmtFutId;
+
+        readMap = Collections.emptyMap(); // Immutable empty map.
+        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); // Sized from the expected transaction size.
+
+        topologyVersion(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean dht() {
+        return true; // Unconditionally true for this transaction class.
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID eventNodeId() {
+        return nearNodeId(); // The near node ID is used as the event node ID.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> masterNodeIds() {
+        return Arrays.asList(nearNodeId, nodeId); // Fixed-size list: near node ID first, then nodeId.
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID otherNodeId() {
+        return nearNodeId; // Same value as nearNodeId().
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean enforceSerializable() {
+        return false; // Not enforced here: serializable will be enforced on the primary node.
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion nearXidVersion() {
+        return nearXidVer; // Near transaction ID as passed to the constructor.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Collection<UUID>> transactionNodes() {
+        return txNodes; // May be null: only the constructor that takes txNodes assigns this field.
+    }
+
+    /**
+     * @return Near node ID, as passed to the constructor.
+     */
+    UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    /**
+     * @return Remote future ID, as passed to the constructor.
+     */
+    IgniteUuid remoteFutureId() {
+        return rmtFutId;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean updateNearCache(GridCacheContext<K, V> 
cacheCtx, K key, long topVer) {
+        if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || 
cctx.localNodeId().equals(nearNodeId))
+            return false;
+
+        if (cacheCtx.config().getBackups() == 0)
+            return true;
+
+        // Check if we are on the backup node.
+        return !cacheCtx.affinity().backups(key, 
topVer).contains(cctx.localNode());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, 
int part) {
+        super.addInvalidPartition(cacheCtx, part);
+
+        for (Iterator<IgniteTxEntry<K, V>> it = writeMap.values().iterator(); 
it.hasNext();) {
+            IgniteTxEntry<K, V> e = it.next();
+
+            GridCacheEntryEx<K, V> cached = e.cached();
+
+            if (cached != null) {
+                if (cached.partition() == part)
+                    it.remove();
+            }
+            else if (cacheCtx.affinity().partition(e.key()) == part)
+                it.remove();
+        }
+    }
+
+    /**
+     * Unmarshals the given transaction entry, binds it to its DHT cache entry and registers it in the write
+     * map under its transaction key.
+     * <p>
+     * When a {@code GridDhtInvalidPartitionException} is thrown inside the guarded section, the partition is
+     * recorded through {@link #addInvalidPartition} instead of propagating the exception.
+     *
+     * @param entry Write entry.
+     * @param ldr Class loader used for unmarshalling the entry.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void addWrite(IgniteTxEntry<K, V> entry, ClassLoader ldr) throws 
IgniteCheckedException {
+        entry.unmarshal(cctx, false, ldr);
+
+        GridCacheContext<K, V> cacheCtx = entry.context();
+
+        try {
+            GridDhtCacheEntry<K, V> cached = 
cacheCtx.dht().entryExx(entry.key(), topologyVersion());
+
+            checkInternal(entry.txKey());
+
+            // Initialize cache entry.
+            entry.cached(cached, entry.keyBytes());
+
+            writeMap.put(entry.txKey(), entry);
+
+            addExplicit(entry);
+        }
+        catch (GridDhtInvalidPartitionException e) {
+            addInvalidPartition(cacheCtx, e.partition()); // Record the partition; the entry is not registered.
+        }
+    }
+
+    /**
+     * Creates a transaction entry for the given key and registers it in the write map.
+     * <p>
+     * {@code checkInternal(key)} is always invoked first. Nothing else happens when
+     * {@code isSystemInvalidate()} returns {@code true}.
+     *
+     * @param cacheCtx Cache context.
+     * @param op Write operation.
+     * @param key Key to add to write set.
+     * @param keyBytes Key bytes.
+     * @param val Value.
+     * @param valBytes Value bytes.
+     * @param entryProcessors Entry processors.
+     * @param drVer Data center replication version.
+     * @param ttl TTL.
+     */
+    public void addWrite(GridCacheContext<K, V> cacheCtx,
+        GridCacheOperation op,
+        IgniteTxKey<K> key,
+        byte[] keyBytes,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> 
entryProcessors,
+        @Nullable GridCacheVersion drVer,
+        long ttl) {
+        checkInternal(key);
+
+        if (isSystemInvalidate())
+            return;
+
+        GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), 
topologyVersion());
+
+        IgniteTxEntry<K, V> txEntry = new IgniteTxEntry<>(cacheCtx,
+            this,
+            op,
+            val,
+            ttl,
+            -1L,
+            cached,
+            drVer);
+
+        txEntry.keyBytes(keyBytes);
+        txEntry.valueBytes(valBytes);
+        txEntry.entryProcessors(entryProcessors);
+
+        writeMap.put(key, txEntry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return GridToStringBuilder.toString(GridDhtTxRemote.class, this, 
"super", super.toString());
+    }
+}

Reply via email to