http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 8361423..5b011e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -63,9 +63,6 @@ public class GridDhtTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> implements
     /** Near XID. */
     private GridCacheVersion nearXidVer;
 
-    /** Transaction nodes mapping (primary node -> related backup nodes). */
-    private Map<UUID, Collection<UUID>> txNodes;
-
     /** Future. */
     @GridToStringExclude
     private final AtomicReference<GridDhtTxPrepareFuture<K, V>> prepFut =
@@ -153,11 +150,6 @@ public class GridDhtTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> implements
     }
 
     /** {@inheritDoc} */
-    @Override public Map<UUID, Collection<UUID>> transactionNodes() {
-        return txNodes;
-    }
-
-    /** {@inheritDoc} */
     @Override public UUID eventNodeId() {
         return nearNodeId;
     }
@@ -201,6 +193,13 @@ public class GridDhtTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> implements
         return nearFutId;
     }
 
+    /**
+     * @param nearFutId Near future ID.
+     */
+    public void nearFutureId(IgniteUuid nearFutId) {
+        this.nearFutId = nearFutId;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteUuid nearMiniId() {
         return nearMiniId;
@@ -289,7 +288,7 @@ public class GridDhtTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> implements
         if (fut == null) {
             // Future must be created before any exception can be thrown.
             if (!prepFut.compareAndSet(null, fut = new 
GridDhtTxPrepareFuture<>(cctx, this, nearMiniId,
-                Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), 
true, null)))
+                Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), 
true, needReturnValue(), null)))
                 return prepFut.get();
         }
         else
@@ -355,8 +354,6 @@ public class GridDhtTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> implements
         Map<UUID, Collection<UUID>> txNodes,
         boolean last,
         Collection<UUID> lastBackups) {
-        assert optimistic();
-
         // In optimistic mode prepare still can be called explicitly from 
salvageTx.
         GridDhtTxPrepareFuture<K, V> fut = prepFut.get();
 
@@ -365,7 +362,7 @@ public class GridDhtTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> implements
 
             // Future must be created before any exception can be thrown.
             if (!prepFut.compareAndSet(null, fut = new 
GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, verMap, last,
-                lastBackups))) {
+                needReturnValue(), lastBackups))) {
                 GridDhtTxPrepareFuture<K, V> f = prepFut.get();
 
                 assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id 
on existing future " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 564100b..b78561a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -238,10 +238,10 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
             }
 
             if (!F.isEmpty(dhtEntryMap))
-                addDhtMapping(dhtEntryMap);
+                addDhtNodeEntryMapping(dhtEntryMap);
 
             if (!F.isEmpty(nearEntryMap))
-                addNearMapping(nearEntryMap);
+                addNearNodeEntryMapping(nearEntryMap);
 
             mapped.set(true);
         }
@@ -284,18 +284,31 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
     /**
      * @param mappings Mappings to add.
      */
-    void addDhtMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> 
mappings) {
+    void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, 
V>>> mappings) {
         addMapping(mappings, dhtMap);
     }
 
     /**
      * @param mappings Mappings to add.
      */
-    void addNearMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> 
mappings) {
+    void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, 
V>>> mappings) {
         addMapping(mappings, nearMap);
     }
 
     /**
+     * @param mappings Mappings to add.
+     */
+    public void addDhtMapping(Map<UUID, GridDistributedTxMapping<K, V>> 
mappings) {
+        addMapping0(mappings, dhtMap);
+    }
+
+    /**
+     * @param mappings Mappings to add.
+     */
+    public void addNearMapping(Map<UUID, GridDistributedTxMapping<K, V>> 
mappings) {
+        addMapping0(mappings, nearMap);
+    }
+    /**
      * @param nodeId Node ID.
      * @return {@code True} if mapping was removed.
      */
@@ -353,21 +366,25 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
 
     /**
      * @param mappings Entry mappings.
-     * @param map Transaction mappings.
+     * @param dst Transaction mappings.
      */
-    private void addMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> 
mappings,
-        Map<UUID, GridDistributedTxMapping<K, V>> map) {
+    private void addMapping(
+        Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings,
+        Map<UUID, GridDistributedTxMapping<K, V>> dst
+    ) {
         for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapping : 
mappings.entrySet()) {
             ClusterNode n = mapping.getKey();
 
-            for (GridDhtCacheEntry<K, V> entry : mapping.getValue()) {
+            GridDistributedTxMapping<K, V> m = dst.get(n.id());
+
+            List<GridDhtCacheEntry<K, V>> entries = mapping.getValue();
+
+            for (GridDhtCacheEntry<K, V> entry : entries) {
                 IgniteTxEntry<K, V> txEntry = txMap.get(entry.txKey());
 
                 if (txEntry != null) {
-                    GridDistributedTxMapping<K, V> m = map.get(n.id());
-
                     if (m == null)
-                        map.put(n.id(), m = new GridDistributedTxMapping<>(n));
+                        dst.put(n.id(), m = new GridDistributedTxMapping<>(n));
 
                     m.add(txEntry);
                 }
@@ -375,13 +392,31 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
         }
     }
 
+    /**
+     * @param mappings Mappings to add.
+     * @param dst Map to add to.
+     */
+    private void addMapping0(
+        Map<UUID, GridDistributedTxMapping<K, V>> mappings,
+        Map<UUID, GridDistributedTxMapping<K, V>> dst
+    ) {
+        for (Map.Entry<UUID, GridDistributedTxMapping<K, V>> entry : 
mappings.entrySet()) {
+            GridDistributedTxMapping<K, V> targetMapping = 
dst.get(entry.getKey());
+
+            if (targetMapping == null)
+                dst.put(entry.getKey(), entry.getValue());
+            else {
+                for (IgniteTxEntry<K, V> txEntry : entry.getValue().entries())
+                    targetMapping.add(txEntry);
+            }
+        }
+    }
 
     /** {@inheritDoc} */
     @Override public void addInvalidPartition(GridCacheContext<K, V> ctx, int 
part) {
         assert false : "DHT transaction encountered invalid partition [part=" 
+ part + ", tx=" + this + ']';
     }
 
-
     /**
      * @param msgId Message ID.
      * @param e Entry to add.
@@ -393,17 +428,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
 
         IgniteTxState state = state();
 
-        assert state == ACTIVE || (state == PREPARING && optimistic()) : 
"Invalid tx state for " +
+        assert state == PREPARING : "Invalid tx state for " +
             "adding entry [msgId=" + msgId + ", e=" + e + ", tx=" + this + ']';
 
         e.unmarshal(cctx, false, cctx.deploy().globalLoader());
 
         checkInternal(e.txKey());
 
-        state = state();
-
-        assert state == ACTIVE || (state == PREPARING && optimistic()): 
"Invalid tx state for adding entry: " + e;
-
         GridCacheContext<K, V> cacheCtx = e.context();
 
         GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? 
cacheCtx.near().dht() : cacheCtx.dht();
@@ -419,6 +450,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
                 entry.ttl(e.ttl());
                 entry.filters(e.filters());
                 entry.drExpireTime(e.drExpireTime());
+                entry.drVersion(e.drVersion());
             }
             else {
                 entry = e;
@@ -471,23 +503,18 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
     /**
      * @param cacheCtx Cache context.
      * @param entries Entries to lock.
-     * @param writeEntries Write entries for implicit transactions mapped to 
one node.
      * @param onePhaseCommit One phase commit flag.
-     * @param drVers DR versions.
      * @param msgId Message ID.
-     * @param implicit Implicit flag.
      * @param read Read flag.
      * @param accessTtl TTL for read operation.
      * @return Lock future.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     IgniteFuture<GridCacheReturn<V>> lockAllAsync(
         GridCacheContext<K, V> cacheCtx,
-        Collection<GridCacheEntryEx<K, V>> entries,
-        List<IgniteTxEntry<K, V>> writeEntries,
+        List<GridCacheEntryEx<K, V>> entries,
         boolean onePhaseCommit,
-        GridCacheVersion[] drVers,
         long msgId,
-        boolean implicit,
         final boolean read,
         long accessTtl
     ) {
@@ -508,19 +535,16 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
         onePhaseCommit(onePhaseCommit);
 
         try {
-            assert drVers == null || entries.size() == drVers.length;
-
             Set<K> skipped = null;
 
-            int idx = 0;
-            int drVerIdx = 0;
-
             long topVer = topologyVersion();
 
             GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? 
cacheCtx.near().dht() : cacheCtx.dht();
 
             // Enlist locks into transaction.
-            for (GridCacheEntryEx<K, V> entry : entries) {
+            for (int i = 0; i < entries.size(); i++) {
+                GridCacheEntryEx<K, V> entry = entries.get(i);
+
                 K key = entry.key();
 
                 IgniteTxEntry<K, V> txEntry = entry(entry.txKey());
@@ -533,9 +557,6 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
 
                     cached.unswap(!read, read);
 
-                    IgniteTxEntry<K, V>
-                        w = writeEntries == null ? null : 
writeEntries.get(idx++);
-
                     txEntry = addEntry(NOOP,
                         null,
                         null,
@@ -546,22 +567,9 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
                         false,
                         -1L,
                         -1L,
-                        drVers != null ? drVers[drVerIdx++] : null);
-
-                    if (w != null) {
-                        assert key.equals(w.key()) : "Invalid entry [cached=" 
+ cached + ", w=" + w + ']';
-
-                        txEntry.op(w.op());
-                        txEntry.value(w.value(), w.hasWriteValue(), 
w.hasReadValue());
-                        txEntry.valueBytes(w.valueBytes());
-                        txEntry.drVersion(w.drVersion());
-                        txEntry.entryProcessors(w.entryProcessors());
-                        txEntry.ttl(w.ttl());
-                        txEntry.filters(w.filters());
-                        txEntry.drExpireTime(w.drExpireTime());
-                        txEntry.expiry(w.expiry());
-                    }
-                    else if (read)
+                        null);
+
+                    if (read)
                         txEntry.ttl(accessTtl);
 
                     txEntry.cached(cached, txEntry.keyBytes());
@@ -718,7 +726,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K
         }
 
         if (locNearMap != null)
-            addNearMapping(locNearMap);
+            addNearNodeEntryMapping(locNearMap);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d7f8ef2..e2bb7e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -103,6 +104,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
     /** IDs of backup nodes receiving last prepare request during this 
prepare. */
     private Collection<UUID> lastBackups;
 
+    /** Needs return value flag. */
+    private boolean retVal;
+
+    /** Return value. */
+    private GridCacheReturn<V> ret;
+
+    /** Keys that did not pass the filter. */
+    private Collection<IgniteTxKey<K>> filterFailedKeys;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -118,8 +128,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
      * @param last {@code True} if this is last prepare operation for node.
      * @param lastBackups IDs of backup nodes receiving last prepare request 
during this prepare.
      */
-    public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final 
GridDhtTxLocalAdapter<K, V> tx,
-        IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> 
dhtVerMap, boolean last, Collection<UUID> lastBackups) {
+    public GridDhtTxPrepareFuture(
+        GridCacheSharedContext<K, V> cctx,
+        final GridDhtTxLocalAdapter<K, V> tx,
+        IgniteUuid nearMiniId,
+        Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap,
+        boolean last,
+        boolean retVal,
+        Collection<UUID> lastBackups
+    ) {
         super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, 
IgniteTxEx<K, V>>() {
             @Override public boolean collect(IgniteTxEx<K, V> e) {
                 return true;
@@ -131,8 +148,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
             }
         });
 
-        assert cctx != null;
-
         this.cctx = cctx;
         this.tx = tx;
         this.dhtVerMap = dhtVerMap;
@@ -148,6 +163,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
         dhtMap = tx.dhtMap();
         nearMap = tx.nearMap();
 
+        this.retVal = retVal;
+
         assert dhtMap != null;
         assert nearMap != null;
     }
@@ -220,13 +237,24 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
                 GridCacheEntryEx<K, V> cached = txEntry.cached();
 
                 try {
-                    // Don't compare entry against itself.
-                    if (!cached.lockedLocally(tx.xidVersion())) {
-                        if (log.isDebugEnabled())
-                            log.debug("Transaction entry is not locked by 
transaction (will wait) [entry=" + cached +
-                                ", tx=" + tx + ']');
+                    if (txEntry.explicitVersion() == null) {
+                        // Don't compare entry against itself.
+                        if (!cached.lockedLocally(tx.xidVersion())) {
+                            if (log.isDebugEnabled())
+                                log.debug("Transaction entry is not locked by 
transaction (will wait) [entry=" +
+                                    cached + ", tx=" + tx + ']');
+
+                            return false;
+                        }
+                    }
+                    else {
+                        if (!cached.lockedBy(txEntry.explicitVersion())) {
+                            if (log.isDebugEnabled())
+                                log.debug("Transaction entry is not locked by 
explicit version (will wait) [entry=" +
+                                    cached + ", tx=" + tx + ']');
 
-                        return false;
+                            return false;
+                        }
                     }
 
                     break; // While.
@@ -261,6 +289,67 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
     }
 
     /**
+     *
+     */
+    private void checkFilters() {
+        ret = new GridCacheReturn<>(null, true);
+
+        for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) {
+            GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+            GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+            try {
+                boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && 
!F.isAlwaysTrue(txEntry.filters());
+
+                if (hasFilters || retVal || txEntry.op() == 
GridCacheOperation.DELETE) {
+                    cached.unswap(true, retVal);
+
+                    V val = cached.innerGet(
+                        tx,
+                        /*swap*/true,
+                        /*read through*/retVal || hasFilters,
+                        /*fail fast*/false,
+                        /*unmarshal*/true,
+                        /*metrics*/retVal,
+                        /*event*/retVal,
+                        /*tmp*/false,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+                    if (retVal)
+                        ret.value(val);
+
+                    if (hasFilters && !cacheCtx.isAll(cached, 
txEntry.filters())) {
+                        txEntry.op(GridCacheOperation.NOOP);
+
+                        if (filterFailedKeys == null)
+                            filterFailedKeys = new ArrayList<>();
+
+                        filterFailedKeys.add(cached.txKey());
+
+                        ret.success(false);
+                    }
+                    else
+                        ret.success(txEntry.op() != GridCacheOperation.DELETE 
|| cached.hasValue());
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to get result value for cache entry: " + 
cached, e);
+            }
+            catch (GridCacheEntryRemovedException e) {
+                assert false : "Got entry removed exception while holding 
transactional lock on entry: " + e;
+            }
+            catch (GridCacheFilterFailedException e) {
+                assert false : "Got filter failed exception with fail fast 
false " + e;
+            }
+        }
+    }
+
+    /**
      * @param t Error.
      */
     public void onError(Throwable t) {
@@ -281,8 +370,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
             // If not local node.
             if (!tx.nearNodeId().equals(cctx.localNodeId())) {
                 // Send reply back to near node.
-                GridCacheMessage<K, V> res = new 
GridNearTxPrepareResponse<>(tx.nearXidVersion(), tx.nearFutureId(),
-                    nearMiniId, tx.xidVersion(), 
Collections.<Integer>emptySet(), t);
+                GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>(
+                    tx.nearXidVersion(),
+                    tx.nearFutureId(),
+                    nearMiniId,
+                    tx.xidVersion(),
+                    Collections.<Integer>emptySet(),
+                    ret,
+                    t);
 
                 try {
                     cctx.io().send(tx.nearNodeId(), res, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
@@ -379,54 +474,105 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
 
         this.err.compareAndSet(null, err);
 
-        if (replied.compareAndSet(false, true)) {
-            try {
-                // Must clear prepare future before response is sent or 
listeners are notified.
-                if (tx.optimistic())
-                    tx.clearPrepareFuture(this);
+        // Must clear prepare future before response is sent or listeners are 
notified.
+        if (tx.optimistic())
+            tx.clearPrepareFuture(this);
 
-                if (!tx.nearNodeId().equals(cctx.localNodeId())) {
-                    // Send reply back to originating near node.
-                    GridNearTxPrepareResponse<K, V> res = new 
GridNearTxPrepareResponse<>(tx.nearXidVersion(),
-                        tx.nearFutureId(), nearMiniId, tx.xidVersion(), 
tx.invalidPartitions(), this.err.get());
+        if (tx.onePhaseCommit()) {
+            assert last;
 
-                    addDhtValues(res);
+            // Must create prepare response before transaction is committed to 
grab correct return value.
+            final GridNearTxPrepareResponse<K, V> res = 
createPrepareResponse();
 
-                    GridCacheVersion min = tx.minVersion();
+            onComplete();
 
-                    res.completedVersions(cctx.tm().committedVersions(min), 
cctx.tm().rolledbackVersions(min));
+            if (!tx.colocated() && 
tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) {
+                IgniteFuture<IgniteTx> fut = this.err.get() == null ? 
tx.commitAsync() : tx.rollbackAsync();
 
-                    res.pending(localDhtPendingVersions(tx.writeEntries(), 
min));
+                fut.listenAsync(new CIX1<IgniteFuture<IgniteTx>>() {
+                    @Override public void applyx(IgniteFuture<IgniteTx> 
gridCacheTxGridFuture) {
+                        try {
+                            if (replied.compareAndSet(false, true))
+                                sendPrepareResponse(res);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send prepare response for 
transaction: " + tx, e);
+                        }
+                    }
+                });
+            }
 
-                    cctx.io().send(tx.nearNodeId(), res, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
+            return true;
+        }
+        else {
+            if (replied.compareAndSet(false, true)) {
+                try {
+                    sendPrepareResponse(createPrepareResponse());
+
+                    return true;
                 }
+                catch (IgniteCheckedException e) {
+                    onError(e);
 
-                return true;
+                    return true;
+                }
+                finally {
+                    // Will call super.onDone().
+                    onComplete();
+                }
             }
-            catch (IgniteCheckedException e) {
-                onError(e);
+            else {
+                // Other thread is completing future. Wait for it to complete.
+                try {
+                    get();
+                }
+                catch (IgniteInterruptedException e) {
+                    onError(new IgniteCheckedException("Got interrupted while 
waiting for replies to be sent.", e));
+                }
+                catch (IgniteCheckedException ignored) {
+                    // No-op, get() was just synchronization.
+                }
 
-                return true;
-            }
-            finally {
-                // Will call super.onDone().
-                onComplete();
+                return false;
             }
         }
-        else {
-            // Other thread is completing future. Wait for it to complete.
-            try {
-                get();
-            }
-            catch (IgniteInterruptedException e) {
-                onError(new IgniteCheckedException("Got interrupted while 
waiting for replies to be sent.", e));
-            }
-            catch (IgniteCheckedException ignored) {
-                // No-op, get() was just synchronization.
-            }
+    }
 
-            return false;
-        }
+    /**
+     * @throws GridException If failed to send response.
+     */
+    private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) 
throws IgniteCheckedException {
+        if (!tx.nearNodeId().equals(cctx.localNodeId()))
+            cctx.io().send(tx.nearNodeId(), res);
+    }
+
+    /**
+     * @return Prepare response.
+     */
+    private GridNearTxPrepareResponse<K, V> createPrepareResponse() {
+        // Send reply back to originating near node.
+        GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(
+            tx.nearXidVersion(),
+            tx.colocated() ? tx.xid() : tx.nearFutureId(),
+            nearMiniId == null ? tx.xid() : nearMiniId,
+            tx.xidVersion(),
+            tx.invalidPartitions(),
+            ret,
+            err.get());
+
+        addDhtValues(res);
+
+        GridCacheVersion min = tx.minVersion();
+
+        res.completedVersions(cctx.tm().committedVersions(min), 
cctx.tm().rolledbackVersions(min));
+
+        res.pending(localDhtPendingVersions(tx.writeEntries(), min));
+
+        res.filterFailedKeys(filterFailedKeys);
+
+        tx.implicitSingleResult(ret);
+
+        return res;
     }
 
     /**
@@ -585,164 +731,135 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
             return;
 
         try {
-            Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new 
HashMap<>();
-            Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new 
HashMap<>();
-
-            boolean hasRemoteNodes = false;
-
-            // Assign keys to primary nodes.
-            if (!F.isEmpty(reads)) {
-                for (IgniteTxEntry<K, V> read : reads)
-                    hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, 
futNearMap);
-            }
-
-            if (!F.isEmpty(writes)) {
-                for (IgniteTxEntry<K, V> write : writes)
-                    hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, 
futNearMap);
-            }
-
-            if (isDone())
-                return;
-
-            tx.needsCompletedVersions(hasRemoteNodes);
-
-            // Create mini futures.
-            for (GridDistributedTxMapping<K, V> dhtMapping : 
futDhtMap.values()) {
-                assert !dhtMapping.empty();
-
-                ClusterNode n = dhtMapping.node();
-
-                assert !n.isLocal();
-
-                GridDistributedTxMapping<K, V> nearMapping = 
futNearMap.get(n.id());
-
-                MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), 
nearMap.get(n.id()));
+            // We are holding transaction-level locks for entries here, so we 
can get next write version.
+            tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
 
-                add(fut); // Append new future.
+            checkFilters();
 
-                Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == 
null ? null : nearMapping.writes();
+            {
+                Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new 
HashMap<>();
+                Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new 
HashMap<>();
 
-                GridDhtTxPrepareRequest<K, V> req = new 
GridDhtTxPrepareRequest<>(
-                    futId,
-                    fut.futureId(),
-                    tx.topologyVersion(),
-                    tx,
-                    dhtMapping.writes(),
-                    nearWrites,
-                    tx.groupLockKey(),
-                    tx.partitionLock(),
-                    txNodes,
-                    tx.nearXidVersion(),
-                    lastBackup(n.id()),
-                    tx.subjectId(),
-                    tx.taskNameHash());
+                boolean hasRemoteNodes = false;
 
-                int idx = 0;
+                // Assign keys to primary nodes.
+                if (!F.isEmpty(reads)) {
+                    for (IgniteTxEntry<K, V> read : reads)
+                        hasRemoteNodes |= map(tx.entry(read.txKey()), 
futDhtMap, futNearMap);
+                }
 
-                for (IgniteTxEntry<K, V> entry : dhtMapping.writes()) {
-                    try {
-                        GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, 
V>)entry.cached();
+                if (!F.isEmpty(writes)) {
+                    for (IgniteTxEntry<K, V> write : writes)
+                        hasRemoteNodes |= map(tx.entry(write.txKey()), 
futDhtMap, futNearMap);
+                }
 
-                        GridCacheMvccCandidate<K> added = 
cached.candidate(version());
+                tx.needsCompletedVersions(hasRemoteNodes);
+            }
 
-                        assert added != null || entry.groupLockEntry() : "Null 
candidate for non-group-lock entry " +
-                            "[added=" + added + ", entry=" + entry + ']';
-                        assert added == null || added.dhtLocal() : "Got 
non-dht-local candidate for prepare future" +
-                            "[added=" + added + ", entry=" + entry + ']';
+            if (isDone())
+                return;
 
-                        if (added.ownerVersion() != null)
-                            req.owned(entry.txKey(), added.ownerVersion());
+            if (last) {
+                assert tx.transactionNodes() != null;
 
-                        req.invalidateNearEntry(idx, cached.readerId(n.id()) 
!= null);
+                // Create mini futures.
+                for (GridDistributedTxMapping<K, V> dhtMapping : 
tx.dhtMap().values()) {
+                    assert !dhtMapping.empty();
 
-                        if (cached.isNewLocked())
-                            req.markKeyForPreload(idx);
+                    ClusterNode n = dhtMapping.node();
 
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        assert false : "Got removed exception on entry with 
dht local candidate: " + entry;
-                    }
+                    assert !n.isLocal();
 
-                    idx++;
-                }
+                    GridDistributedTxMapping<K, V> nearMapping = 
tx.nearMap().get(n.id());
 
-                if (!F.isEmpty(nearWrites)) {
-                    for (IgniteTxEntry<K, V> entry : nearWrites) {
-                        try {
-                            GridCacheMvccCandidate<K> added = 
entry.cached().candidate(version());
-
-                            assert added != null;
-                            assert added.dhtLocal();
-
-                            if (added.ownerVersion() != null)
-                                req.owned(entry.txKey(), added.ownerVersion());
-
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            assert false : "Got removed exception on entry 
with dht local candidate: " + entry;
-                        }
-                    }
-                }
+                    Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping 
== null ? null : nearMapping.writes();
 
-                //noinspection TryWithIdenticalCatches
-                try {
-                    cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : 
SYSTEM_POOL);
-                }
-                catch (ClusterTopologyException e) {
-                    fut.onResult(e);
-                }
-                catch (IgniteCheckedException e) {
-                    fut.onResult(e);
-                }
-            }
+                    Collection<IgniteTxEntry<K, V>> dhtWrites = 
dhtMapping.writes();
 
-            for (GridDistributedTxMapping<K, V> nearMapping : 
futNearMap.values()) {
-                if (!futDhtMap.containsKey(nearMapping.node().id())) {
-                    assert nearMapping.writes() != null;
+                    if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
+                        continue;
 
-                    MiniFuture fut = new MiniFuture(nearMapping.node().id(), 
null, nearMapping);
+                    MiniFuture fut = new MiniFuture(n.id(), 
dhtMap.get(n.id()), nearMap.get(n.id()));
 
                     add(fut); // Append new future.
 
+                    assert txNodes != null;
+
                     GridDhtTxPrepareRequest<K, V> req = new 
GridDhtTxPrepareRequest<>(
                         futId,
                         fut.futureId(),
                         tx.topologyVersion(),
                         tx,
-                        null,
-                        nearMapping.writes(),
+                        dhtWrites,
+                        nearWrites,
                         tx.groupLockKey(),
                         tx.partitionLock(),
-                        null,
+                        txNodes,
                         tx.nearXidVersion(),
-                        false,
+                        true,
+                        tx.onePhaseCommit(),
+                        lastBackup(n.id()),
                         tx.subjectId(),
                         tx.taskNameHash());
 
-                    for (IgniteTxEntry<K, V> entry : nearMapping.writes()) {
+                    int idx = 0;
+
+                    for (IgniteTxEntry<K, V> entry : dhtWrites) {
                         try {
-                            GridCacheMvccCandidate<K> added = 
entry.cached().candidate(version());
+                            GridDhtCacheEntry<K, V> cached = 
(GridDhtCacheEntry<K, V>)entry.cached();
 
-                            assert added != null || entry.groupLockEntry() : 
"Null candidate for non-group-lock entry " +
-                                "[added=" + added + ", entry=" + entry + ']';
-                            assert added == null || added.dhtLocal() : "Got 
non-dht-local candidate for prepare future" +
-                                "[added=" + added + ", entry=" + entry + ']';
+                            if (entry.explicitVersion() == null) {
+                                GridCacheMvccCandidate<K> added = 
cached.candidate(version());
 
-                            if (added != null && added.ownerVersion() != null)
-                                req.owned(entry.txKey(), added.ownerVersion());
+                                assert added != null || entry.groupLockEntry() 
: "Null candidate for non-group-lock entry " +
+                                    "[added=" + added + ", entry=" + entry + 
']';
+                                assert added == null || added.dhtLocal() : 
"Got non-dht-local candidate for prepare future" +
+                                    "[added=" + added + ", entry=" + entry + 
']';
+
+                                if (added != null && added.ownerVersion() != 
null)
+                                    req.owned(entry.txKey(), 
added.ownerVersion());
+                            }
+
+                            // Do not invalidate near entry on originating 
transaction node.
+                            req.invalidateNearEntry(idx, 
!tx.nearNodeId().equals(n.id()) &&
+                                cached.readerId(n.id()) != null);
+
+                            if (cached.isNewLocked())
+                                req.markKeyForPreload(idx);
 
                             break;
                         }
                         catch (GridCacheEntryRemovedException ignore) {
                             assert false : "Got removed exception on entry 
with dht local candidate: " + entry;
                         }
+
+                        idx++;
+                    }
+
+                    if (!F.isEmpty(nearWrites)) {
+                        for (IgniteTxEntry<K, V> entry : nearWrites) {
+                            try {
+                                GridCacheMvccCandidate<K> added = 
entry.cached().candidate(version());
+
+                                assert added != null;
+                                assert added.dhtLocal();
+
+                                if (added.ownerVersion() != null)
+                                    req.owned(entry.txKey(), 
added.ownerVersion());
+
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException ignore) {
+                                assert false : "Got removed exception on entry 
with dht local candidate: " + entry;
+                            }
+                        }
                     }
 
+                    assert req.transactionNodes() != null;
+
                     //noinspection TryWithIdenticalCatches
                     try {
-                        cctx.io().send(nearMapping.node(), req, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
+                        cctx.io().send(n, req, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
                     }
                     catch (ClusterTopologyException e) {
                         fut.onResult(e);
@@ -751,6 +868,64 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
                         fut.onResult(e);
                     }
                 }
+
+                for (GridDistributedTxMapping<K, V> nearMapping : 
tx.nearMap().values()) {
+                    if (!tx.dhtMap().containsKey(nearMapping.node().id())) {
+                        assert nearMapping.writes() != null;
+
+                        MiniFuture fut = new 
MiniFuture(nearMapping.node().id(), null, nearMapping);
+
+                        add(fut); // Append new future.
+
+                        GridDhtTxPrepareRequest<K, V> req = new 
GridDhtTxPrepareRequest<>(
+                            futId,
+                            fut.futureId(),
+                            tx.topologyVersion(),
+                            tx,
+                            null,
+                            nearMapping.writes(),
+                            tx.groupLockKey(),
+                            tx.partitionLock(),
+                            tx.transactionNodes(),
+                            tx.nearXidVersion(),
+                            true,
+                            tx.onePhaseCommit(),
+                            tx.subjectId(),
+                            tx.taskNameHash());
+
+                        for (IgniteTxEntry<K, V> entry : nearMapping.writes()) 
{
+                            try {
+                                GridCacheMvccCandidate<K> added = 
entry.cached().candidate(version());
+
+                                assert added != null || entry.groupLockEntry() 
: "Null candidate for non-group-lock entry " +
+                                    "[added=" + added + ", entry=" + entry + 
']';
+                                assert added == null || added.dhtLocal() : 
"Got non-dht-local candidate for prepare future" +
+                                    "[added=" + added + ", entry=" + entry + 
']';
+
+                                if (added != null && added.ownerVersion() != 
null)
+                                    req.owned(entry.txKey(), 
added.ownerVersion());
+
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException ignore) {
+                                assert false : "Got removed exception on entry 
with dht local candidate: " + entry;
+                            }
+                        }
+
+                        assert req.transactionNodes() != null;
+
+                        //noinspection TryWithIdenticalCatches
+                        try {
+                            cctx.io().send(nearMapping.node(), req, 
tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                        }
+                        catch (ClusterTopologyException e) {
+                            fut.onResult(e);
+                        }
+                        catch (IgniteCheckedException e) {
+                            fut.onResult(e);
+                        }
+                    }
+                }
             }
         }
         finally {
@@ -976,8 +1151,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
             else {
                 // Process evicted readers (no need to remap).
                 if (nearMapping != null && !F.isEmpty(res.nearEvicted())) {
-                    nearMapping.evictReaders(res.nearEvicted());
-
                     for (IgniteTxEntry<K, V> entry : nearMapping.entries()) {
                         if (res.nearEvicted().contains(entry.txKey())) {
                             while (true) {
@@ -999,6 +1172,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
                             }
                         }
                     }
+
+                    nearMapping.evictReaders(res.nearEvicted());
                 }
 
                 // Process invalid partitions (no need to remap).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 5430e53..31e563c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -109,6 +109,7 @@ public class GridDhtTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareReque
      * @param txNodes Transaction nodes mapping.
      * @param nearXidVer Near transaction ID.
      * @param last {@code True} if this is last prepare request for node.
+     * @param onePhaseCommit One phase commit flag.
      */
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
@@ -122,9 +123,10 @@ public class GridDhtTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareReque
         Map<UUID, Collection<UUID>> txNodes,
         GridCacheVersion nearXidVer,
         boolean last,
+        boolean onePhaseCommit,
         UUID subjId,
         int taskNameHash) {
-        super(tx, null, dhtWrites, grpLockKey, partLock, txNodes);
+        super(tx, null, dhtWrites, grpLockKey, partLock, txNodes, 
onePhaseCommit);
 
         assert futId != null;
         assert miniId != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 2ed6262..e260b63 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -50,9 +50,6 @@ public class GridDhtTxRemote<K, V> extends 
GridDistributedTxRemoteAdapter<K, V>
     /** Near transaction ID. */
     private GridCacheVersion nearXidVer;
 
-    /** Transaction nodes mapping (primary node -> related backup nodes). */
-    private Map<UUID, Collection<UUID>> txNodes;
-
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -206,11 +203,6 @@ public class GridDhtTxRemote<K, V> extends 
GridDistributedTxRemoteAdapter<K, V>
         return nearXidVer;
     }
 
-    /** {@inheritDoc} */
-    @Override public Map<UUID, Collection<UUID>> transactionNodes() {
-        return txNodes;
-    }
-
     /**
      * @return Near node ID.
      */
@@ -289,7 +281,6 @@ public class GridDhtTxRemote<K, V> extends 
GridDistributedTxRemoteAdapter<K, V>
      * @param keyBytes Key bytes.
      * @param val Value.
      * @param valBytes Value bytes.
-     * @param drVer Data center replication version.
      * @param entryProcessors Entry processors.
      * @param ttl TTL.
      */
@@ -300,7 +291,6 @@ public class GridDhtTxRemote<K, V> extends 
GridDistributedTxRemoteAdapter<K, V>
         @Nullable V val,
         @Nullable byte[] valBytes,
         @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> 
entryProcessors,
-        @Nullable GridCacheVersion drVer,
         long ttl) {
         checkInternal(key);
 
@@ -316,7 +306,7 @@ public class GridDhtTxRemote<K, V> extends 
GridDistributedTxRemoteAdapter<K, V>
             ttl,
             -1L,
             cached,
-            drVer);
+            null);
 
         txEntry.keyBytes(keyBytes);
         txEntry.valueBytes(valBytes);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 8d0b009..a795f42 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -439,30 +439,6 @@ public final class GridDhtColocatedLockFuture<K, V> 
extends GridCompoundIdentity
         err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? 
null : t);
     }
 
-    /**
-     * @param cached Entry to check.
-     * @return {@code True} if filter passed.
-     */
-    private boolean filter(GridCacheEntryEx<K, V> cached) {
-        try {
-            if (!cctx.isAll(cached, filter)) {
-                if (log.isDebugEnabled())
-                    log.debug("Filter didn't pass for entry (will fail lock): 
" + cached);
-
-                onFailed(true);
-
-                return false;
-            }
-
-            return true;
-        }
-        catch (IgniteCheckedException e) {
-            onError(e);
-
-            return false;
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         if (onCancelled())
@@ -750,18 +726,6 @@ public final class GridDhtColocatedLockFuture<K, V> 
extends GridCompoundIdentity
 
                                 distributedKeys.add(key);
 
-                                if (inTx() && implicitTx() && mappings.size() 
== 1 && !cctx.writeThrough()) {
-                                    tx.onePhaseCommit(true);
-
-                                    req.onePhaseCommit(true);
-                                }
-
-                                IgniteTxEntry<K, V> writeEntry = tx != null ? 
tx.writeMap().get(txKey) : null;
-
-                                if (writeEntry != null)
-                                    // We are sending entry to remote node, 
clear transfer flag.
-                                    writeEntry.transferRequired(false);
-
                                 if (tx != null)
                                     tx.addKeyMapping(txKey, mapping.node());
 
@@ -770,8 +734,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends 
GridCompoundIdentity
                                     node.isLocal() ? null : 
entry.getOrMarshalKeyBytes(),
                                     retval,
                                     dhtVer, // Include DHT version to match 
remote DHT entry.
-                                    writeEntry,
-                                    inTx() ? tx.entry(txKey).drVersion() : 
null,
                                     cctx);
                             }
 
@@ -1011,9 +973,6 @@ public final class GridDhtColocatedLockFuture<K, V> 
extends GridCompoundIdentity
             if (tx != null) {
                 for (K key : distributedKeys)
                     tx.addKeyMapping(cctx.txKey(key), cctx.localNode());
-
-                if (tx.implicit() && !cctx.writeThrough())
-                    tx.onePhaseCommit(true);
             }
 
             lockLocally(distributedKeys, topVer, null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 2409335..29f3622 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -874,8 +874,6 @@ public final class GridNearLockFuture<K, V> extends 
GridCompoundIdentityFuture<B
 
                                     distributedKeys.add(key);
 
-                                    IgniteTxEntry<K, V> writeEntry = tx != 
null ? tx.writeMap().get(txKey) : null;
-
                                     if (tx != null)
                                         tx.addKeyMapping(txKey, 
mapping.node());
 
@@ -884,13 +882,7 @@ public final class GridNearLockFuture<K, V> extends 
GridCompoundIdentityFuture<B
                                         node.isLocal() ? null : 
entry.getOrMarshalKeyBytes(),
                                         retval && dhtVer == null,
                                         dhtVer, // Include DHT version to 
match remote DHT entry.
-                                        writeEntry,
-                                        inTx() ? tx.entry(txKey).drVersion() : 
null,
                                         cctx);
-
-                                    // Clear transfer required flag since we 
are sending message.
-                                    if (writeEntry != null)
-                                        writeEntry.transferRequired(false);
                                 }
 
                                 if (cand.reentry())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 37b1b21..d9cb028 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -276,8 +276,6 @@ public class GridNearLockRequest<K, V> extends 
GridDistributedLockRequest<K, V>
      * @param retVal Flag indicating whether value should be returned.
      * @param keyBytes Key bytes.
      * @param dhtVer DHT version.
-     * @param writeEntry Write entry if implicit transaction mapped on one 
node.
-     * @param drVer DR version.
      * @param ctx Context.
      * @throws IgniteCheckedException If failed.
      */
@@ -286,14 +284,12 @@ public class GridNearLockRequest<K, V> extends 
GridDistributedLockRequest<K, V>
         byte[] keyBytes,
         boolean retVal,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable IgniteTxEntry<K, V> writeEntry,
-        @Nullable GridCacheVersion drVer,
         GridCacheContext<K, V> ctx
     ) throws IgniteCheckedException {
         dhtVers[idx] = dhtVer;
 
         // Delegate to super.
-        addKeyBytes(key, keyBytes, writeEntry, retVal, null, drVer, ctx);
+        addKeyBytes(key, keyBytes, retVal, 
(Collection<GridCacheMvccCandidate<K>>)null, ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 9942e68..62421f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -263,7 +263,6 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
                 byte[] bytes = !keyBytes.isEmpty() ? keyBytes.get(i) : null;
 
                 Collection<GridCacheMvccCandidate<K>> cands = 
req.candidatesByIndex(i);
-                GridCacheVersion drVer = req.drVersionByIndex(i);
 
                 if (log.isDebugEnabled())
                     log.debug("Unmarshalled key: " + key);
@@ -312,7 +311,7 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
                                 }
 
                                 tx.addEntry(ctx, txKey, bytes, 
GridCacheOperation.NOOP, /*Value.*/null,
-                                    /*Value byts.*/null, drVer);
+                                    /*Value byts.*/null, /*dr version*/null);
                             }
 
                             // Add remote candidate before reordering.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index f63103e..0f7aed3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -357,8 +357,6 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
             null,
             null,
             tx.size(),
-            commit && tx.pessimistic() ? m.writes() : null,
-            commit && tx.pessimistic() ? F.view(tx.writeEntries(), CU.<K, 
V>transferRequired()) : null,
             tx.subjectId(),
             tx.taskNameHash()
         );

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index b84e724..48f72e2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -78,8 +78,6 @@ public class GridNearTxFinishRequest<K, V> extends 
GridDistributedTxFinishReques
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      * @param txSize Expected transaction size.
-     * @param writeEntries Write entries.
-     * @param recoverEntries Recover entries.
      */
     public GridNearTxFinishRequest(
         IgniteUuid futId,
@@ -97,12 +95,10 @@ public class GridNearTxFinishRequest<K, V> extends 
GridDistributedTxFinishReques
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
         int txSize,
-        Collection<IgniteTxEntry<K, V>> writeEntries,
-        Collection<IgniteTxEntry<K, V>> recoverEntries,
         @Nullable UUID subjId,
         int taskNameHash) {
         super(xidVer, futId, null, threadId, commit, invalidate, sys, 
syncCommit, syncRollback, baseVer, committedVers,
-            rolledbackVers, txSize, writeEntries, recoverEntries, null);
+            rolledbackVers, txSize, null);
 
         this.explicitLock = explicitLock;
         this.storeEnabled = storeEnabled;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f790bee..21e804d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -374,11 +374,6 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
         return mappings;
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<IgniteTxEntry<K, V>> recoveryWrites() {
-        return F.view(writeEntries(), CU.<K, V>transferRequired());
-    }
-
     /**
      * @param nodeId Node ID.
      * @param dhtVer DHT version.
@@ -499,7 +494,6 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
      * @param mapQueue Mappings queue.
      */
     void removeKeysMapping(UUID failedNodeId, 
Iterable<GridDistributedTxMapping<K, V>> mapQueue) {
-        assert optimistic();
         assert failedNodeId != null;
         assert mapQueue != null;
 
@@ -572,9 +566,10 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
 
                 try {
                     // Handle explicit locks.
-                    GridCacheVersion base = txEntry.explicitVersion() != null 
? txEntry.explicitVersion() : xidVer;
+                    GridCacheVersion explicit = txEntry.explicitVersion();
 
-                    entry.readyNearLock(base, mapping.dhtVersion(), 
committedVers, rolledbackVers, pendingVers);
+                    if (explicit == null)
+                        entry.readyNearLock(xidVer, mapping.dhtVersion(), 
committedVers, rolledbackVers, pendingVers);
 
                     break;
                 }
@@ -694,12 +689,11 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() {
-        IgniteFuture<IgniteTxEx<K, V>> fut = prepFut.get();
+        GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, 
V>)prepFut.get();
 
         if (fut == null) {
             // Future must be created before any exception can be thrown.
-            fut = pessimistic() ? new 
PessimisticPrepareFuture<>(cctx.kernalContext(), this) :
-                new GridNearTxPrepareFuture<>(cctx, this);
+            fut = new GridNearTxPrepareFuture<>(cctx, this);
 
             if (!prepFut.compareAndSet(null, fut))
                 return prepFut.get();
@@ -713,19 +707,17 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
         // For pessimistic mode we don't distribute prepare request and do not 
lock topology version
         // as it was fixed on first lock.
         if (pessimistic()) {
-            PessimisticPrepareFuture<K, V> pessimisticFut = 
(PessimisticPrepareFuture<K, V>)fut;
-
             if (!state(PREPARING)) {
                 if (setRollbackOnly()) {
                     if (timedOut())
-                        pessimisticFut.onError(new 
IgniteTxTimeoutException("Transaction timed out and was " +
+                        fut.onError(new IgniteTxTimeoutException("Transaction 
timed out and was " +
                             "rolled back: " + this));
                     else
-                        pessimisticFut.onError(new 
IgniteCheckedException("Invalid transaction state for prepare [state=" +
+                        fut.onError(new IgniteCheckedException("Invalid 
transaction state for prepare [state=" +
                             state() + ", tx=" + this + ']'));
                 }
                 else
-                    pessimisticFut.onError(new 
IgniteTxRollbackException("Invalid transaction state for prepare " +
+                    fut.onError(new IgniteTxRollbackException("Invalid 
transaction state for prepare " +
                         "[state=" + state() + ", tx=" + this + ']'));
 
                 return fut;
@@ -734,26 +726,18 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
             try {
                 userPrepare();
 
-                if (!state(PREPARED)) {
-                    setRollbackOnly();
-
-                    pessimisticFut.onError(new IgniteCheckedException("Invalid 
transaction state for commit [state=" +
-                        state() + ", tx=" + this + ']'));
+                // Make sure to add future before calling prepare.
+                cctx.mvcc().addFuture(fut);
 
-                    return fut;
-                }
-
-                pessimisticFut.complete();
+                fut.prepare();
             }
             catch (IgniteCheckedException e) {
-                pessimisticFut.onError(e);
+                fut.onError(e);
             }
         }
         else {
             // In optimistic mode we must wait for topology map update.
-            GridNearTxPrepareFuture<K, V> pf = (GridNearTxPrepareFuture<K, 
V>)prepFut.get();
-
-            pf.prepare();
+            fut.prepare();
         }
 
         return fut;
@@ -892,8 +876,6 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
     public IgniteFuture<IgniteTxEx<K, V>> prepareAsyncLocal(@Nullable 
Collection<IgniteTxEntry<K, V>> reads,
         @Nullable Collection<IgniteTxEntry<K, V>> writes, Map<UUID, 
Collection<UUID>> txNodes, boolean last,
         Collection<UUID> lastBackups) {
-        assert optimistic();
-
         if (state() != PREPARING) {
             if (timedOut())
                 return new GridFinishedFuture<>(cctx.kernalContext(),
@@ -908,7 +890,7 @@ public class GridNearTxLocal<K, V> extends 
GridDhtTxLocalAdapter<K, V> {
         init();
 
         GridDhtTxPrepareFuture<K, V> fut = new GridDhtTxPrepareFuture<>(cctx, 
this, IgniteUuid.randomUuid(),
-            Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), last, 
lastBackups);
+            Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), last, 
needReturnValue() && implicit(), lastBackups);
 
         try {
             // At this point all the entries passed in must be enlisted in 
transaction because this is an

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 4b91ff4..e6dc3e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -255,6 +255,13 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
     }
 
     /**
+     * @param e Error.
+     */
+    void onError(Throwable e) {
+        onError(null, null, e);
+    }
+
+    /**
      * @param nodeId Sender.
      * @param res Result.
      */
@@ -323,65 +330,69 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
      * Waits for topology exchange future to be ready and then prepares user 
transaction.
      */
     public void prepare() {
-        GridDhtTopologyFuture topFut = topologyReadLock();
+        if (tx.optimistic()) {
+            GridDhtTopologyFuture topFut = topologyReadLock();
 
-        try {
-            if (topFut.isDone()) {
-                try {
-                    if (!tx.state(PREPARING)) {
-                        if (tx.setRollbackOnly()) {
-                            if (tx.timedOut())
-                                onError(null, null, new 
IgniteTxTimeoutException("Transaction timed out and " +
-                                    "was rolled back: " + this));
+            try {
+                if (topFut.isDone()) {
+                    try {
+                        if (!tx.state(PREPARING)) {
+                            if (tx.setRollbackOnly()) {
+                                if (tx.timedOut())
+                                    onError(null, null, new 
IgniteTxTimeoutException("Transaction timed out and " +
+                                        "was rolled back: " + this));
+                                else
+                                    onError(null, null, new 
IgniteCheckedException("Invalid transaction state for prepare " +
+                                        "[state=" + tx.state() + ", tx=" + 
this + ']'));
+                            }
                             else
-                                onError(null, null, new 
IgniteCheckedException("Invalid transaction state for prepare " +
-                                    "[state=" + tx.state() + ", tx=" + this + 
']'));
+                                onError(null, null, new 
IgniteTxRollbackException("Invalid transaction state for " +
+                                    "prepare [state=" + tx.state() + ", tx=" + 
this + ']'));
+
+                            return;
                         }
-                        else
-                            onError(null, null, new 
IgniteTxRollbackException("Invalid transaction state for " +
-                                "prepare [state=" + tx.state() + ", tx=" + 
this + ']'));
 
-                        return;
-                    }
+                        GridDiscoveryTopologySnapshot snapshot = 
topFut.topologySnapshot();
 
-                    GridDiscoveryTopologySnapshot snapshot = 
topFut.topologySnapshot();
+                        tx.topologyVersion(snapshot.topologyVersion());
+                        tx.topologySnapshot(snapshot);
 
-                    tx.topologyVersion(snapshot.topologyVersion());
-                    tx.topologySnapshot(snapshot);
+                        // Make sure to add future before calling prepare.
+                        cctx.mvcc().addFuture(this);
 
-                    // Make sure to add future before calling prepare.
-                    cctx.mvcc().addFuture(this);
+                        prepare0();
+                    }
+                    catch (IgniteTxTimeoutException | 
IgniteTxOptimisticException e) {
+                        onError(cctx.localNodeId(), null, e);
+                    }
+                    catch (IgniteCheckedException e) {
+                        tx.setRollbackOnly();
 
-                    prepare0();
-                }
-                catch (IgniteTxTimeoutException | IgniteTxOptimisticException 
e) {
-                    onError(cctx.localNodeId(), null, e);
-                }
-                catch (IgniteCheckedException e) {
-                    tx.setRollbackOnly();
+                        String msg = "Failed to prepare transaction (will 
attempt rollback): " + this;
 
-                    String msg = "Failed to prepare transaction (will attempt 
rollback): " + this;
+                        U.error(log, msg, e);
 
-                    U.error(log, msg, e);
+                        tx.rollbackAsync();
 
-                    tx.rollbackAsync();
+                        onError(null, null, new IgniteTxRollbackException(msg, 
e));
+                    }
+                }
+                else {
+                    topFut.syncNotify(false);
 
-                    onError(null, null, new IgniteTxRollbackException(msg, e));
+                    topFut.listenAsync(new CI1<IgniteFuture<Long>>() {
+                        @Override public void apply(IgniteFuture<Long> t) {
+                            prepare();
+                        }
+                    });
                 }
             }
-            else {
-                topFut.syncNotify(false);
-
-                topFut.listenAsync(new CI1<IgniteFuture<Long>>() {
-                    @Override public void apply(IgniteFuture<Long> t) {
-                        prepare();
-                    }
-                });
+            finally {
+                topologyReadUnlock();
             }
         }
-        finally {
-            topologyReadUnlock();
-        }
+        else
+            preparePessimistic();
     }
 
     /**
@@ -472,22 +483,24 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
 
         assert topVer > 0;
 
-        for (int cacheId : tx.activeCacheIds()) {
-            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-            if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
-                onDone(new ClusterTopologyException("Failed to map keys for 
cache (all " +
-                    "partition nodes left the grid): " + cacheCtx.name()));
-
-                return;
-            }
-        }
-
         txMapping = new GridDhtTxMapping<>();
 
         ConcurrentLinkedDeque8<GridDistributedTxMapping<K, V>> mappings =
             new ConcurrentLinkedDeque8<>();
 
+        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+            for (int cacheId : tx.activeCacheIds()) {
+                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+                    onDone(new ClusterTopologyException("Failed to map keys 
for cache (all " +
+                        "partition nodes left the grid): " + cacheCtx.name()));
+
+                    return;
+                }
+            }
+        }
+
         // Assign keys to primary nodes.
         GridDistributedTxMapping<K, V> cur = null;
 
@@ -538,10 +551,169 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
 
         txMapping.initLast(mappings);
 
+        tx.transactionNodes(txMapping.transactionNodes());
+
+        checkOnePhase();
+
         proceedPrepare(mappings);
     }
 
     /**
+     *
+     */
+    private void preparePessimistic() {
+        Map<ClusterNode, GridDistributedTxMapping<K, V>> mappings = new 
HashMap<>();
+
+        long topVer = tx.topologyVersion();
+
+        txMapping = new GridDhtTxMapping<>();
+
+        for (IgniteTxEntry<K, V> txEntry : tx.allEntries()) {
+            GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+            List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), 
topVer);
+
+            ClusterNode primary = F.first(nodes);
+
+            GridDistributedTxMapping<K, V> nodeMapping = mappings.get(primary);
+
+            if (nodeMapping == null) {
+                nodeMapping = new GridDistributedTxMapping<>(primary);
+
+                mappings.put(primary, nodeMapping);
+            }
+
+            txEntry.nodeId(primary.id());
+
+            nodeMapping.add(txEntry);
+
+            txMapping.addMapping(nodes);
+        }
+
+        tx.transactionNodes(txMapping.transactionNodes());
+
+        checkOnePhase();
+
+        for (final GridDistributedTxMapping<K, V> m : mappings.values()) {
+            final ClusterNode node = m.node();
+
+            GridNearTxPrepareRequest<K, V> req = new 
GridNearTxPrepareRequest<>(
+                futId,
+                tx.topologyVersion(),
+                tx,
+                tx.optimistic() && tx.serializable() ? m.reads() : null,
+                m.writes(),
+                    /*grp lock key*/null,
+                    /*part lock*/false,
+                tx.syncCommit(),
+                tx.syncRollback(),
+                txMapping.transactionNodes(),
+                true,
+                txMapping.transactionNodes().get(node.id()),
+                tx.onePhaseCommit(),
+                tx.needReturnValue() && tx.implicit(),
+                tx.implicitSingle(),
+                tx.subjectId(),
+                tx.taskNameHash());
+
+            for (IgniteTxEntry<K, V> txEntry : m.writes()) {
+                assert txEntry.cached().detached() : "Expected detached entry 
while preparing transaction " +
+                    "[locNodeId=" + cctx.localNodeId() +
+                    ", txEntry=" + txEntry + ']';
+
+                if (txEntry.op() == TRANSFORM)
+                    req.addDhtVersion(txEntry.txKey(), null);
+            }
+
+            if (node.isLocal()) {
+                IgniteFuture<IgniteTxEx<K, V>> fut = 
cctx.tm().txHandler().prepareTx(node.id(), tx, req);
+
+                // Add new future.
+                add(new GridEmbeddedFuture<>(
+                    cctx.kernalContext(),
+                    fut,
+                    new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
+                        @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, 
V> t, Exception ex) {
+                            if (ex != null) {
+                                onError(node.id(), null, ex);
+
+                                return t;
+                            }
+
+                            IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, 
V>)t;
+
+                            Collection<Integer> invalidParts = 
dhtTx.invalidPartitions();
+
+                            assert F.isEmpty(invalidParts);
+
+                            if (!m.empty()) {
+                                for (IgniteTxEntry<K, V> writeEntry : 
m.entries()) {
+                                    IgniteTxKey<K> key = writeEntry.txKey();
+
+                                    IgniteTxEntry<K, V> dhtTxEntry = 
dhtTx.entry(key);
+
+                                    if (dhtTxEntry.op() == NOOP)
+                                        tx.entry(key).op(NOOP);
+                                }
+
+                                tx.addDhtVersion(m.node().id(), 
dhtTx.xidVersion());
+
+                                m.dhtVersion(dhtTx.xidVersion());
+
+                                GridCacheVersion min = dhtTx.minVersion();
+
+                                IgniteTxManager<K, V> tm = 
cctx.near().dht().context().tm();
+
+                                tx.readyNearLocks(m, 
Collections.<GridCacheVersion>emptyList(),
+                                    tm.committedVersions(min), 
tm.rolledbackVersions(min));
+                            }
+
+                            
tx.implicitSingleResult(dhtTx.implicitSingleResult());
+
+                            return tx;
+                        }
+                    }
+                ));
+            }
+            else {
+                MiniFuture fut = new MiniFuture(m, null);
+
+                req.miniId(fut.futureId());
+
+                add(fut); // Append new future.
+
+                try {
+                    cctx.io().send(node, req);
+                }
+                catch (IgniteCheckedException e) {
+                    // Fail the whole thing.
+                    fut.onResult(e);
+                }
+            }
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * Checks if mapped transaction can be committed on one phase.
+     * One-phase commit can be done if transaction maps to one primary node 
and not more than one backup.
+     */
+    private void checkOnePhase() {
+        if (cctx.isStoreEnabled())
+            return;
+
+        Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
+
+        if (map.size() == 1) {
+            Collection<UUID> backups = F.firstEntry(map).getValue();
+
+            if (backups.size() <= 1)
+                tx.onePhaseCommit(true);
+        }
+    }
+
+    /**
      * Continues prepare after previous mapping successfully finished.
      *
      * @param mappings Queue of mappings.
@@ -571,6 +743,9 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
             txMapping.transactionNodes(),
             m.last(),
             m.lastBackups(),
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
             tx.subjectId(),
             tx.taskNameHash());
 
@@ -618,7 +793,18 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
 
                         assert F.isEmpty(invalidParts);
 
+                        tx.implicitSingleResult(dhtTx.implicitSingleResult());
+
                         if (!m.empty()) {
+                            for (IgniteTxEntry<K, V> writeEntry : m.entries()) 
{
+                                IgniteTxKey<K> key = writeEntry.txKey();
+
+                                IgniteTxEntry<K, V> dhtTxEntry = 
dhtTx.entry(key);
+
+                                if (dhtTxEntry.op() == NOOP)
+                                    tx.entry(key).op(NOOP);
+                            }
+
                             tx.addDhtVersion(m.node().id(), 
dhtTx.xidVersion());
 
                             m.dhtVersion(dhtTx.xidVersion());
@@ -872,6 +1058,17 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
                         }
                     }
 
+                    if (tx.implicitSingle())
+                        tx.implicitSingleResult(res.returnValue());
+
+                    for (IgniteTxKey<K> key : res.filterFailedKeys()) {
+                        IgniteTxEntry<K, V> txEntry = tx.entry(key);
+
+                        assert txEntry != null : "Missing tx entry for write 
key: " + key;
+
+                        txEntry.op(NOOP);
+                    }
+
                     if (!m.empty()) {
                         // Register DHT version.
                         tx.addDhtVersion(m.node().id(), res.dhtVersion());
@@ -882,7 +1079,8 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
                     }
 
                     // Proceed prepare before finishing mini future.
-                    proceedPrepare(mappings);
+                    if (mappings != null)
+                        proceedPrepare(mappings);
 
                     // Finish this mini future.
                     onDone(tx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 8aecfe0..045bcf9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed.near;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.util.direct.*;
@@ -55,8 +56,15 @@ public class GridNearTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareRequ
 
     /** IDs of backup nodes receiving last prepare request during this 
prepare. */
     @GridDirectCollection(UUID.class)
+    @GridToStringInclude
     private Collection<UUID> lastBackups;
 
+    /** Need return value flag. */
+    private boolean retVal;
+
+    /** Implicit single flag. */
+    private boolean implicitSingle;
+
     /** Subject ID. */
     @GridDirectVersion(1)
     private UUID subjId;
@@ -99,10 +107,13 @@ public class GridNearTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareRequ
         Map<UUID, Collection<UUID>> txNodes,
         boolean last,
         Collection<UUID> lastBackups,
+        boolean onePhaseCommit,
+        boolean retVal,
+        boolean implicitSingle,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
-        super(tx, reads, writes, grpLockKey, partLock, txNodes);
+        super(tx, reads, writes, grpLockKey, partLock, txNodes, 
onePhaseCommit);
 
         assert futId != null;
 
@@ -111,6 +122,8 @@ public class GridNearTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareRequ
         this.near = near;
         this.last = last;
         this.lastBackups = lastBackups;
+        this.retVal = retVal;
+        this.implicitSingle = implicitSingle;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
     }
@@ -172,6 +185,20 @@ public class GridNearTxPrepareRequest<K, V> extends 
GridDistributedTxPrepareRequ
     }
 
     /**
+     * @return Whether return value is requested.
+     */
+    public boolean returnValue() {
+        return retVal;
+    }
+
+    /**
+     * @return Implicit single flag.
+     */
+    public boolean implicitSingle() {
+        return implicitSingle;
+    }
+
+    /**
      * @return Topology version.
      */
     @Override public long topologyVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 7024e70..8cd1f12 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -69,6 +69,20 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
     @GridDirectCollection(byte[].class)
     private Collection<byte[]> ownedValsBytes;
 
+    /** Cache return value. */
+    @GridDirectTransient
+    private GridCacheReturn<V> retVal;
+
+    /** Return value bytes. */
+    private byte[] retValBytes;
+
+    /** Filter failed keys. */
+    @GridDirectTransient
+    private Collection<K> filterFailedKeys;
+
+    /** Filter failed key bytes. */
+    private byte[] filterFailedKeyBytes;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -84,8 +98,15 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
      * @param invalidParts Invalid partitions.
      * @param err Error.
      */
-    public GridNearTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, 
IgniteUuid miniId, GridCacheVersion dhtVer,
-        Collection<Integer> invalidParts, Throwable err) {
+    public GridNearTxPrepareResponse(
+        GridCacheVersion xid,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        GridCacheVersion dhtVer,
+        Collection<Integer> invalidParts,
+        GridCacheReturn<V> retVal,
+        Throwable err
+    ) {
         super(xid, err);
 
         assert futId != null;
@@ -96,6 +117,7 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
         this.miniId = miniId;
         this.dhtVer = dhtVer;
         this.invalidParts = invalidParts;
+        this.retVal = retVal;
     }
 
     /**
@@ -146,6 +168,9 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
      * @param valBytes Value bytes.
      */
     public void addOwnedValue(IgniteTxKey<K> key, GridCacheVersion ver, V val, 
byte[] valBytes) {
+        if (val == null && valBytes == null)
+            return;
+
         if (ownedVals == null)
             ownedVals = new HashMap<>();
 
@@ -161,6 +186,27 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
     }
 
     /**
+     * @return Return value.
+     */
+    public GridCacheReturn<V> returnValue() {
+        return retVal;
+    }
+
+    /**
+     * @param filterFailedKeys Collection of keys that did not pass the filter.
+     */
+    public void filterFailedKeys(Collection<K> filterFailedKeys) {
+        this.filterFailedKeys = filterFailedKeys;
+    }
+
+    /**
+     * @return Collection of keys that did not pass the filter.
+     */
+    public Collection<K> filterFailedKeys() {
+        return filterFailedKeys == null ? Collections.<K>emptyList() : 
filterFailedKeys;
+    }
+
+    /**
      * @param key Key.
      * @return {@code True} if response has owned value for given key.
      */
@@ -203,6 +249,13 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
                 
ownedValsBytes.add(ctx.marshaller().marshal(F.t(entry.getKey(), tup.get1(), 
valBytes, rawBytes)));
             }
         }
+
+
+        if (retValBytes == null && retVal != null)
+            retValBytes = ctx.marshaller().marshal(retVal);
+
+        if (filterFailedKeyBytes == null && filterFailedKeys != null)
+            filterFailedKeyBytes = ctx.marshaller().marshal(filterFailedKeys);
     }
 
     /** {@inheritDoc} */
@@ -220,6 +273,12 @@ public class GridNearTxPrepareResponse<K, V> extends 
GridDistributedTxPrepareRes
                 ownedVals.put(tup.get1(), F.t(tup.get2(), val, tup.get4() ? 
null : tup.get3()));
             }
         }
+
+        if (retVal == null && retValBytes != null)
+            retVal = ctx.marshaller().unmarshal(retValBytes, ldr);
+
+        if (filterFailedKeys == null && filterFailedKeyBytes != null)
+            filterFailedKeys = 
ctx.marshaller().unmarshal(filterFailedKeyBytes, ldr);
     }
 
     /** {@inheritDoc} */

Reply via email to