http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
deleted file mode 100644
index 9e43c68..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ /dev/null
@@ -1,1074 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.dr.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
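-/**
- * Future that drives the prepare phase of a DHT-local transaction: it marks the local lock candidates
- * as ready and, once the locks are owned, sends prepare requests to remote nodes (one mini-future per
- * node) and replies to the originating near node when it completes.
- */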
-public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFuture<IgniteTxEx<K, V>>
-    implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> {
-    /** Serialization version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
-
-    /** Shared cache context. */
-    private GridCacheSharedContext<K, V> cctx;
-
-    /** Future ID (random UUID assigned in the constructor). */
-    private IgniteUuid futId;
-
-    /** Transaction being prepared. */
-    @GridToStringExclude
-    private GridDhtTxLocalAdapter<K, V> tx;
-
-    /** Near mappings (taken from {@code tx.nearMap()} in the constructor). */
-    private Map<UUID, GridDistributedTxMapping<K, V>> nearMap;
-
-    /** DHT mappings (taken from {@code tx.dhtMap()} in the constructor). */
-    private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** First error recorded for this future (set with compare-and-set, later errors are dropped). */
-    private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
-    /** Replied flag: flipped once in {@code onDone()} so that only one thread sends the reply. */
-    private AtomicBoolean replied = new AtomicBoolean(false);
-
-    /** Mapped flag: flipped once in {@code prepare0()} so that mapping and sending run at most once. */
-    private AtomicBoolean mapped = new AtomicBoolean(false);
-
-    /** Prepare reads (set in {@code prepare()}). */
-    private Iterable<IgniteTxEntry<K, V>> reads;
-
-    /** Prepare writes (set in {@code prepare()}). */
-    private Iterable<IgniteTxEntry<K, V>> writes;
-
-    /** Tx nodes (set in {@code prepare()} and passed on in DHT prepare requests). */
-    private Map<UUID, Collection<UUID>> txNodes;
-
-    /** Trackable flag; cleared by {@code markNotTrackable()}. */
-    private boolean trackable = true;
-
-    /** Near mini future id (echoed back in the response to the near node). */
-    private IgniteUuid nearMiniId;
-
-    /** DHT versions map (tx key to version), compared with current entry versions in {@code addDhtValues()}. */
-    private Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap;
-
-    /** {@code True} if this is last prepare operation for node. */
-    private boolean last;
-
-    /** IDs of backup nodes receiving last prepare request during this prepare (may be {@code null}). */
-    private Collection<UUID> lastBackups;
-
-    /**
-     * No-arg constructor; it exists only because {@link Externalizable} requires one.
-     */
-    public GridDhtTxPrepareFuture() {
-        /* No-op. */
-    }
-
-    /**
-     * Creates a prepare future for the given DHT-local transaction.
-     *
-     * @param cctx Shared cache context.
-     * @param tx Transaction to prepare; it is also the value this future is reduced to.
-     * @param nearMiniId Near mini future id.
-     * @param dhtVerMap DHT versions map.
-     * @param last {@code True} if this is last prepare operation for node.
-     * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
-     */
-    public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridDhtTxLocalAdapter<K, V> tx,
-        IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last,
-        Collection<UUID> lastBackups) {
-        super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() {
-            /** Accepts every partial result. */
-            @Override public boolean collect(IgniteTxEx<K, V> e) {
-                return true;
-            }
-
-            /** There is nothing to aggregate: the result is always the transaction itself. */
-            @Override public IgniteTxEx<K, V> reduce() {
-                return tx;
-            }
-        });
-
-        assert cctx != null;
-
-        // Plain state handed in by the caller.
-        this.cctx = cctx;
-        this.tx = tx;
-        this.nearMiniId = nearMiniId;
-        this.dhtVerMap = dhtVerMap;
-        this.last = last;
-        this.lastBackups = lastBackups;
-
-        // State derived here.
-        this.futId = IgniteUuid.randomUuid();
-        this.log = U.logger(ctx, logRef, GridDhtTxPrepareFuture.class);
-        this.dhtMap = tx.dhtMap();
-        this.nearMap = tx.nearMap();
-
-        assert dhtMap != null;
-        assert nearMap != null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return this.futId;
-    }
-
-    /**
-     * @return ID of the near mini future this prepare was started for.
-     */
-    public IgniteUuid nearMiniId() {
-        return this.nearMiniId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        // The future is identified by the xid version of its transaction.
-        return this.tx.xidVersion();
-    }
-
-    /**
-     * @return Read-only view of the nodes behind the registered futures: the remote node for a
-     *      mini-future, the local node for any other future.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        IgniteClosure<IgniteFuture<?>, ClusterNode> toNode = new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteFuture<?> fut) {
-                return isMini(fut) ? ((MiniFuture)fut).node() : cctx.discovery().localNode();
-            }
-        };
-
-        return F.viewReadOnly(futures(), toNode);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
-        if (log.isDebugEnabled())
-            log.debug("Transaction future received owner changed callback: " + entry);
-
-        // Only entries written by this transaction can move the prepare forward.
-        if (!tx.hasWriteKey(entry.txKey()))
-            return false;
-
-        return mapIfLocked();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return this.trackable;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        this.trackable = false;
-    }
-
-    /**
-     * @return Transaction this future prepares.
-     */
-    GridDhtTxLocalAdapter<K, V> tx() {
-        return this.tx;
-    }
-
-    /**
-     * Verifies that every optimistic lock entry of the transaction is locked locally by it.
-     * An entry that turns out to be removed is re-resolved from its cache and checked again.
-     *
-     * @return {@code True} if all locks are owned.
-     */
-    private boolean checkLocks() {
-        for (IgniteTxEntry<K, V> lockEntry : tx.optimisticLockEntries()) {
-            for (;;) {
-                GridCacheEntryEx<K, V> cur = lockEntry.cached();
-
-                try {
-                    // Don't compare entry against itself.
-                    if (cur.lockedLocally(tx.xidVersion()))
-                        break; // Locked, move on to the next tx entry.
-
-                    if (log.isDebugEnabled())
-                        log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cur +
-                            ", tx=" + tx + ']');
-
-                    return false;
-                }
-                // Possible if entry cached within transaction is obsolete.
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in future onAllReplies method (will retry): " + lockEntry);
-
-                    lockEntry.cached(lockEntry.context().cache().entryEx(lockEntry.key()), lockEntry.keyBytes());
-                }
-            }
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteFuture<?> fut : futures()) {
-            if (!isMini(fut))
-                continue;
-
-            MiniFuture mini = (MiniFuture)fut;
-
-            if (!mini.node().id().equals(nodeId))
-                continue;
-
-            // Fail the first mini-future that was waiting for the departed node.
-            mini.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId));
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param t Error.
-     */
-    public void onError(Throwable t) {
-        if (err.compareAndSet(null, t)) {
-            tx.setRollbackOnly();
-
-            // TODO: GG-4005:
-            // TODO: as an improvement, at some point we must rollback right 
away.
-            // TODO: However, in this case need to make sure that reply is 
sent back
-            // TODO: even for non-existing transactions whenever finish 
request comes in.
-//            try {
-//                tx.rollback();
-//            }
-//            catch (IgniteCheckedException ex) {
-//                U.error(log, "Failed to automatically rollback transaction: 
" + tx, ex);
-//            }
-//
-            // If not local node.
-            if (!tx.nearNodeId().equals(cctx.localNodeId())) {
-                // Send reply back to near node.
-                GridCacheMessage<K, V> res = new 
GridNearTxPrepareResponse<>(tx.nearXidVersion(), tx.nearFutureId(),
-                    nearMiniId, tx.xidVersion(), 
Collections.<Integer>emptySet(), t);
-
-                try {
-                    cctx.io().send(tx.nearNodeId(), res, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send reply to originating near 
node (will rollback): " + tx.nearNodeId(), e);
-
-                    tx.rollbackAsync();
-                }
-            }
-
-            onComplete();
-        }
-    }
-
-    /**
-     * Routes a DHT prepare response to the pending mini-future whose ID matches the response's mini ID.
-     * Does nothing when this future is already done.
-     *
-     * @param nodeId Sender.
-     * @param res Result.
-     */
-    public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
-        if (isDone())
-            return;
-
-        for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) {
-            if (!isMini(fut))
-                continue;
-
-            MiniFuture mini = (MiniFuture)fut;
-
-            if (!mini.futureId().equals(res.miniId()))
-                continue;
-
-            assert mini.node().id().equals(nodeId);
-
-            mini.onResult(res);
-
-            break;
-        }
-    }
-
-    /**
-     * Marks the lock candidates of this transaction as ready on every non-local entry: on the single
-     * group-lock entry for a group-lock transaction, on all write entries otherwise.
-     */
-    private void readyLocks() {
-        // Ready all locks.
-        if (log.isDebugEnabled())
-            log.debug("Marking all local candidates as ready: " + this);
-
-        Iterable<IgniteTxEntry<K, V>> targets;
-
-        if (tx.groupLock())
-            targets = Collections.singletonList(tx.groupLockEntry());
-        else
-            targets = writes;
-
-        for (IgniteTxEntry<K, V> target : targets) {
-            if (target.cached().isLocal())
-                continue;
-
-            for (;;) {
-                GridDistributedCacheEntry<K, V> distEntry = (GridDistributedCacheEntry<K, V>)target.cached();
-
-                try {
-                    GridCacheMvccCandidate<K> owner = distEntry.readyLock(tx.xidVersion());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Current lock owner for entry [owner=" + owner + ", entry=" + distEntry + ']');
-
-                    break; // Done with this tx entry.
-                }
-                // Possible if entry cached within transaction is obsolete.
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in future onAllReplies method (will retry): " + target);
-
-                    target.cached(target.context().cache().entryEx(target.key()), target.keyBytes());
-                }
-            }
-        }
-    }
-
-    /**
-     * Runs the mapping step ({@code prepare0()}) if all locks are currently owned.
-     *
-     * @return {@code True} if all locks are acquired, {@code false} otherwise.
-     */
-    private boolean mapIfLocked() {
-        if (!checkLocks())
-            return false;
-
-        prepare0();
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(IgniteTxEx<K, V> tx0, Throwable err) {
-        assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
-            "pending mini futures: " + this;
-
-        // Keep only the first error.
-        this.err.compareAndSet(null, err);
-
-        if (!replied.compareAndSet(false, true)) {
-            // Other thread is completing future. Wait for it to complete.
-            try {
-                get();
-            }
-            catch (IgniteInterruptedException e) {
-                onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e));
-            }
-            catch (IgniteCheckedException ignored) {
-                // No-op, get() was just synchronization.
-            }
-
-            return false;
-        }
-
-        try {
-            // Must clear prepare future before response is sent or listeners are notified.
-            if (tx.optimistic())
-                tx.clearPrepareFuture(this);
-
-            boolean nearIsLoc = tx.nearNodeId().equals(cctx.localNodeId());
-
-            if (!nearIsLoc) {
-                // Send reply back to originating near node.
-                GridNearTxPrepareResponse<K, V> nearRes = new GridNearTxPrepareResponse<>(tx.nearXidVersion(),
-                    tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get());
-
-                addDhtValues(nearRes);
-
-                GridCacheVersion minVer = tx.minVersion();
-
-                nearRes.completedVersions(cctx.tm().committedVersions(minVer), cctx.tm().rolledbackVersions(minVer));
-
-                nearRes.pending(localDhtPendingVersions(tx.writeEntries(), minVer));
-
-                cctx.io().send(tx.nearNodeId(), nearRes, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            onError(e);
-        }
-        finally {
-            // Will call super.onDone().
-            onComplete();
-        }
-
-        return true;
-    }
-
-    /**
-     * Adds current DHT values to the response sent to the near node.
-     * <p>
-     * First, the current value of every write entry is added (if there is one). Then, for every key of
-     * the DHT versions map that has no value in the response yet, the current value is added when the
-     * version supplied for that key is {@code null} or differs from the entry's current version.
-     * <p>
-     * If a cached entry turns out to be removed, it is re-resolved from its cache before the retry;
-     * without that the retry loop would keep hitting the same obsolete entry.
-     *
-     * @param res Response being sent.
-     */
-    private void addDhtValues(GridNearTxPrepareResponse<K, V> res) {
-        // Interceptor on near node needs old values to execute callbacks.
-        if (!F.isEmpty(writes)) {
-            for (IgniteTxEntry<K, V> e : writes) {
-                IgniteTxEntry<K, V> txEntry = tx.entry(e.txKey());
-
-                assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
-
-                while (true) {
-                    try {
-                        GridCacheEntryEx<K, V> entry = txEntry.cached();
-
-                        GridCacheVersion dhtVer = entry.version();
-
-                        V val0 = null;
-                        byte[] valBytes0 = null;
-
-                        GridCacheValueBytes valBytesTuple = entry.valueBytes();
-
-                        if (!valBytesTuple.isNull()) {
-                            if (valBytesTuple.isPlain())
-                                val0 = (V) valBytesTuple.get();
-                            else
-                                valBytes0 = valBytesTuple.get();
-                        }
-                        else
-                            val0 = entry.rawGet();
-
-                        if (val0 != null || valBytes0 != null)
-                            res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0);
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // Entry is obsolete: re-resolve it before retrying (same approach as checkLocks()).
-                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
-                    }
-                }
-            }
-        }
-
-        for (Map.Entry<IgniteTxKey<K>, GridCacheVersion> ver : dhtVerMap.entrySet()) {
-            IgniteTxEntry<K, V> txEntry = tx.entry(ver.getKey());
-
-            // Value was already added by the writes pass above.
-            if (res.hasOwnedValue(ver.getKey()))
-                continue;
-
-            while (true) {
-                try {
-                    GridCacheEntryEx<K, V> entry = txEntry.cached();
-
-                    GridCacheVersion dhtVer = entry.version();
-
-                    if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) {
-                        V val0 = null;
-                        byte[] valBytes0 = null;
-
-                        GridCacheValueBytes valBytesTuple = entry.valueBytes();
-
-                        if (!valBytesTuple.isNull()) {
-                            if (valBytesTuple.isPlain())
-                                val0 = (V)valBytesTuple.get();
-                            else
-                                valBytes0 = valBytesTuple.get();
-                        }
-                        else
-                            val0 = entry.rawGet();
-
-                        res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0);
-                    }
-
-                    break;
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // Entry is obsolete: re-resolve it before retrying (same approach as checkLocks()).
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
-                }
-            }
-        }
-    }
-
-    /**
-     * Tells whether the given future is one of this future's mini-futures (exact class match).
-     *
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteFuture<?> f) {
-        Class<?> futCls = f.getClass();
-
-        return futCls == MiniFuture.class;
-    }
-
-    /**
-     * Completeness callback: moves the transaction to {@code PREPARED} when this is the last prepare
-     * (or the transaction is a system invalidate), completes the parent future with the recorded
-     * error and, if that changed the done flag, removes this future from the MVCC manager.
-     *
-     * @return {@code True} if {@code done} flag was changed as a result of this call.
-     */
-    private boolean onComplete() {
-        if (last || tx.isSystemInvalidate())
-            tx.state(PREPARED);
-
-        boolean changed = super.onDone(tx, err.get());
-
-        // Don't forget to clean up.
-        if (changed)
-            cctx.mvcc().removeFuture(this);
-
-        return changed;
-    }
-
-    /**
-     * Completes this future; the result of the completeness callback is discarded.
-     */
-    public void complete() {
-        this.onComplete();
-    }
-
-    /**
-     * Initializes future: stores the entries and node mapping, marks local lock candidates as ready
-     * and attempts to map the transaction right away if all locks are already owned.
-     * <p>
-     * An empty transaction is marked rollback-only and {@code onDone(tx)} is invoked for it.
-     * NOTE(review): that branch does not return, so the steps below still run for an empty
-     * transaction - confirm this is intended.
-     *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
-     */
-    public void prepare(Iterable<IgniteTxEntry<K, V>> reads, 
Iterable<IgniteTxEntry<K, V>> writes,
-        Map<UUID, Collection<UUID>> txNodes) {
-        if (tx.empty()) {
-            tx.setRollbackOnly();
-
-            onDone(tx);
-        }
-
-        this.reads = reads;
-        this.writes = writes;
-        this.txNodes = txNodes;
-
-        readyLocks(); // Reads the 'writes' field assigned just above.
-
-        mapIfLocked(); // Maps immediately when locks are already owned; result is ignored here.
-    }
-
-    /**
-     * Checks the given node against the collection of "last backups"; a missing collection means
-     * no node qualifies.
-     *
-     * @param backupId Backup node ID.
-     * @return {@code True} if backup node receives last prepare request for this transaction.
-     */
-    private boolean lastBackup(UUID backupId) {
-        if (lastBackups == null)
-            return false;
-
-        return lastBackups.contains(backupId);
-    }
-
-    /**
-     * Maps the transaction's read and write entries to remote DHT nodes and near-reader nodes and
-     * sends a prepare request to each mapped node.
-     * <p>
-     * The method runs at most once per future (guarded by the {@code mapped} flag). One
-     * {@link MiniFuture} is registered per target node; if sending a request fails, the corresponding
-     * mini-future is completed with the error. The future is marked initialized on every exit path.
-     */
-    private void prepare0() {
-        if (!mapped.compareAndSet(false, true))
-            return;
-
-        try {
-            Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>();
-            Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>();
-
-            boolean hasRemoteNodes = false;
-
-            // Assign keys to primary nodes.
-            if (!F.isEmpty(reads)) {
-                for (IgniteTxEntry<K, V> read : reads)
-                    hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
-            }
-
-            if (!F.isEmpty(writes)) {
-                for (IgniteTxEntry<K, V> write : writes)
-                    hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
-            }
-
-            if (isDone())
-                return;
-
-            tx.needsCompletedVersions(hasRemoteNodes);
-
-            // Create mini futures.
-            for (GridDistributedTxMapping<K, V> dhtMapping : futDhtMap.values()) {
-                assert !dhtMapping.empty();
-
-                ClusterNode n = dhtMapping.node();
-
-                assert !n.isLocal();
-
-                GridDistributedTxMapping<K, V> nearMapping = futNearMap.get(n.id());
-
-                MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), nearMap.get(n.id()));
-
-                add(fut); // Append new future.
-
-                Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == null ? null : nearMapping.writes();
-
-                GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>(
-                    futId,
-                    fut.futureId(),
-                    tx.topologyVersion(),
-                    tx,
-                    dhtMapping.writes(),
-                    nearWrites,
-                    tx.groupLockKey(),
-                    tx.partitionLock(),
-                    txNodes,
-                    tx.nearXidVersion(),
-                    lastBackup(n.id()),
-                    tx.subjectId(),
-                    tx.taskNameHash());
-
-                int idx = 0;
-
-                for (IgniteTxEntry<K, V> entry : dhtMapping.writes()) {
-                    try {
-                        GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached();
-
-                        GridCacheMvccCandidate<K> added = cached.candidate(version());
-
-                        assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
-                            "[added=" + added + ", entry=" + entry + ']';
-                        assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
-                            "[added=" + added + ", entry=" + entry + ']';
-
-                        // The candidate may legitimately be null for a group-lock entry (see the assertion
-                        // above), so guard the dereference exactly as the near-only loop below does.
-                        if (added != null && added.ownerVersion() != null)
-                            req.owned(entry.txKey(), added.ownerVersion());
-
-                        req.invalidateNearEntry(idx, cached.readerId(n.id()) != null);
-
-                        if (cached.isNewLocked())
-                            req.markKeyForPreload(idx);
-
-                        // NOTE(review): this 'break' leaves the loop after the first entry, so 'idx++' below is
-                        // reached only after a removed-entry exception - confirm that this is intended.
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                    }
-
-                    idx++;
-                }
-
-                if (!F.isEmpty(nearWrites)) {
-                    for (IgniteTxEntry<K, V> entry : nearWrites) {
-                        try {
-                            GridCacheMvccCandidate<K> added = entry.cached().candidate(version());
-
-                            assert added != null;
-                            assert added.dhtLocal();
-
-                            if (added.ownerVersion() != null)
-                                req.owned(entry.txKey(), added.ownerVersion());
-
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                        }
-                    }
-                }
-
-                //noinspection TryWithIdenticalCatches
-                try {
-                    cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-                }
-                catch (ClusterTopologyException e) {
-                    // Separate catch so that the topology-specific onResult() overload is selected.
-                    fut.onResult(e);
-                }
-                catch (IgniteCheckedException e) {
-                    fut.onResult(e);
-                }
-            }
-
-            // Nodes that are near readers only (no DHT mapping) get a request carrying near writes only.
-            for (GridDistributedTxMapping<K, V> nearMapping : futNearMap.values()) {
-                if (!futDhtMap.containsKey(nearMapping.node().id())) {
-                    assert nearMapping.writes() != null;
-
-                    MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping);
-
-                    add(fut); // Append new future.
-
-                    GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>(
-                        futId,
-                        fut.futureId(),
-                        tx.topologyVersion(),
-                        tx,
-                        null,
-                        nearMapping.writes(),
-                        tx.groupLockKey(),
-                        tx.partitionLock(),
-                        null,
-                        tx.nearXidVersion(),
-                        false,
-                        tx.subjectId(),
-                        tx.taskNameHash());
-
-                    for (IgniteTxEntry<K, V> entry : nearMapping.writes()) {
-                        try {
-                            GridCacheMvccCandidate<K> added = entry.cached().candidate(version());
-
-                            assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
-                                "[added=" + added + ", entry=" + entry + ']';
-                            assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
-                                "[added=" + added + ", entry=" + entry + ']';
-
-                            if (added != null && added.ownerVersion() != null)
-                                req.owned(entry.txKey(), added.ownerVersion());
-
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                        }
-                    }
-
-                    //noinspection TryWithIdenticalCatches
-                    try {
-                        cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-                    }
-                    catch (ClusterTopologyException e) {
-                        fut.onResult(e);
-                    }
-                    catch (IgniteCheckedException e) {
-                        fut.onResult(e);
-                    }
-                }
-            }
-        }
-        finally {
-            markInitialized();
-        }
-    }
-
-    /**
-     * Maps a single transaction entry to the remote DHT nodes of its partition and to its near
-     * readers, skipping the originating near node and nodes that are already DHT nodes. Entries
-     * of local caches are never mapped. A removed cache entry is re-resolved and mapping is retried.
-     *
-     * @param entry Transaction entry.
-     * @param futDhtMap DHT mapping.
-     * @param futNearMap Near mapping.
-     * @return {@code True} if mapped.
-     */
-    private boolean map(
-        IgniteTxEntry<K, V> entry,
-        Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap,
-        Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) {
-        if (entry.cached().isLocal())
-            return false;
-
-        GridDhtCacheEntry<K, V> dhtEntry = (GridDhtCacheEntry<K, V>)entry.cached();
-
-        GridCacheContext<K, V> entryCtx = entry.context();
-
-        GridDhtCacheAdapter<K, V> dhtCache;
-
-        if (entryCtx.isNear())
-            dhtCache = entryCtx.near().dht();
-        else
-            dhtCache = entryCtx.dht();
-
-        for (;;) {
-            try {
-                Collection<ClusterNode> dhtNodes =
-                    dhtCache.topology().nodes(dhtEntry.partition(), tx.topologyVersion());
-
-                if (log.isDebugEnabled())
-                    log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
-                        ", entry=" + entry + ']');
-
-                Collection<UUID> readerIds = dhtEntry.readers();
-
-                Collection<ClusterNode> nearNodes = null;
-
-                if (!F.isEmpty(readerIds)) {
-                    nearNodes = cctx.discovery().nodes(readerIds, F0.not(F.idForNodeId(tx.nearNodeId())));
-
-                    if (log.isDebugEnabled())
-                        log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
-                            ", entry=" + entry + ']');
-                }
-                else if (log.isDebugEnabled())
-                    log.debug("Entry has no near readers: " + entry);
-
-                // Exclude local node.
-                boolean dhtMapped = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
-
-                // Exclude DHT nodes.
-                boolean nearMapped = map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
-
-                return dhtMapped | nearMapped;
-            }
-            catch (GridCacheEntryRemovedException ignore) {
-                dhtEntry = dhtCache.entryExx(entry.key());
-
-                entry.cached(dhtEntry, dhtEntry.keyBytes());
-            }
-        }
-    }
-
-    /**
-     * Adds the entry to the per-node mapping of every given node, in both supplied maps; missing
-     * mappings are created on demand.
-     *
-     * @param entry Entry.
-     * @param nodes Nodes (may be {@code null}, in which case nothing is mapped).
-     * @param globalMap Map.
-     * @param locMap Exclude map.
-     * @return {@code True} if mapped.
-     */
-    private boolean map(IgniteTxEntry<K, V> entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) {
-        if (nodes == null)
-            return false;
-
-        boolean any = false;
-
-        for (ClusterNode node : nodes) {
-            GridDistributedTxMapping<K, V> globalMapping = globalMap.get(node.id());
-
-            if (globalMapping == null) {
-                globalMapping = new GridDistributedTxMapping<>(node);
-
-                globalMap.put(node.id(), globalMapping);
-            }
-
-            globalMapping.add(entry);
-
-            GridDistributedTxMapping<K, V> locMapping = locMap.get(node.id());
-
-            if (locMapping == null) {
-                locMapping = new GridDistributedTxMapping<>(node);
-
-                locMap.put(node.id(), locMapping);
-            }
-
-            locMapping.add(entry);
-
-            any = true;
-        }
-
-        return any;
-    }
-
-    /**
-     * Gathers the versions of local candidates, over all given entries, that are less than the base
-     * version. Entries that have been removed contribute nothing.
-     *
-     * @param entries Tx entries to process.
-     * @param baseVer Base version.
-     * @return Collection of pending candidates versions.
-     */
-    private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEntry<K, V>> entries,
-        GridCacheVersion baseVer) {
-        Collection<GridCacheVersion> pendingVers = new GridLeanSet<>(5);
-
-        for (IgniteTxEntry<K, V> txEntry : entries) {
-            try {
-                for (GridCacheMvccCandidate cand : txEntry.cached().localCandidates()) {
-                    if (!cand.version().isLess(baseVer))
-                        continue;
-
-                    pendingVers.add(cand.version());
-                }
-            }
-            catch (GridCacheEntryRemovedException ignored) {
-                // No-op, no candidates.
-            }
-        }
-
-        return pendingVers;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxPrepareFuture.class, this, "super", 
super.toString());
-    }
-
-    /**
-     * Mini-future for prepare operations. Mini-futures are only waiting on a single
-     * node as opposed to multiple nodes.
-     */
-    private class MiniFuture extends GridFutureAdapter<IgniteTxEx<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** Node ID. */
-        private UUID nodeId;
-
-        /** DHT mapping. */
-        @GridToStringInclude
-        private GridDistributedTxMapping<K, V> dhtMapping;
-
-        /** Near mapping. */
-        @GridToStringInclude
-        private GridDistributedTxMapping<K, V> nearMapping;
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public MiniFuture() {
-            super(cctx.kernalContext());
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param dhtMapping Mapping.
-         * @param nearMapping nearMapping.
-         */
-        MiniFuture(UUID nodeId, GridDistributedTxMapping<K, V> dhtMapping, 
GridDistributedTxMapping<K, V> nearMapping) {
-            super(cctx.kernalContext());
-
-            assert dhtMapping == null || nearMapping == null || 
dhtMapping.node() == nearMapping.node();
-
-            this.nodeId = nodeId;
-            this.dhtMapping = dhtMapping;
-            this.nearMapping = nearMapping;
-        }
-
-        /**
-         * @return Future ID.
-         */
-        IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public ClusterNode node() {
-            return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
-        }
-
-        /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
-
-            // Fail.
-            onDone(e);
-        }
-
-        /**
-         * @param e Node failure.
-         */
-        void onResult(ClusterTopologyException e) {
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for 
reply (will ignore): " + this);
-
-            if (tx != null)
-                tx.removeMapping(nodeId);
-
-            onDone(tx);
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        void onResult(GridDhtTxPrepareResponse<K, V> res) {
-            if (res.error() != null)
-                // Fail the whole compound future.
-                onError(res.error());
-            else {
-                // Process evicted readers (no need to remap).
-                if (nearMapping != null && !F.isEmpty(res.nearEvicted())) {
-                    nearMapping.evictReaders(res.nearEvicted());
-
-                    for (IgniteTxEntry<K, V> entry : nearMapping.entries()) {
-                        if (res.nearEvicted().contains(entry.txKey())) {
-                            while (true) {
-                                try {
-                                    GridDhtCacheEntry<K, V> cached = 
(GridDhtCacheEntry<K, V>)entry.cached();
-
-                                    
cached.removeReader(nearMapping.node().id(), res.messageId());
-
-                                    break;
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    GridCacheEntryEx<K, V> e = 
entry.context().cache().peekEx(entry.key());
-
-                                    if (e == null)
-                                        break;
-
-                                    entry.cached(e, entry.keyBytes());
-                                }
-                            }
-                        }
-                    }
-                }
-
-                // Process invalid partitions (no need to remap).
-                if (!F.isEmpty(res.invalidPartitions())) {
-                    for (Iterator<IgniteTxEntry<K, V>> it = 
dhtMapping.entries().iterator(); it.hasNext();) {
-                        IgniteTxEntry<K, V> entry  = it.next();
-
-                        if 
(res.invalidPartitions().contains(entry.cached().partition())) {
-                            it.remove();
-
-                            if (log.isDebugEnabled())
-                                log.debug("Removed mapping for entry from dht 
mapping [key=" + entry.key() +
-                                    ", tx=" + tx + ", dhtMapping=" + 
dhtMapping + ']');
-                        }
-                    }
-
-                    if (dhtMapping.empty()) {
-                        dhtMap.remove(nodeId);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Removed mapping for node entirely 
because all partitions are invalid [nodeId=" +
-                                nodeId + ", tx=" + tx + ']');
-                    }
-                }
-
-                long topVer = tx.topologyVersion();
-
-                boolean rec = 
cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED);
-
-                for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) {
-                    GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(info.cacheId());
-
-                    while (true) {
-                        GridCacheEntryEx<K, V> entry = 
cacheCtx.cache().entryEx(info.key());
-
-                        GridDrType drType = cacheCtx.isDrEnabled() ? 
GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
-
-                        try {
-                            if (entry.initialValue(info.value(), 
info.valueBytes(), info.version(),
-                                info.ttl(), info.expireTime(), true, topVer, 
drType)) {
-                                if (rec && !entry.isInternal())
-                                    
cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
-                                        (IgniteUuid)null, null, 
EVT_CACHE_PRELOAD_OBJECT_LOADED, info.value(), true, null,
-                                        false, null, null, null);
-                            }
-
-                            break;
-                        }
-                        catch (IgniteCheckedException e) {
-                            // Fail the whole thing.
-                            onDone(e);
-
-                            return;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to set entry initial value 
(entry is obsolete, " +
-                                    "will retry): " + entry);
-                        }
-                    }
-                }
-
-                // Finish mini future.
-                onDone(tx);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), 
"cancelled", isCancelled(), "err", error());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
deleted file mode 100644
index 127d07e..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- * DHT prepare request.
- */
-public class GridDhtTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareRequest<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Near node ID. */
-    private UUID nearNodeId;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** Topology version. */
-    private long topVer;
-
-    /** Invalidate near entries flags. */
-    private BitSet invalidateNearEntries;
-
-    /** Near writes. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<IgniteTxEntry<K, V>> nearWrites;
-
-    /** Serialized near writes. */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> nearWritesBytes;
-
-    /** Owned versions by key. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Map<IgniteTxKey<K>, GridCacheVersion> owned;
-
-    /** Owned versions bytes. */
-    private byte[] ownedBytes;
-
-    /** Near transaction ID. */
-    private GridCacheVersion nearXidVer;
-
-    /** {@code True} if this is last prepare request for node. */
-    private boolean last;
-
-    /** Subject ID. */
-    @GridDirectVersion(1)
-    private UUID subjId;
-
-    /** Task name hash. */
-    @GridDirectVersion(2)
-    private int taskNameHash;
-
-    /** Preload keys. */
-    @GridDirectVersion(3)
-    private BitSet preloadKeys;
-
-    /**
-     * Empty constructor required for {@link Externalizable}. Performs no
-     * initialization; it is also the constructor used by {@link #clone()}.
-     */
-    public GridDhtTxPrepareRequest() {
-        // No-op.
-    }
-
-    /**
-     * Creates a prepare request. The near node ID is taken from {@code tx}, and
-     * the near-invalidation flag set is sized by the number of DHT writes
-     * (zero when {@code dhtWrites} is {@code null}).
-     *
-     * @param futId Future ID, must not be {@code null}.
-     * @param miniId Mini future ID, must not be {@code null}.
-     * @param topVer Topology version.
-     * @param tx Transaction.
-     * @param dhtWrites DHT writes.
-     * @param nearWrites Near writes.
-     * @param grpLockKey Group lock key if preparing group-lock transaction.
-     * @param partLock {@code True} if group-lock transaction locks partition.
-     * @param txNodes Transaction nodes mapping.
-     * @param nearXidVer Near transaction ID.
-     * @param last {@code True} if this is last prepare request for node.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash.
-     */
-    public GridDhtTxPrepareRequest(
-        IgniteUuid futId,
-        IgniteUuid miniId,
-        long topVer,
-        GridDhtTxLocalAdapter<K, V> tx,
-        Collection<IgniteTxEntry<K, V>> dhtWrites,
-        Collection<IgniteTxEntry<K, V>> nearWrites,
-        IgniteTxKey grpLockKey,
-        boolean partLock,
-        Map<UUID, Collection<UUID>> txNodes,
-        GridCacheVersion nearXidVer,
-        boolean last,
-        UUID subjId,
-        int taskNameHash) {
-        super(tx, null, dhtWrites, grpLockKey, partLock, txNodes);
-
-        assert futId != null;
-        assert miniId != null;
-
-        this.topVer = topVer;
-        this.futId = futId;
-        this.nearWrites = nearWrites;
-        this.miniId = miniId;
-        this.nearXidVer = nearXidVer;
-        this.last = last;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-
-        invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : 
dhtWrites.size());
-
-        nearNodeId = tx.nearNodeId();
-    }
-
-    /**
-     * Returns the "last request" flag passed to the constructor.
-     *
-     * @return {@code True} if this is last prepare request for node.
-     */
-    public boolean last() {
-        return last;
-    }
-
-    /**
-     * Returns the near transaction ID passed to the constructor.
-     *
-     * @return Near transaction ID.
-     */
-    public GridCacheVersion nearXidVersion() {
-        return nearXidVer;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Always {@code true} for this message type.
-     */
-    @Override public boolean allowForStartup() {
-        return true;
-    }
-
-    /**
-     * Returns the near node ID, which the constructor takes from the transaction.
-     *
-     * @return Near node ID.
-     */
-    public UUID nearNodeId() {
-        return nearNodeId;
-    }
-
-    /**
-     * Returns the subject ID passed to the constructor.
-     *
-     * @return Subject ID, possibly {@code null}.
-     */
-    @Nullable public UUID subjectId() {
-        return subjId;
-    }
-
-    /**
-     * Returns the task name hash passed to the constructor.
-     *
-     * @return Task name hash.
-     */
-    public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /**
-     * Returns the near writes of this request.
-     *
-     * @return Near writes, or an empty list when none were set (never
-     *      {@code null}).
-     */
-    public Collection<IgniteTxEntry<K, V>> nearWrites() {
-        return nearWrites == null ? Collections.<IgniteTxEntry<K, 
V>>emptyList() : nearWrites;
-    }
-
-    /**
-     * Sets the near-entry invalidation flag at the given index.
-     *
-     * @param idx Entry index to set invalidation flag.
-     * @param invalidate Invalidation flag value.
-     */
-    public void invalidateNearEntry(int idx, boolean invalidate) {
-        invalidateNearEntries.set(idx, invalidate);
-    }
-
-    /**
-     * Reads the near-entry invalidation flag at the given index.
-     *
-     * @param idx Index to get invalidation flag value.
-     * @return Invalidation flag value.
-     */
-    public boolean invalidateNearEntry(int idx) {
-        return invalidateNearEntries.get(idx);
-    }
-
-    /**
-     * Marks the key at the given index for preloading. The flag set is created
-     * lazily on the first call.
-     *
-     * @param idx Index of the key to mark.
-     */
-    public void markKeyForPreload(int idx) {
-        if (preloadKeys == null)
-            preloadKeys = new BitSet();
-
-        preloadKeys.set(idx, true);
-    }
-
-    /**
-     * Checks whether entry info should be sent to primary node from backup.
-     * Returns {@code false} when no key has been marked for preload at all.
-     *
-     * @param idx Index.
-     * @return {@code True} if value should be sent, {@code false} otherwise.
-     */
-    public boolean needPreloadKey(int idx) {
-        return preloadKeys != null && preloadKeys.get(idx);
-    }
-
-    /**
-     * Returns the future ID passed to the constructor.
-     *
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * Returns the mini future ID passed to the constructor.
-     *
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * Returns the topology version stored in this request.
-     *
-     * @return Topology version.
-     */
-    @Override public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * Sets owner and its mapped version. The backing map is created lazily on
-     * the first call; a later call with the same key replaces the version.
-     *
-     * @param key Key.
-     * @param ownerMapped Owner mapped version.
-     */
-    public void owned(IgniteTxKey<K> key, GridCacheVersion ownerMapped) {
-        if (owned == null)
-            owned = new GridLeanMap<>(3);
-
-        owned.put(key, ownerMapped);
-    }
-
-    /**
-     * Returns the owned versions map as stored, without copying.
-     *
-     * @return Owned versions map, or {@code null} if nothing has been set or
-     *      unmarshalled.
-     */
-    public Map<IgniteTxKey<K>, GridCacheVersion> owned() {
-        return owned;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Additionally marshals the owned-versions map into {@code ownedBytes}
-     * (only while {@code ownedBytes} is still {@code null}) and serializes each
-     * near write into {@code nearWritesBytes}. Unlike {@code ownedBytes}, the
-     * near writes are re-serialized on every call.
-     *
-     * @param ctx Shared cache context, used for marshalling.
-     */
-    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (ownedBytes == null && owned != null) {
-            ownedBytes = CU.marshal(ctx, owned);
-
-            if (ctx.deploymentEnabled()) {
-                for (IgniteTxKey<K> k : owned.keySet())
-                    prepareObject(k, ctx);
-            }
-        }
-
-        if (nearWrites != null) {
-            marshalTx(nearWrites, ctx);
-
-            nearWritesBytes = new ArrayList<>(nearWrites.size());
-
-            for (IgniteTxEntry<K, V> e : nearWrites)
-                nearWritesBytes.add(ctx.marshaller().marshal(e));
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Additionally restores the owned-versions map from {@code ownedBytes}
-     * (only if the map is not already set) and rebuilds the near writes from
-     * {@code nearWritesBytes}, replacing any existing near writes collection.
-     */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (ownedBytes != null && owned == null)
-            owned = ctx.marshaller().unmarshal(ownedBytes, ldr);
-
-        if (nearWritesBytes != null) {
-            nearWrites = new ArrayList<>(nearWritesBytes.size());
-
-            for (byte[] arr : nearWritesBytes)
-                nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
-
-            unmarshalTx(nearWrites, true, ctx, ldr);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Built with {@code S.toString} for this class, with the superclass string
-     * included under the {@code "super"} name.
-     */
-    @Override public String toString() {
-        return S.toString(GridDhtTxPrepareRequest.class, this, "super", 
super.toString());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Does not call {@code super.clone()}: a new instance is created with the
-     * no-arg constructor and populated through {@code clone0}.
-     */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtTxPrepareRequest _clone = new GridDhtTxPrepareRequest();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * After the superclass copy, assigns every field of this class to the
-     * target. The copy is shallow: collections, maps, bit sets and byte arrays
-     * are shared by reference with this instance. The target must be a
-     * {@code GridDhtTxPrepareRequest}, otherwise the cast fails.
-     */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtTxPrepareRequest _clone = (GridDhtTxPrepareRequest)_msg;
-
-        _clone.nearNodeId = nearNodeId;
-        _clone.futId = futId;
-        _clone.miniId = miniId;
-        _clone.topVer = topVer;
-        _clone.invalidateNearEntries = invalidateNearEntries;
-        _clone.nearWrites = nearWrites;
-        _clone.nearWritesBytes = nearWritesBytes;
-        _clone.owned = owned;
-        _clone.ownedBytes = ownedBytes;
-        _clone.nearXidVer = nearXidVer;
-        _clone.last = last;
-        _clone.subjId = subjId;
-        _clone.taskNameHash = taskNameHash;
-        _clone.preloadKeys = preloadKeys;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Writes the superclass state first, then the type byte (once), then the
-     * fields of this class at state indexes 22 through 33 in this order:
-     * {@code futId}, {@code invalidateNearEntries}, {@code last}, {@code miniId},
-     * {@code nearNodeId}, {@code nearWritesBytes}, {@code nearXidVer},
-     * {@code ownedBytes}, {@code topVer}, {@code subjId}, {@code taskNameHash},
-     * {@code preloadKeys}. A {@code null} {@code nearWritesBytes} is written as
-     * size {@code -1}.
-     * <p>
-     * The switch cases fall through on purpose: {@code commState.idx} is advanced
-     * after each successful put, so when a put fails the method returns
-     * {@code false} and a later call re-enters the switch at the field that was
-     * not written.
-     *
-     * @return {@code false} as soon as a put fails, {@code true} once all fields
-     *      are written.
-     */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 22:
-                if (!commState.putGridUuid(futId))
-                    return false;
-
-                commState.idx++;
-
-            case 23:
-                if (!commState.putBitSet(invalidateNearEntries))
-                    return false;
-
-                commState.idx++;
-
-            case 24:
-                if (!commState.putBoolean(last))
-                    return false;
-
-                commState.idx++;
-
-            case 25:
-                if (!commState.putGridUuid(miniId))
-                    return false;
-
-                commState.idx++;
-
-            case 26:
-                if (!commState.putUuid(nearNodeId))
-                    return false;
-
-                commState.idx++;
-
-            case 27:
-                if (nearWritesBytes != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(nearWritesBytes.size()))
-                            return false;
-
-                        commState.it = nearWritesBytes.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if (!commState.putByteArray((byte[])commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-            case 28:
-                if (!commState.putCacheVersion(nearXidVer))
-                    return false;
-
-                commState.idx++;
-
-            case 29:
-                if (!commState.putByteArray(ownedBytes))
-                    return false;
-
-                commState.idx++;
-
-            case 30:
-                if (!commState.putLong(topVer))
-                    return false;
-
-                commState.idx++;
-
-            case 31:
-                if (!commState.putUuid(subjId))
-                    return false;
-
-                commState.idx++;
-
-            case 32:
-                if (!commState.putInt(taskNameHash))
-                    return false;
-
-                commState.idx++;
-
-            case 33:
-                if (!commState.putBitSet(preloadKeys))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the superclass state first, then the fields of this class at state
-     * indexes 22 through 33, in the same order in which {@code writeTo} writes
-     * them. A near-writes size of {@code -1} leaves {@code nearWritesBytes}
-     * untouched.
-     * <p>
-     * The switch cases fall through on purpose: {@code commState.idx} is advanced
-     * after each field is read, so when the buffer does not yet hold a complete
-     * value the method returns {@code false} and a later call re-enters the
-     * switch at the field that was not read. For the near-writes collection,
-     * {@code commState.readSize} and {@code commState.readItems} keep the
-     * progress between calls and are reset once the collection is complete.
-     *
-     * @return {@code false} as soon as a value cannot be read, {@code true} once
-     *      all fields are read.
-     */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 22:
-                IgniteUuid futId0 = commState.getGridUuid();
-
-                if (futId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                futId = futId0;
-
-                commState.idx++;
-
-            case 23:
-                BitSet invalidateNearEntries0 = commState.getBitSet();
-
-                if (invalidateNearEntries0 == BIT_SET_NOT_READ)
-                    return false;
-
-                invalidateNearEntries = invalidateNearEntries0;
-
-                commState.idx++;
-
-            case 24:
-                if (buf.remaining() < 1)
-                    return false;
-
-                last = commState.getBoolean();
-
-                commState.idx++;
-
-            case 25:
-                IgniteUuid miniId0 = commState.getGridUuid();
-
-                if (miniId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                miniId = miniId0;
-
-                commState.idx++;
-
-            case 26:
-                UUID nearNodeId0 = commState.getUuid();
-
-                if (nearNodeId0 == UUID_NOT_READ)
-                    return false;
-
-                nearNodeId = nearNodeId0;
-
-                commState.idx++;
-
-            case 27:
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (nearWritesBytes == null)
-                        nearWritesBytes = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        byte[] _val = commState.getByteArray();
-
-                        if (_val == BYTE_ARR_NOT_READ)
-                            return false;
-
-                        nearWritesBytes.add((byte[])_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-            case 28:
-                GridCacheVersion nearXidVer0 = commState.getCacheVersion();
-
-                if (nearXidVer0 == CACHE_VER_NOT_READ)
-                    return false;
-
-                nearXidVer = nearXidVer0;
-
-                commState.idx++;
-
-            case 29:
-                byte[] ownedBytes0 = commState.getByteArray();
-
-                if (ownedBytes0 == BYTE_ARR_NOT_READ)
-                    return false;
-
-                ownedBytes = ownedBytes0;
-
-                commState.idx++;
-
-            case 30:
-                if (buf.remaining() < 8)
-                    return false;
-
-                topVer = commState.getLong();
-
-                commState.idx++;
-
-            case 31:
-                UUID subjId0 = commState.getUuid();
-
-                if (subjId0 == UUID_NOT_READ)
-                    return false;
-
-                subjId = subjId0;
-
-                commState.idx++;
-
-            case 32:
-                if (buf.remaining() < 4)
-                    return false;
-
-                taskNameHash = commState.getInt();
-
-                commState.idx++;
-
-            case 33:
-                BitSet preloadKeys0 = commState.getBitSet();
-
-                if (preloadKeys0 == BIT_SET_NOT_READ)
-                    return false;
-
-                preloadKeys = preloadKeys0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Constant {@code 33}; this is the type byte that {@code writeTo}
-     *      puts into the buffer.
-     */
-    @Override public byte directType() {
-        return 33;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
deleted file mode 100644
index e4ae3ac..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- * DHT transaction prepare response.
- */
-public class GridDhtTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareResponse<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Evicted readers. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<IgniteTxKey<K>> nearEvicted;
-
-    /** Serialized evicted readers. */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> nearEvictedBytes;
-
-    /** Future ID.  */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** Invalid partitions. */
-    @GridToStringInclude
-    @GridDirectCollection(int.class)
-    private Collection<Integer> invalidParts;
-
-    /** Preload entries. */
-    @GridDirectTransient
-    private List<GridCacheEntryInfo<K, V>> preloadEntries;
-
-    /** Serialized preload entries. */
-    @GridDirectCollection(byte[].class)
-    @GridDirectVersion(1)
-    private List<byte[]> preloadEntriesBytes;
-
-    /**
-     * Empty constructor required by {@link Externalizable}. Performs no
-     * initialization; it is also the constructor used by {@link #clone()}.
-     */
-    public GridDhtTxPrepareResponse() {
-        // No-op.
-    }
-
-    /**
-     * Creates a response without an error.
-     *
-     * @param xid Xid version.
-     * @param futId Future ID, must not be {@code null}.
-     * @param miniId Mini future ID, must not be {@code null}.
-     */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, 
IgniteUuid miniId) {
-        super(xid);
-
-        assert futId != null;
-        assert miniId != null;
-
-        this.futId = futId;
-        this.miniId = miniId;
-    }
-
-    /**
-     * Creates a response that carries an error.
-     *
-     * @param xid Xid version.
-     * @param futId Future ID, must not be {@code null}.
-     * @param miniId Mini future ID, must not be {@code null}.
-     * @param err Error, passed to the superclass constructor.
-     */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, 
IgniteUuid miniId, Throwable err) {
-        super(xid, err);
-
-        assert futId != null;
-        assert miniId != null;
-
-        this.futId = futId;
-        this.miniId = miniId;
-    }
-
-    /**
-     * Returns the evicted readers as stored, without copying.
-     *
-     * @return Evicted readers, or {@code null} if none were set or unmarshalled.
-     */
-    public Collection<IgniteTxKey<K>> nearEvicted() {
-        return nearEvicted;
-    }
-
-    /**
-     * Sets the evicted readers. The collection is stored by reference.
-     *
-     * @param nearEvicted Evicted readers.
-     */
-    public void nearEvicted(Collection<IgniteTxKey<K>> nearEvicted) {
-        this.nearEvicted = nearEvicted;
-    }
-
-    /**
-     * Sets the serialized evicted readers. When these are set,
-     * {@code prepareMarshal} does not marshal the evicted readers again.
-     *
-     * @param nearEvictedBytes Near evicted bytes.
-     */
-    public void nearEvictedBytes(Collection<byte[]> nearEvictedBytes) {
-        this.nearEvictedBytes = nearEvictedBytes;
-    }
-
-    /**
-     * Returns the future ID passed to the constructor.
-     *
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * Returns the mini future ID passed to the constructor.
-     *
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * Returns the invalid partitions as stored, without copying.
-     *
-     * @return Invalid partitions, or {@code null} if none were set.
-     */
-    public Collection<Integer> invalidPartitions() {
-        return invalidParts;
-    }
-
-    /**
-     * Sets the invalid partitions. The collection is stored by reference.
-     *
-     * @param invalidParts Invalid partitions.
-     */
-    public void invalidPartitions(Collection<Integer> invalidParts) {
-        this.invalidParts = invalidParts;
-    }
-
-    /**
-     * Gets preload entries found on backup node.
-     *
-     * @return Collection of entry infos that need to be preloaded, or an empty
-     *      list when none were added (never {@code null}).
-     */
-    public Collection<GridCacheEntryInfo<K, V>> preloadEntries() {
-        return preloadEntries == null ? Collections.<GridCacheEntryInfo<K, 
V>>emptyList() : preloadEntries;
-    }
-
-    /**
-     * Adds preload entry. The backing list is created lazily on the first call.
-     *
-     * @param info Info to add.
-     */
-    public void addPreloadEntry(GridCacheEntryInfo<K, V> info) {
-        if (preloadEntries == null)
-            preloadEntries = new ArrayList<>();
-
-        preloadEntries.add(info);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Additionally marshals the evicted readers and the preload entries into
-     * their byte collections, each only while the corresponding byte collection
-     * is still {@code null}.
-     * <p>
-     * NOTE(review): {@code nearEvicted} is passed to {@code marshalCollection}
-     * without a null check, unlike {@code preloadEntries}; presumably the helper
-     * tolerates {@code null} - confirm.
-     *
-     * @param ctx Shared cache context, used for marshalling.
-     */
-    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (nearEvictedBytes == null)
-            nearEvictedBytes = marshalCollection(nearEvicted, ctx);
-
-        if (preloadEntriesBytes == null && preloadEntries != null)
-            preloadEntriesBytes = marshalCollection(preloadEntries, ctx);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Additionally restores the evicted readers and the preload entries from
-     * their byte collections, each only if the object form is not already set
-     * and the bytes are present.
-     */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        // Unmarshal even if deployment is disabled, since we could get bytes 
initially.
-        if (nearEvicted == null && nearEvictedBytes != null)
-            nearEvicted = unmarshalCollection(nearEvictedBytes, ctx, ldr);
-
-        if (preloadEntries == null && preloadEntriesBytes != null)
-            preloadEntries = unmarshalCollection(preloadEntriesBytes, ctx, 
ldr);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Built with {@code S.toString} for this class, with the superclass string
-     * included under the {@code "super"} name.
-     */
-    @Override public String toString() {
-        return S.toString(GridDhtTxPrepareResponse.class, this, "super", 
super.toString());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Does not call {@code super.clone()}: a new instance is created with the
-     * no-arg constructor and populated through {@code clone0}.
-     */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtTxPrepareResponse _clone = new GridDhtTxPrepareResponse();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Lets the superclass copy its own state first, then assigns the fields declared by this class to the
-     * target one by one. Fields are assigned directly (no copies are made), so object fields end up shared
-     * between this message and the clone.
-     *
-     * @param _msg Target message; cast to {@code GridDhtTxPrepareResponse} without a prior type check.
-     */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtTxPrepareResponse _clone = (GridDhtTxPrepareResponse)_msg;
-
-        _clone.nearEvicted = nearEvicted;
-        _clone.nearEvictedBytes = nearEvictedBytes;
-        _clone.futId = futId;
-        _clone.miniId = miniId;
-        _clone.invalidParts = invalidParts;
-        _clone.preloadEntries = preloadEntries;
-        _clone.preloadEntriesBytes = preloadEntriesBytes;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * After the superclass part and (if not yet written) the one-byte {@code directType()}, writes the fields
-     * of this class in a fixed order selected by {@code commState.idx}: {@code futId} (10),
-     * {@code invalidParts} (11), {@code miniId} (12), {@code nearEvictedBytes} (13) and
-     * {@code preloadEntriesBytes} (14). A {@code null} collection is written as the size {@code -1};
-     * otherwise the size is written first, followed by the elements.
-     * <p>
-     * The method returns {@code false} as soon as the superclass or a {@code commState.put*} call returns
-     * {@code false} (presumably because the buffer has no room left; confirm against the communication state
-     * class). Progress is kept in {@code commState.idx}, {@code commState.it} and {@code commState.cur}, and
-     * the switch cases fall through intentionally, so a later call re-enters the switch at the unfinished field.
-     *
-     * @param buf Buffer to write to.
-     * @return {@code true} when nothing handled by this class is left to write, {@code false} otherwise.
-     */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 10: // futId
-                if (!commState.putGridUuid(futId))
-                    return false;
-
-                commState.idx++;
-
-            case 11: // invalidParts: size, then one int per element
-                if (invalidParts != null) {
-                    if (commState.it == null) { // First visit of this field: write the size and open the iterator.
-                        if (!commState.putInt(invalidParts.size()))
-                            return false;
-
-                        commState.it = invalidParts.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) { // cur != NULL: an element was fetched but not yet written.
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if (!commState.putInt((int)commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1)) // -1 marks a null collection.
-                        return false;
-                }
-
-                commState.idx++;
-
-            case 12: // miniId
-                if (!commState.putGridUuid(miniId))
-                    return false;
-
-                commState.idx++;
-
-            case 13: // nearEvictedBytes: size, then one byte array per element
-                if (nearEvictedBytes != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(nearEvictedBytes.size()))
-                            return false;
-
-                        commState.it = nearEvictedBytes.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if (!commState.putByteArray((byte[])commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-            case 14: // preloadEntriesBytes: size, then one byte array per element
-                if (preloadEntriesBytes != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(preloadEntriesBytes.size()))
-                            return false;
-
-                        commState.it = preloadEntriesBytes.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if (!commState.putByteArray((byte[])commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * After the superclass part, reads the fields of this class in the order selected by
-     * {@code commState.idx}: {@code futId} (10), {@code invalidParts} (11), {@code miniId} (12),
-     * {@code nearEvictedBytes} (13) and {@code preloadEntriesBytes} (14). For each collection the size is read
-     * first; a negative size leaves the field as it was, while a size of zero or more creates the list (if it
-     * is still {@code null}) and reads that many elements.
-     * <p>
-     * The method returns {@code false} when the superclass returns {@code false}, when fewer than four bytes
-     * remain for an int, or when a getter hands back its "not read" marker ({@code GRID_UUID_NOT_READ},
-     * {@code BYTE_ARR_NOT_READ}). Progress is kept in {@code commState.idx}, {@code commState.readSize} and
-     * {@code commState.readItems}, and the switch cases fall through intentionally, so a later call re-enters
-     * the switch at the unfinished field.
-     *
-     * @param buf Buffer to read from.
-     * @return {@code true} when nothing handled by this class is left to read, {@code false} otherwise.
-     */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 10: // futId
-                IgniteUuid futId0 = commState.getGridUuid();
-
-                if (futId0 == GRID_UUID_NOT_READ) // Identity comparison against the "not read" marker.
-                    return false;
-
-                futId = futId0;
-
-                commState.idx++;
-
-            case 11: // invalidParts: size, then one int per element
-                if (commState.readSize == -1) { // Size of this collection has not been read yet.
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (invalidParts == null)
-                        invalidParts = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        if (buf.remaining() < 4)
-                            return false;
-
-                        int _val = commState.getInt();
-
-                        invalidParts.add((Integer)_val);
-
-                        commState.readItems++; // Remembered so that a resumed call continues after this element.
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-            case 12: // miniId
-                IgniteUuid miniId0 = commState.getGridUuid();
-
-                if (miniId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                miniId = miniId0;
-
-                commState.idx++;
-
-            case 13: // nearEvictedBytes: size, then one byte array per element
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (nearEvictedBytes == null)
-                        nearEvictedBytes = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        byte[] _val = commState.getByteArray();
-
-                        if (_val == BYTE_ARR_NOT_READ) // Identity comparison against the "not read" marker.
-                            return false;
-
-                        nearEvictedBytes.add((byte[])_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-            case 14: // preloadEntriesBytes: size, then one byte array per element
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (preloadEntriesBytes == null)
-                        preloadEntriesBytes = new 
ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        byte[] _val = commState.getByteArray();
-
-                        if (_val == BYTE_ARR_NOT_READ)
-                            return false;
-
-                        preloadEntriesBytes.add((byte[])_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return The constant {@code 34}; {@code writeTo} puts this byte into the buffer when the type has not
-     *      been written yet.
-     */
-    @Override public byte directType() {
-        return 34;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
deleted file mode 100644
index 3352278..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.processor.*;
-import java.io.*;
-import java.util.*;
-
-import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
-
-/**
- * Transaction created by system implicitly on remote nodes.
- * <p>
- * On top of the state inherited from {@code GridDistributedTxRemoteAdapter}, this class keeps the near node ID,
- * the remote future ID, the near transaction version and the transaction nodes mapping. Both public
- * constructors start with an empty read map and a fresh {@code ConcurrentLinkedHashMap} as the write map;
- * write entries are added through the {@code addWrite} overloads.
- */
-public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, 
V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Near node ID. */
-    private UUID nearNodeId;
-
-    /** Remote future ID. */
-    private IgniteUuid rmtFutId;
-
-    /** Near transaction ID. */
-    private GridCacheVersion nearXidVer;
-
-    /** Transaction nodes mapping (primary node -> related backup nodes). */
-    private Map<UUID, Collection<UUID>> txNodes;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtTxRemote() {
-        // No-op.
-    }
-
-    /**
-     * This constructor is meant for optimistic transactions.
-     * <p>
-     * Most arguments are forwarded to the superclass constructor. This constructor additionally stores the
-     * near node ID, remote future ID, near transaction ID and transaction nodes mapping, installs an empty
-     * read map and a new write map sized by {@code txSize}, and sets the topology version.
-     *
-     * @param nearNodeId Near node ID (asserted to be non-null).
-     * @param rmtFutId Remote future ID (asserted to be non-null).
-     * @param nodeId Node ID.
-     * @param rmtThreadId Remote thread ID.
-     * @param topVer Topology version.
-     * @param xidVer XID version.
-     * @param commitVer Commit version.
-     * @param sys System flag.
-     * @param concurrency Concurrency level (expected to be optimistic for this constructor; the value is not
-     *      checked in this constructor body).
-     * @param isolation Transaction isolation.
-     * @param invalidate Invalidate flag.
-     * @param timeout Timeout.
-     * @param ctx Cache context.
-     * @param txSize Expected transaction size.
-     * @param grpLockKey Group lock key if this is a group-lock transaction.
-     * @param nearXidVer Near transaction ID.
-     * @param txNodes Transaction nodes mapping.
-     * @param subjId Subject ID (nullable), forwarded to the superclass constructor.
-     * @param taskNameHash Task name hash, forwarded to the superclass constructor.
-     */
-    public GridDhtTxRemote(
-        GridCacheSharedContext<K, V> ctx,
-        UUID nearNodeId,
-        IgniteUuid rmtFutId,
-        UUID nodeId,
-        long rmtThreadId,
-        long topVer,
-        GridCacheVersion xidVer,
-        GridCacheVersion commitVer,
-        boolean sys,
-        IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation,
-        boolean invalidate,
-        long timeout,
-        int txSize,
-        @Nullable IgniteTxKey grpLockKey,
-        GridCacheVersion nearXidVer,
-        Map<UUID, Collection<UUID>> txNodes,
-        @Nullable UUID subjId,
-        int taskNameHash
-    ) {
-        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, 
isolation, invalidate, timeout, txSize,
-            grpLockKey, subjId, taskNameHash);
-
-        assert nearNodeId != null;
-        assert rmtFutId != null;
-
-        this.nearNodeId = nearNodeId;
-        this.rmtFutId = rmtFutId;
-        this.nearXidVer = nearXidVer;
-        this.txNodes = txNodes;
-
-        readMap = Collections.emptyMap();
-
-        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
-
-        topologyVersion(topVer);
-    }
-
-    /**
-     * This constructor is meant for pessimistic transactions.
-     * <p>
-     * Works like the optimistic constructor, except that it takes no transaction nodes mapping: the
-     * {@code txNodes} field is left unassigned, so {@link #transactionNodes()} returns {@code null} for
-     * instances created here.
-     *
-     * @param nearNodeId Near node ID (asserted to be non-null).
-     * @param rmtFutId Remote future ID (asserted to be non-null).
-     * @param nodeId Node ID.
-     * @param nearXidVer Near transaction ID.
-     * @param rmtThreadId Remote thread ID.
-     * @param topVer Topology version.
-     * @param xidVer XID version.
-     * @param commitVer Commit version.
-     * @param sys System flag.
-     * @param concurrency Concurrency level (should be pessimistic).
-     * @param isolation Transaction isolation.
-     * @param invalidate Invalidate flag.
-     * @param timeout Timeout.
-     * @param ctx Cache context.
-     * @param txSize Expected transaction size.
-     * @param grpLockKey Group lock key if transaction is group-lock.
-     * @param subjId Subject ID (nullable), forwarded to the superclass constructor.
-     * @param taskNameHash Task name hash, forwarded to the superclass constructor.
-     */
-    public GridDhtTxRemote(
-        GridCacheSharedContext<K, V> ctx,
-        UUID nearNodeId,
-        IgniteUuid rmtFutId,
-        UUID nodeId,
-        GridCacheVersion nearXidVer,
-        long rmtThreadId,
-        long topVer,
-        GridCacheVersion xidVer,
-        GridCacheVersion commitVer,
-        boolean sys,
-        IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation,
-        boolean invalidate,
-        long timeout,
-        int txSize,
-        @Nullable IgniteTxKey grpLockKey,
-        @Nullable UUID subjId,
-        int taskNameHash
-    ) {
-        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, 
isolation, invalidate, timeout, txSize,
-            grpLockKey, subjId, taskNameHash);
-
-        assert nearNodeId != null;
-        assert rmtFutId != null;
-
-        this.nearXidVer = nearXidVer;
-        this.nearNodeId = nearNodeId;
-        this.rmtFutId = rmtFutId;
-
-        readMap = Collections.emptyMap();
-        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
-
-        topologyVersion(topVer);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Always {@code true} for this class.
-     */
-    @Override public boolean dht() {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return The near node ID.
-     */
-    @Override public UUID eventNodeId() {
-        return nearNodeId();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Two-element list holding the near node ID followed by {@code nodeId}.
-     */
-    @Override public Collection<UUID> masterNodeIds() {
-        return Arrays.asList(nearNodeId, nodeId);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return The near node ID.
-     */
-    @Override public UUID otherNodeId() {
-        return nearNodeId;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Always {@code false} for this class.
-     */
-    @Override public boolean enforceSerializable() {
-        return false; // Serializable will be enforced on primary mode.
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Near transaction ID passed to the constructor.
-     */
-    @Override public GridCacheVersion nearXidVersion() {
-        return nearXidVer;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Transaction nodes mapping; {@code null} when the instance was created by a constructor that
-     *      does not take the mapping.
-     */
-    @Override public Map<UUID, Collection<UUID>> transactionNodes() {
-        return txNodes;
-    }
-
-    /**
-     * @return Near node ID.
-     */
-    UUID nearNodeId() {
-        return nearNodeId;
-    }
-
-    /**
-     * @return Remote future ID.
-     */
-    IgniteUuid remoteFutureId() {
-        return rmtFutId;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns {@code false} when the given cache context is not DHT, when near cache is not enabled for it,
-     * or when the local node is the near node of this transaction. Otherwise returns {@code true} if the
-     * cache is configured with zero backups, or if the local node is not among the affinity backups of the
-     * key for the given topology version.
-     *
-     * @param cacheCtx Cache context.
-     * @param key Key to check.
-     * @param topVer Topology version used for the backup lookup.
-     */
-    @Override protected boolean updateNearCache(GridCacheContext<K, V> 
cacheCtx, K key, long topVer) {
-        if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || 
cctx.localNodeId().equals(nearNodeId))
-            return false;
-
-        if (cacheCtx.config().getBackups() == 0)
-            return true;
-
-        // Check if we are on the backup node.
-        return !cacheCtx.affinity().backups(key, 
topVer).contains(cctx.localNode());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * After delegating to the superclass, removes from the write map every entry that belongs to the given
-     * partition. The partition is taken from the entry's cached cache entry when one is attached, and is
-     * otherwise computed from the entry key through the affinity of {@code cacheCtx}.
-     * <p>
-     * NOTE(review): entries are not filtered by their own cache context before the partition comparison;
-     * confirm that this is intended when the write map holds entries of more than one cache.
-     *
-     * @param cacheCtx Cache context.
-     * @param part Invalid partition.
-     */
-    @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, 
int part) {
-        super.addInvalidPartition(cacheCtx, part);
-
-        for (Iterator<IgniteTxEntry<K, V>> it = writeMap.values().iterator(); 
it.hasNext();) {
-            IgniteTxEntry<K, V> e = it.next();
-
-            GridCacheEntryEx<K, V> cached = e.cached();
-
-            if (cached != null) {
-                if (cached.partition() == part)
-                    it.remove();
-            }
-            else if (cacheCtx.affinity().partition(e.key()) == part)
-                it.remove();
-        }
-    }
-
-    /**
-     * Unmarshals the given entry, binds it to the DHT cache entry for its key and current topology version,
-     * and puts it into the write map. If looking up the cache entry (or a later step inside the try block)
-     * throws {@code GridDhtInvalidPartitionException}, the partition is recorded through
-     * {@link #addInvalidPartition(GridCacheContext, int)} and the exception is not propagated.
-     *
-     * @param entry Write entry.
-     * @param ldr Class loader.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void addWrite(IgniteTxEntry<K, V> entry, ClassLoader ldr) throws 
IgniteCheckedException {
-        entry.unmarshal(cctx, false, ldr);
-
-        GridCacheContext<K, V> cacheCtx = entry.context();
-
-        try {
-            GridDhtCacheEntry<K, V> cached = 
cacheCtx.dht().entryExx(entry.key(), topologyVersion());
-
-            checkInternal(entry.txKey());
-
-            // Initialize cache entry.
-            entry.cached(cached, entry.keyBytes());
-
-            writeMap.put(entry.txKey(), entry);
-
-            addExplicit(entry);
-        }
-        catch (GridDhtInvalidPartitionException e) {
-            addInvalidPartition(cacheCtx, e.partition());
-        }
-    }
-
-    /**
-     * Builds a new transaction entry from the given pieces and puts it into the write map under {@code key}.
-     * Nothing is added when {@code isSystemInvalidate()} returns {@code true}.
-     * <p>
-     * NOTE(review): unlike the {@code addWrite(IgniteTxEntry, ClassLoader)} overload, this method does not
-     * catch {@code GridDhtInvalidPartitionException} around the {@code entryExx} call; confirm that callers
-     * handle it.
-     *
-     * @param cacheCtx Cache context.
-     * @param op Write operation.
-     * @param key Key to add to write set.
-     * @param keyBytes Key bytes.
-     * @param val Value.
-     * @param valBytes Value bytes.
-     * @param entryProcessors Entry processors.
-     * @param drVer Data center replication version.
-     * @param ttl TTL.
-     */
-    public void addWrite(GridCacheContext<K, V> cacheCtx,
-        GridCacheOperation op,
-        IgniteTxKey<K> key,
-        byte[] keyBytes,
-        @Nullable V val,
-        @Nullable byte[] valBytes,
-        @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> 
entryProcessors,
-        @Nullable GridCacheVersion drVer,
-        long ttl) {
-        checkInternal(key);
-
-        if (isSystemInvalidate())
-            return;
-
-        GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), 
topologyVersion());
-
-        IgniteTxEntry<K, V> txEntry = new IgniteTxEntry<>(cacheCtx,
-            this,
-            op,
-            val,
-            ttl,
-            -1L,
-            cached,
-            drVer);
-
-        txEntry.keyBytes(keyBytes);
-        txEntry.valueBytes(valBytes);
-        txEntry.entryProcessors(entryProcessors);
-
-        writeMap.put(key, txEntry);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@code GridToStringBuilder.toString} for {@code GridDhtTxRemote}, passing the superclass
-     * representation as an extra value named {@code "super"}.
-     */
-    @Override public String toString() {
-        return GridToStringBuilder.toString(GridDhtTxRemote.class, this, 
"super", super.toString());
-    }
-}

Reply via email to