#ingite-9655 - Manual merge of changes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0354a41b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0354a41b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0354a41b

Branch: refs/heads/sprint-1
Commit: 0354a41b94e891273a06587c6cea95aaee15d995
Parents: 31ce585
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Wed Jan 28 18:48:49 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Wed Jan 28 18:48:49 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  34 +-
 .../processors/cache/GridCacheUtils.java        |  15 -
 .../distributed/GridCacheCommittedTxInfo.java   |   2 -
 .../GridCachePerThreadTxCommitBuffer.java       | 185 ------
 ...dCachePessimisticCheckCommittedTxFuture.java | 380 ------------
 ...CachePessimisticCheckCommittedTxRequest.java | 292 ---------
 ...achePessimisticCheckCommittedTxResponse.java | 231 -------
 .../distributed/GridCacheTxCommitBuffer.java    |  60 --
 .../distributed/GridDistributedLockRequest.java |  83 ---
 .../GridDistributedTxFinishRequest.java         |  90 +--
 .../GridDistributedTxPrepareRequest.java        |  49 +-
 .../GridDistributedTxRemoteAdapter.java         |  53 +-
 .../distributed/dht/GridDhtLockFuture.java      |  41 +-
 .../distributed/dht/GridDhtLockRequest.java     |   6 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  19 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  16 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  70 +--
 .../cache/distributed/dht/GridDhtTxLocal.java   |  21 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  | 108 ++--
 .../distributed/dht/GridDhtTxPrepareFuture.java | 503 ++++++++++-----
 .../dht/GridDhtTxPrepareRequest.java            |   4 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |  12 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  41 --
 .../distributed/near/GridNearLockFuture.java    |   8 -
 .../distributed/near/GridNearLockRequest.java   |   6 +-
 .../near/GridNearTransactionalCache.java        |   3 +-
 .../near/GridNearTxFinishFuture.java            |   2 -
 .../near/GridNearTxFinishRequest.java           |   6 +-
 .../cache/distributed/near/GridNearTxLocal.java |  46 +-
 .../near/GridNearTxPrepareFuture.java           | 308 ++++++++--
 .../near/GridNearTxPrepareRequest.java          |  29 +-
 .../near/GridNearTxPrepareResponse.java         |  63 +-
 .../cache/transactions/IgniteTxAdapter.java     |  31 +-
 .../cache/transactions/IgniteTxEntry.java       |  17 -
 .../cache/transactions/IgniteTxEx.java          |   7 -
 .../cache/transactions/IgniteTxHandler.java     | 608 +++----------------
 .../transactions/IgniteTxLocalAdapter.java      | 118 +++-
 .../cache/transactions/IgniteTxLocalEx.java     |   5 +
 .../cache/transactions/IgniteTxManager.java     | 208 ++-----
 .../cache/transactions/IgniteTxProxyImpl.java   |   7 +
 .../util/GridBoundedConcurrentOrderedMap.java   |  20 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java | 182 +++---
 .../cache/GridCacheAbstractMetricsSelfTest.java |   4 +-
 .../cache/IgnitePutAllLargeBatchSelfTest.java   | 301 +++++++++
 ...tAllUpdateNonPreloadedPartitionSelfTest.java | 129 ++++
 .../distributed/GridCacheEventAbstractTest.java |   2 +-
 ...cOriginatingNodeFailureAbstractSelfTest.java |  29 +-
 ...ssimisticOriginatingNodeFailureSelfTest.java |   2 +-
 .../near/GridCacheNearMetricsSelfTest.java      |   8 +-
 .../near/GridCacheNearReadersSelfTest.java      |   2 +-
 ...ssimisticOriginatingNodeFailureSelfTest.java |   2 +-
 ...ssimisticOriginatingNodeFailureSelfTest.java |   5 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +
 53 files changed, 1758 insertions(+), 2718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9271e29..216c13c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4190,7 +4190,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
                 true,
                 op.single(),
                 ctx.system(),
-                PESSIMISTIC,
+                OPTIMISTIC,
                 READ_COMMITTED,
                 tCfg.getDefaultTxTimeout(),
                 ctx.hasFlag(INVALIDATE),
@@ -4265,7 +4265,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
                 true,
                 op.single(),
                 ctx.system(),
-                PESSIMISTIC,
+                OPTIMISTIC,
                 READ_COMMITTED,
                 
ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(),
                 ctx.hasFlag(INVALIDATE),
@@ -4301,13 +4301,24 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
         try {
             IgniteFuture fut = holder.future();
 
-            if (fut != null && !fut.isDone()) {
-                final IgniteTxLocalAdapter<K, V> tx0 = tx;
+            final IgniteTxLocalAdapter<K, V> tx0 = tx;
 
+            if (fut != null && !fut.isDone()) {
                 IgniteFuture<T> f = new GridEmbeddedFuture<>(fut,
                     new C2<T, Exception, IgniteFuture<T>>() {
                         @Override public IgniteFuture<T> apply(T t, Exception 
e) {
-                            return op.op(tx0);
+                            return op.op(tx0).chain(new CX1<IgniteFuture<T>, 
T>() {
+                                @Override public T applyx(IgniteFuture<T> 
tFut) throws IgniteCheckedException {
+                                    try {
+                                        return tFut.get();
+                                    }
+                                    catch (IgniteCheckedException e1) {
+                                        tx0.rollbackAsync();
+
+                                        throw e1;
+                                    }
+                                }
+                            });
                         }
                     }, ctx.kernalContext());
 
@@ -4316,7 +4327,18 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
                 return f;
             }
 
-            IgniteFuture<T> f = op.op(tx);
+            IgniteFuture<T> f = op.op(tx).chain(new CX1<IgniteFuture<T>, T>() {
+                @Override public T applyx(IgniteFuture<T> tFut) throws 
IgniteCheckedException {
+                    try {
+                        return tFut.get();
+                    }
+                    catch (IgniteCheckedException e1) {
+                        tx0.rollbackAsync();
+
+                        throw e1;
+                    }
+                }
+            });
 
             saveFuture(holder, f);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 129dc54..1ce00b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -134,13 +134,6 @@ public class GridCacheUtils {
         }
     };
 
-    /** Transfer required predicate. */
-    private static final IgnitePredicate TRANSFER_REQUIRED_PREDICATE = new 
P1<IgniteTxEntry>() {
-        @Override public boolean apply(IgniteTxEntry e) {
-            return e.transferRequired();
-        }
-    };
-
     /** Transaction entry to key. */
     private static final IgniteClosure tx2key = new C1<IgniteTxEntry, 
Object>() {
         @Override public Object apply(IgniteTxEntry e) {
@@ -843,14 +836,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * @return Transfer required predicate.
-     */
-    @SuppressWarnings("unchecked")
-    public static <K, V> IgnitePredicate<IgniteTxEntry<K, V>> 
transferRequired() {
-        return TRANSFER_REQUIRED_PREDICATE;
-    }
-
-    /**
      * Gets type filter for projections.
      *
      * @param keyType Key type.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
index 8231465..23ce77b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
@@ -58,8 +58,6 @@ public class GridCacheCommittedTxInfo<K, V> implements 
Externalizable {
 
         originatingTxId = tx.nearXidVersion();
         originatingNodeId = tx.eventNodeId();
-
-        recoveryWrites = tx.recoveryWrites();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java
deleted file mode 100644
index 0612a5d..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Committed tx buffer which should be used in synchronous commit mode.
- */
-public class GridCachePerThreadTxCommitBuffer<K, V> implements 
GridCacheTxCommitBuffer<K, V> {
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Cache context. */
-    private GridCacheSharedContext<K, V> cctx;
-
-    /** Store map. */
-    private Map<StoreKey, GridCacheCommittedTxInfo<K, V>> infoMap;
-
-    /**
-     * @param cctx Cache context.
-     */
-    public GridCachePerThreadTxCommitBuffer(GridCacheSharedContext<K, V> cctx) 
{
-        this.cctx = cctx;
-
-        log = cctx.logger(GridCachePerThreadTxCommitBuffer.class);
-
-        int logSize = cctx.txConfig().getPessimisticTxLogSize();
-
-        infoMap = logSize > 0 ?
-            new GridBoundedConcurrentLinkedHashMap<StoreKey, 
GridCacheCommittedTxInfo<K, V>>(logSize) :
-            new ConcurrentHashMap8<StoreKey, GridCacheCommittedTxInfo<K, V>>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addCommittedTx(IgniteTxEx<K, V> tx) {
-        long threadId = tx.threadId();
-
-        StoreKey key = new StoreKey(tx.eventNodeId(), threadId);
-
-        if (log.isDebugEnabled())
-            log.debug("Adding committed transaction [locNodeId=" + 
cctx.localNodeId() + ", key=" + key +
-                ", tx=" + tx + ']');
-
-        infoMap.put(key, new GridCacheCommittedTxInfo<>(tx));
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public GridCacheCommittedTxInfo<K, V> 
committedTx(GridCacheVersion originatingTxVer,
-        UUID nodeId, long threadId) {
-        assert originatingTxVer != null;
-
-        StoreKey key = new StoreKey(nodeId, threadId);
-
-        GridCacheCommittedTxInfo<K, V> txInfo = infoMap.get(key);
-
-        if (log.isDebugEnabled())
-            log.debug("Got committed transaction info by key [locNodeId=" + 
cctx.localNodeId() +
-                ", key=" + key + ", originatingTxVer=" + originatingTxVer + ", 
txInfo=" + txInfo + ']');
-
-        if (txInfo == null || 
!originatingTxVer.equals(txInfo.originatingTxId()))
-            return null;
-
-        return txInfo;
-    }
-
-    /**
-     * @param nodeId Left node ID.
-     */
-    @Override public void onNodeLeft(UUID nodeId) {
-        // Clear all node's records after clear interval.
-        cctx.kernalContext().timeout().addTimeoutObject(
-            new 
NodeLeftTimeoutObject(cctx.txConfig().getPessimisticTxLogLinger(), nodeId));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return infoMap.size();
-    }
-
-    /**
-     * Store key.
-     */
-    private static class StoreKey {
-        /** Node ID which started transaction. */
-        private UUID nodeId;
-
-        /** Thread ID which started transaction. */
-        private long threadId;
-
-        /**
-         * @param nodeId Node ID.
-         * @param threadId Thread ID.
-         */
-        private StoreKey(UUID nodeId, long threadId) {
-            this.nodeId = nodeId;
-            this.threadId = threadId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public UUID nodeId() {
-            return nodeId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            StoreKey storeKey = (StoreKey)o;
-
-            return threadId == storeKey.threadId && 
nodeId.equals(storeKey.nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + (int)(threadId ^ (threadId >>> 32));
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        public String toString() {
-            return S.toString(StoreKey.class, this);
-        }
-    }
-
-    /**
-     * Node left timeout object which will clear all committed records from 
left node.
-     */
-    private class NodeLeftTimeoutObject extends GridTimeoutObjectAdapter {
-        /** Left node ID. */
-        private UUID leftNodeId;
-
-        /**
-         * @param timeout Timeout.
-         * @param leftNodeId Left node ID.
-         */
-        protected NodeLeftTimeoutObject(long timeout, UUID leftNodeId) {
-            super(timeout);
-
-            this.leftNodeId = leftNodeId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            Iterator<StoreKey> it = infoMap.keySet().iterator();
-
-            while (it.hasNext()) {
-                StoreKey key = it.next();
-
-                if (leftNodeId.equals(key.nodeId()))
-                    it.remove();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
deleted file mode 100644
index 12b9177..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Future verifying that all remote transactions related to some
- * optimistic transaction were prepared.
- */
-public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends 
GridCompoundIdentityFuture<GridCacheCommittedTxInfo<K, V>>
-    implements GridCacheFuture<GridCacheCommittedTxInfo<K, V>> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Trackable flag. */
-    private boolean trackable = true;
-
-    /** Context. */
-    private final GridCacheSharedContext<K, V> cctx;
-
-    /** Future ID. */
-    private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-    /** Transaction. */
-    private final IgniteTxEx<K, V> tx;
-
-    /** All involved nodes. */
-    private final Map<UUID, ClusterNode> nodes;
-
-    /** ID of failed node started transaction. */
-    private final UUID failedNodeId;
-
-    /** Flag indicating that future checks near node instead of checking all 
topology in case of primary node crash. */
-    private boolean nearCheck;
-
-    /**
-     * @param cctx Context.
-     * @param tx Transaction.
-     * @param failedNodeId ID of failed node started transaction.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public 
GridCachePessimisticCheckCommittedTxFuture(GridCacheSharedContext<K, V> cctx, 
IgniteTxEx<K, V> tx,
-        UUID failedNodeId) {
-        super(cctx.kernalContext(), new SingleReducer<K, V>());
-
-        this.cctx = cctx;
-        this.tx = tx;
-        this.failedNodeId = failedNodeId;
-
-        nodes = new GridLeanMap<>();
-
-        for (ClusterNode node : CU.allNodes(cctx, tx.topologyVersion()))
-            nodes.put(node.id(), node);
-    }
-
-    /**
-     * Initializes future.
-     */
-    public void prepare() {
-        if (log.isDebugEnabled())
-            log.debug("Checking if transaction was committed on remote nodes: 
" + tx);
-
-        // Check local node first (local node can be a backup node for some 
part of this transaction).
-        long originatingThreadId = tx.threadId();
-
-        if (tx instanceof IgniteTxRemoteEx)
-            originatingThreadId = ((IgniteTxRemoteEx)tx).remoteThreadId();
-
-        GridCacheCommittedTxInfo<K, V> txInfo = 
cctx.tm().txCommitted(tx.nearXidVersion(), tx.eventNodeId(),
-            originatingThreadId);
-
-        if (txInfo != null) {
-            onDone(txInfo);
-
-            markInitialized();
-
-            return;
-        }
-
-        Collection<ClusterNode> checkNodes = CU.remoteNodes(cctx, 
tx.topologyVersion());
-
-        if (tx instanceof GridDhtTxRemote) {
-            // If we got primary node failure and near node has not failed.
-            if (tx.nodeId().equals(failedNodeId) && 
!tx.eventNodeId().equals(failedNodeId)) {
-                nearCheck = true;
-
-                ClusterNode nearNode = cctx.discovery().node(tx.eventNodeId());
-
-                if (nearNode == null) {
-                    // Near node failed, separate check prepared future will 
take care of it.
-                    onDone(new ClusterTopologyException("Failed to check near 
transaction state (near node left grid): " +
-                        tx.eventNodeId()));
-
-                    return;
-                }
-
-                checkNodes = Collections.singletonList(nearNode);
-            }
-        }
-
-        for (ClusterNode rmtNode : checkNodes) {
-            // Skip left nodes and local node.
-            if (rmtNode.id().equals(failedNodeId))
-                continue;
-
-            GridCachePessimisticCheckCommittedTxRequest<K, V> req = new 
GridCachePessimisticCheckCommittedTxRequest<>(
-                tx,
-                originatingThreadId, futureId(), nearCheck);
-
-            if (rmtNode.isLocal())
-                add(cctx.tm().checkPessimisticTxCommitted(req));
-            else {
-                MiniFuture fut = new MiniFuture(rmtNode.id());
-
-                req.miniId(fut.futureId());
-
-                add(fut);
-
-                try {
-                    cctx.io().send(rmtNode.id(), req);
-                }
-                catch (ClusterTopologyException ignored) {
-                    fut.onNodeLeft();
-                }
-                catch (IgniteCheckedException e) {
-                    fut.onError(e);
-
-                    break;
-                }
-            }
-        }
-
-        markInitialized();
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    public void onResult(UUID nodeId, 
GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
-        if (!isDone()) {
-            for (IgniteFuture<GridCacheCommittedTxInfo<K, V>> fut : pending()) 
{
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
-
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.nodeId().equals(nodeId);
-
-                        f.onResult(res);
-
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return tx.xidVersion();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return nodes.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteFuture<?> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.nodeId().equals(nodeId)) {
-                    f.onNodeLeft();
-
-                    return true;
-                }
-            }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return trackable;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        trackable = false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable GridCacheCommittedTxInfo<K, V> 
res, @Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            cctx.mvcc().removeFuture(this);
-
-            if (log.isDebugEnabled())
-                log.debug("Completing check committed tx future for 
transaction [tx=" + tx + ", res=" + res +
-                    ", err=" + err + ']');
-
-            if (err == null)
-                cctx.tm().finishPessimisticTxOnRecovery(tx, res);
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to check prepared transactions, " +
-                        "invalidating transaction [err=" + err + ", tx=" + tx 
+ ']');
-
-                if (nearCheck)
-                    return true;
-
-                cctx.tm().salvageTx(tx);
-            }
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCachePessimisticCheckCommittedTxFuture.class, 
this, "super", super.toString());
-    }
-
-    /**
-     *
-     */
-    private class MiniFuture extends 
GridFutureAdapter<GridCacheCommittedTxInfo<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Mini future ID. */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** Node ID. */
-        private UUID nodeId;
-
-        /**
-         * Empty constructor required by {@link Externalizable}
-         */
-        public MiniFuture() {
-            // No-op.
-        }
-
-        /**
-         * @param nodeId Node ID.
-         */
-        private MiniFuture(UUID nodeId) {
-            super(cctx.kernalContext());
-
-            this.nodeId = nodeId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        private UUID nodeId() {
-            return nodeId;
-        }
-
-        /**
-         * @return Future ID.
-         */
-        private IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @param e Error.
-         */
-        private void onError(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
-
-            onDone(e);
-        }
-
-        /**
-         */
-        private void onNodeLeft() {
-            if (log.isDebugEnabled())
-                log.debug("Transaction node left grid (will ignore) [fut=" + 
this + ']');
-
-            if (nearCheck) {
-                onDone(new ClusterTopologyException("Failed to check near 
transaction state (near node left grid): " +
-                    nodeId));
-
-                return;
-            }
-
-            onDone((GridCacheCommittedTxInfo<K, V>)null);
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        private void onResult(GridCachePessimisticCheckCommittedTxResponse<K, 
V> res) {
-            onDone(res.committedTxInfo());
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "err", 
error());
-        }
-    }
-
-    /**
-     * Single value reducer.
-     */
-    private static class SingleReducer<K, V> implements
-        IgniteReducer<GridCacheCommittedTxInfo<K, V>, 
GridCacheCommittedTxInfo<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private AtomicReference<GridCacheCommittedTxInfo<K, V>> collected = 
new AtomicReference<>();
-
-        /** {@inheritDoc} */
-        @Override public boolean collect(@Nullable GridCacheCommittedTxInfo<K, 
V> info) {
-            if (info != null) {
-                collected.compareAndSet(null, info);
-
-                // Stop collecting on first collected info.
-                return false;
-            }
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheCommittedTxInfo<K, V> reduce() {
-            return collected.get();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
deleted file mode 100644
index 012106d..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- * Message sent to check that transactions related to some pessimistic 
transaction
- * were prepared on remote node.
- */
-public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends 
GridDistributedBaseMessage<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** Near transaction ID. */
-    private GridCacheVersion nearXidVer;
-
-    /** Originating node ID. */
-    private UUID originatingNodeId;
-
-    /** Originating thread ID. */
-    private long originatingThreadId;
-
-    /** Flag indicating that this is near-only check. */
-    @GridDirectVersion(1)
-    private boolean nearOnlyCheck;
-
-    /**
-     * Empty constructor required by {@link Externalizable}
-     */
-    public GridCachePessimisticCheckCommittedTxRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param tx Transaction.
-     * @param originatingThreadId Originating thread ID.
-     * @param futId Future ID.
-     */
-    public GridCachePessimisticCheckCommittedTxRequest(IgniteTxEx<K, V> tx, 
long originatingThreadId, IgniteUuid futId,
-        boolean nearOnlyCheck) {
-        super(tx.xidVersion(), 0);
-
-        this.futId = futId;
-        this.nearOnlyCheck = nearOnlyCheck;
-
-        nearXidVer = tx.nearXidVersion();
-        originatingNodeId = tx.eventNodeId();
-        this.originatingThreadId = originatingThreadId;
-    }
-
-    /**
-     * @return Near version.
-     */
-    public GridCacheVersion nearXidVersion() {
-        return nearXidVer;
-    }
-
-    /**
-     * @return Tx originating node ID.
-     */
-    public UUID originatingNodeId() {
-        return originatingNodeId;
-    }
-
-    /**
-     * @return Tx originating thread ID.
-     */
-    public long originatingThreadId() {
-        return originatingThreadId;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * @param miniId Mini ID to set.
-     */
-    public void miniId(IgniteUuid miniId) {
-        this.miniId = miniId;
-    }
-
-    /**
-     * @return Flag indicating that this request was sent only to near node. 
If this flag is set, no finalizing
-     *      will be executed on receiving (near) node since this is a user 
node.
-     */
-    public boolean nearOnlyCheck() {
-        return nearOnlyCheck;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridCachePessimisticCheckCommittedTxRequest _clone = new 
GridCachePessimisticCheckCommittedTxRequest();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridCachePessimisticCheckCommittedTxRequest _clone = 
(GridCachePessimisticCheckCommittedTxRequest)_msg;
-
-        _clone.futId = futId;
-        _clone.miniId = miniId;
-        _clone.nearXidVer = nearXidVer;
-        _clone.originatingNodeId = originatingNodeId;
-        _clone.originatingThreadId = originatingThreadId;
-        _clone.nearOnlyCheck = nearOnlyCheck;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 8:
-                if (!commState.putGridUuid(futId))
-                    return false;
-
-                commState.idx++;
-
-            case 9:
-                if (!commState.putGridUuid(miniId))
-                    return false;
-
-                commState.idx++;
-
-            case 10:
-                if (!commState.putCacheVersion(nearXidVer))
-                    return false;
-
-                commState.idx++;
-
-            case 11:
-                if (!commState.putUuid(originatingNodeId))
-                    return false;
-
-                commState.idx++;
-
-            case 12:
-                if (!commState.putLong(originatingThreadId))
-                    return false;
-
-                commState.idx++;
-
-            case 13:
-                if (!commState.putBoolean(nearOnlyCheck))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 8:
-                IgniteUuid futId0 = commState.getGridUuid();
-
-                if (futId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                futId = futId0;
-
-                commState.idx++;
-
-            case 9:
-                IgniteUuid miniId0 = commState.getGridUuid();
-
-                if (miniId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                miniId = miniId0;
-
-                commState.idx++;
-
-            case 10:
-                GridCacheVersion nearXidVer0 = commState.getCacheVersion();
-
-                if (nearXidVer0 == CACHE_VER_NOT_READ)
-                    return false;
-
-                nearXidVer = nearXidVer0;
-
-                commState.idx++;
-
-            case 11:
-                UUID originatingNodeId0 = commState.getUuid();
-
-                if (originatingNodeId0 == UUID_NOT_READ)
-                    return false;
-
-                originatingNodeId = originatingNodeId0;
-
-                commState.idx++;
-
-            case 12:
-                if (buf.remaining() < 8)
-                    return false;
-
-                originatingThreadId = commState.getLong();
-
-                commState.idx++;
-
-            case 13:
-                if (buf.remaining() < 1)
-                    return false;
-
-                nearOnlyCheck = commState.getBoolean();
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 20;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCachePessimisticCheckCommittedTxRequest.class, 
this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
deleted file mode 100644
index 8b50645..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Check prepared transactions response.
- */
-public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends 
GridDistributedBaseMessage<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** Committed transaction info. */
-    @GridDirectTransient
-    private GridCacheCommittedTxInfo<K, V> committedTxInfo;
-
-    /** Serialized transaction info. */
-    private byte[] committedTxInfoBytes;
-
-    /**
-     * Empty constructor required by {@link Externalizable}
-     */
-    public GridCachePessimisticCheckCommittedTxResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param txId Transaction ID.
-     * @param futId Future ID.
-     * @param miniId Mini future ID.
-     * @param committedTxInfo Committed transaction info.
-     */
-    public GridCachePessimisticCheckCommittedTxResponse(GridCacheVersion txId, 
IgniteUuid futId, IgniteUuid miniId,
-        @Nullable GridCacheCommittedTxInfo<K, V> committedTxInfo) {
-        super(txId, 0);
-
-        this.futId = futId;
-        this.miniId = miniId;
-        this.committedTxInfo = committedTxInfo;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * @return {@code True} if all remote transactions were prepared.
-     */
-    public GridCacheCommittedTxInfo<K, V> committedTxInfo() {
-        return committedTxInfo;
-    }
-
-    /** {@inheritDoc}
-     * @param ctx*/
-    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (committedTxInfo != null) {
-            marshalTx(committedTxInfo.recoveryWrites(), ctx);
-
-            committedTxInfoBytes = ctx.marshaller().marshal(committedTxInfo);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (committedTxInfoBytes != null) {
-            committedTxInfo = ctx.marshaller().unmarshal(committedTxInfoBytes, 
ldr);
-
-            unmarshalTx(committedTxInfo.recoveryWrites(), false, ctx, ldr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridCachePessimisticCheckCommittedTxResponse _clone = new 
GridCachePessimisticCheckCommittedTxResponse();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridCachePessimisticCheckCommittedTxResponse _clone = 
(GridCachePessimisticCheckCommittedTxResponse)_msg;
-
-        _clone.futId = futId;
-        _clone.miniId = miniId;
-        _clone.committedTxInfo = committedTxInfo;
-        _clone.committedTxInfoBytes = committedTxInfoBytes;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 8:
-                if (!commState.putByteArray(committedTxInfoBytes))
-                    return false;
-
-                commState.idx++;
-
-            case 9:
-                if (!commState.putGridUuid(futId))
-                    return false;
-
-                commState.idx++;
-
-            case 10:
-                if (!commState.putGridUuid(miniId))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 8:
-                byte[] committedTxInfoBytes0 = commState.getByteArray();
-
-                if (committedTxInfoBytes0 == BYTE_ARR_NOT_READ)
-                    return false;
-
-                committedTxInfoBytes = committedTxInfoBytes0;
-
-                commState.idx++;
-
-            case 9:
-                IgniteUuid futId0 = commState.getGridUuid();
-
-                if (futId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                futId = futId0;
-
-                commState.idx++;
-
-            case 10:
-                IgniteUuid miniId0 = commState.getGridUuid();
-
-                if (miniId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                miniId = miniId0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 21;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCachePessimisticCheckCommittedTxResponse.class, 
this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java
deleted file mode 100644
index 26a78f5..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Buffer that stores transaction commit values in order to restore them in 
case of originating node crash.
- */
-public interface GridCacheTxCommitBuffer<K, V> {
-    /**
-     * Adds committed transaction to commit buffer.
-     *
-     * @param tx Committed transaction.
-     */
-    public void addCommittedTx(IgniteTxEx<K, V> tx);
-
-    /**
-     * Gets transaction from commit buffer.
-     *
-     * @param originatingTxVer Originating tx version.
-     * @param nodeId Originating node ID.
-     * @param threadId Originating thread ID.
-     * @return Committed info, if any.
-     */
-    @Nullable public GridCacheCommittedTxInfo<K, V> 
committedTx(GridCacheVersion originatingTxVer, UUID nodeId,
-        long threadId);
-
-    /**
-     * Callback called when lode left grid. Used to eventually cleanup the 
queue from committed tx info from
-     * left node.
-     *
-     * @param nodeId Left node ID.
-     */
-    public void onNodeLeft(UUID nodeId);
-
-    /**
-     * @return Buffer size.
-     */
-    public int size();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 4356dc4..da6ca72 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -73,14 +73,6 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
     @GridDirectTransient
     private List<K> keys;
 
-    /** Write entries. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private List<IgniteTxEntry<K, V>> writeEntries;
-
-    /** Serialized write entries. */
-    private byte[] writeEntriesBytes;
-
     /** Array indicating whether value should be returned for a key. */
     @GridToStringInclude
     private boolean[] retVals;
@@ -102,10 +94,6 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
     /** Partition lock flag. Only if group-lock transaction. */
     private boolean partLock;
 
-    /** DR versions. */
-    @GridToStringInclude
-    private GridCacheVersion[] drVersByIdx;
-
     /**
      * Empty constructor.
      */
@@ -252,13 +240,6 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
     }
 
     /**
-     * @return Write entries list.
-     */
-    public List<IgniteTxEntry<K, V>> writeEntries() {
-        return writeEntries;
-    }
-
-    /**
      * @return Tx size.
      */
     public int txSize() {
@@ -271,19 +252,15 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
      * @param key Key.
      * @param retVal Flag indicating whether value should be returned.
      * @param keyBytes Key bytes.
-     * @param writeEntry Write entry.
      * @param cands Candidates.
-     * @param drVer DR version.
      * @param ctx Context.
      * @throws IgniteCheckedException If failed.
      */
     public void addKeyBytes(
         K key,
         @Nullable byte[] keyBytes,
-        @Nullable IgniteTxEntry<K, V> writeEntry,
         boolean retVal,
         @Nullable Collection<GridCacheMvccCandidate<K>> cands,
-        @Nullable GridCacheVersion drVer,
         GridCacheContext<K, V> ctx
     ) throws IgniteCheckedException {
         if (ctx.deploymentEnabled())
@@ -302,21 +279,9 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
         keys.add(key);
 
         candidatesByIndex(idx, cands);
-        drVersionByIndex(idx, drVer);
 
         retVals[idx] = retVal;
 
-        if (writeEntry != null) {
-            if (writeEntries == null) {
-                assert idx == 0 : "Cannot start adding write entries in the 
middle of lock message [idx=" + idx +
-                    ", writeEntry=" + writeEntry + ']';
-
-                writeEntries = new ArrayList<>(keysCount());
-            }
-
-            writeEntries.add(writeEntry);
-        }
-
         idx++;
     }
 
@@ -355,39 +320,6 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
         return timeout;
     }
 
-    /**
-     * @param idx Key index.
-     * @param drVer DR version.
-     */
-    @SuppressWarnings({"unchecked"})
-    public void drVersionByIndex(int idx, GridCacheVersion drVer) {
-        assert idx < keysCount();
-
-        // If nothing to add.
-        if (drVer == null)
-            return;
-
-        if (drVersByIdx == null)
-            drVersByIdx = new GridCacheVersion[keysCount()];
-
-        drVersByIdx[idx] = drVer;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return DR versions for given key.
-     */
-    public GridCacheVersion drVersionByIndex(int idx) {
-        return drVersByIdx == null ? null : drVersByIdx[idx];
-    }
-
-    /**
-     * @return All DR versions.
-     */
-    public GridCacheVersion[] drVersions() {
-        return drVersByIdx;
-    }
-
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
@@ -399,12 +331,6 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
 
             grpLockKeyBytes = CU.marshal(ctx, grpLockKey);
         }
-
-        if (writeEntries != null) {
-            marshalTx(writeEntries, ctx);
-
-            writeEntriesBytes = ctx.marshaller().marshal(writeEntries);
-        }
     }
 
     /** {@inheritDoc} */
@@ -416,12 +342,6 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
 
         if (grpLockKey == null && grpLockKeyBytes != null)
             grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
-
-        if (writeEntriesBytes != null) {
-            writeEntries = ctx.marshaller().unmarshal(writeEntriesBytes, ldr);
-
-            unmarshalTx(writeEntries, false, ctx, ldr);
-        }
     }
 
     /** {@inheritDoc} */
@@ -452,15 +372,12 @@ public class GridDistributedLockRequest<K, V> extends 
GridDistributedBaseMessage
         _clone.isolation = isolation;
         _clone.keyBytes = keyBytes;
         _clone.keys = keys;
-        _clone.writeEntries = writeEntries;
-        _clone.writeEntriesBytes = writeEntriesBytes;
         _clone.retVals = retVals;
         _clone.idx = idx;
         _clone.txSize = txSize;
         _clone.grpLockKey = grpLockKey;
         _clone.grpLockKeyBytes = grpLockKeyBytes;
         _clone.partLock = partLock;
-        _clone.drVersByIdx = drVersByIdx;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 3792536..389cf30 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.util.direct.*;
 import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -63,24 +62,6 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
     /** Min version used as base for completed versions. */
     private GridCacheVersion baseVer;
 
-    /** Transaction write entries. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<IgniteTxEntry<K, V>> writeEntries;
-
-    /** */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> writeEntriesBytes;
-
-    /** Write entries which have not been transferred to nodes during lock 
request. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<IgniteTxEntry<K, V>> recoveryWrites;
-
-    /** */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> recoveryWritesBytes;
-
     /** Expected txSize. */
     private int txSize;
 
@@ -113,9 +94,6 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      * @param txSize Expected transaction size.
-     * @param writeEntries Write entries.
-     * @param recoveryWrites Recover entries. In pessimistic mode entries 
which were not transferred to remote nodes
-     *      with lock requests. {@code Null} for optimistic mode.
      * @param grpLockKey Group lock key if this is a group-lock transaction.
      */
     public GridDistributedTxFinishRequest(
@@ -132,11 +110,9 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
         int txSize,
-        Collection<IgniteTxEntry<K, V>> writeEntries,
-        Collection<IgniteTxEntry<K, V>> recoveryWrites,
         @Nullable IgniteTxKey grpLockKey
     ) {
-        super(xidVer, writeEntries == null ? 0 : writeEntries.size());
+        super(xidVer, 0);
         assert xidVer != null;
 
         this.futId = futId;
@@ -149,36 +125,12 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
         this.syncRollback = syncRollback;
         this.baseVer = baseVer;
         this.txSize = txSize;
-        this.writeEntries = writeEntries;
-        this.recoveryWrites = recoveryWrites;
         this.grpLockKey = grpLockKey;
 
         completedVersions(committedVers, rolledbackVers);
     }
 
     /**
-     * Clones write entries so that near entries are not passed to DHT cache.
-     */
-    public void cloneEntries() {
-        if (F.isEmpty(writeEntries))
-            return;
-
-        Collection<IgniteTxEntry<K, V>> cp = new 
ArrayList<>(writeEntries.size());
-
-        for (IgniteTxEntry<K, V> e : writeEntries) {
-            GridCacheContext<K, V> cacheCtx = e.context();
-
-            // Clone only if it is a near cache.
-            if (cacheCtx.isNear())
-                cp.add(e.cleanCopy(cacheCtx.nearTx().dht().context()));
-            else
-                cp.add(e);
-        }
-
-        writeEntries = cp;
-    }
-
-    /**
      * @return System flag.
      */
     public boolean system() {
@@ -290,24 +242,6 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (writeEntries != null) {
-            marshalTx(writeEntries, ctx);
-
-            writeEntriesBytes = new ArrayList<>(writeEntries.size());
-
-            for (IgniteTxEntry<K, V> e : writeEntries)
-                writeEntriesBytes.add(ctx.marshaller().marshal(e));
-        }
-
-        if (recoveryWrites != null) {
-            marshalTx(recoveryWrites, ctx);
-
-            recoveryWritesBytes = new ArrayList<>(recoveryWrites.size());
-
-            for (IgniteTxEntry<K, V> e : recoveryWrites)
-                recoveryWritesBytes.add(ctx.marshaller().marshal(e));
-        }
-
         if (grpLockKey != null && grpLockKeyBytes == null) {
             if (ctx.deploymentEnabled())
                 prepareObject(grpLockKey, ctx);
@@ -320,24 +254,6 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
     @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (writeEntriesBytes != null) {
-            writeEntries = new ArrayList<>(writeEntriesBytes.size());
-
-            for (byte[] arr : writeEntriesBytes)
-                writeEntries.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
-
-            unmarshalTx(writeEntries, false, ctx, ldr);
-        }
-
-        if (recoveryWritesBytes != null) {
-            recoveryWrites = new ArrayList<>(recoveryWritesBytes.size());
-
-            for (byte[] arr : recoveryWritesBytes)
-                recoveryWrites.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
-
-            unmarshalTx(recoveryWrites, false, ctx, ldr);
-        }
-
         if (grpLockKeyBytes != null && grpLockKey == null)
             grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
     }
@@ -365,10 +281,6 @@ public class GridDistributedTxFinishRequest<K, V> extends 
GridDistributedBaseMes
         _clone.invalidate = invalidate;
         _clone.commit = commit;
         _clone.baseVer = baseVer;
-        _clone.writeEntries = writeEntries;
-        _clone.writeEntriesBytes = writeEntriesBytes;
-        _clone.recoveryWrites = recoveryWrites;
-        _clone.recoveryWritesBytes = recoveryWritesBytes;
         _clone.txSize = txSize;
         _clone.grpLockKey = grpLockKey;
         _clone.grpLockKeyBytes = grpLockKeyBytes;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index f0655b9..5fc3607 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -53,7 +53,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
 
     /** Commit version for EC transactions. */
     @GridToStringInclude
-    private GridCacheVersion commitVer;
+    private GridCacheVersion writeVer;
 
     /** Transaction timeout. */
     @GridToStringInclude
@@ -112,6 +112,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
     /** */
     private byte[] txNodesBytes;
 
+    /** One phase commit flag. */
+    private boolean onePhaseCommit;
+
     /** System flag. */
     private boolean sys;
 
@@ -129,6 +132,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
      * @param grpLockKey Group lock key.
      * @param partLock {@code True} if preparing group-lock transaction with 
partition lock.
      * @param txNodes Transaction nodes mapping.
+     * @param onePhaseCommit One phase commit flag.
      */
     public GridDistributedTxPrepareRequest(
         IgniteTxEx<K, V> tx,
@@ -136,11 +140,12 @@ public class GridDistributedTxPrepareRequest<K, V> 
extends GridDistributedBaseMe
         Collection<IgniteTxEntry<K, V>> writes,
         IgniteTxKey grpLockKey,
         boolean partLock,
-        Map<UUID, Collection<UUID>> txNodes
+        Map<UUID, Collection<UUID>> txNodes,
+        boolean onePhaseCommit
     ) {
         super(tx.xidVersion(), 0);
 
-        commitVer = null;
+        writeVer = tx.writeVersion();
         threadId = tx.threadId();
         concurrency = tx.concurrency();
         isolation = tx.isolation();
@@ -154,6 +159,29 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
         this.grpLockKey = grpLockKey;
         this.partLock = partLock;
         this.txNodes = txNodes;
+        this.onePhaseCommit = onePhaseCommit;
+    }
+
+    /**
+     * Clones write entries so that near entries are not passed to DHT cache.
+     */
+    public void cloneEntries() {
+        if (F.isEmpty(writes))
+            return;
+
+        Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(writes.size());
+
+        for (IgniteTxEntry<K, V> e : writes) {
+            GridCacheContext<K, V> cacheCtx = e.context();
+
+            // Clone only if it is a near cache.
+            if (cacheCtx.isNear())
+                cp.add(e.cleanCopy(cacheCtx.nearTx().dht().context()));
+            else
+                cp.add(e);
+        }
+
+        writes = cp;
     }
 
     /**
@@ -200,7 +228,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
     /**
      * @return Commit version.
      */
-    public GridCacheVersion commitVersion() { return commitVer; }
+    public GridCacheVersion writeVersion() { return writeVer; }
 
     /**
      * @return Invalidate flag.
@@ -277,6 +305,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
         return txSize;
     }
 
+    /**
+     * @return One phase commit flag.
+     */
+    public boolean onePhaseCommit() {
+        return onePhaseCommit;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
@@ -420,7 +455,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
         _clone.threadId = threadId;
         _clone.concurrency = concurrency;
         _clone.isolation = isolation;
-        _clone.commitVer = commitVer;
+        _clone.writeVer = writeVer;
         _clone.timeout = timeout;
         _clone.invalidate = invalidate;
         _clone.reads = reads;
@@ -455,7 +490,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
 
         switch (commState.idx) {
             case 8:
-                if (!commState.putCacheVersion(commitVer))
+                if (!commState.putCacheVersion(writeVer))
                     return false;
 
                 commState.idx++;
@@ -599,7 +634,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMe
                 if (commitVer0 == CACHE_VER_NOT_READ)
                     return false;
 
-                commitVer = commitVer0;
+                writeVer = commitVer0;
 
                 commState.idx++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9d07c52..a3ea0502 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -503,34 +503,33 @@ public class GridDistributedTxRemoteAdapter<K, V> extends 
IgniteTxAdapter<K, V>
 
                                     GridCacheVersion explicitVer = 
txEntry.drVersion();
 
-                                    if (finalizationStatus() == 
FinalizationStatus.RECOVERY_FINISH || optimistic()) {
-                                        // Primary node has left the grid so 
we have to process conflicts on backups.
-                                        if (explicitVer == null)
-                                            explicitVer = writeVersion(); // 
Force write version to be used.
-
-                                        GridDrResolveResult<V> drRes = 
cacheCtx.dr().resolveTx(cached,
-                                            txEntry,
-                                            explicitVer,
-                                            op,
-                                            val,
-                                            valBytes,
-                                            txEntry.ttl(),
-                                            txEntry.drExpireTime());
-
-                                        if (drRes != null) {
-                                            op = drRes.operation();
-                                            val = drRes.value();
-                                            valBytes = drRes.valueBytes();
-
-                                            if (drRes.isMerge())
-                                                explicitVer = writeVersion();
-                                            else if (op == NOOP)
-                                                txEntry.ttl(-1L);
-                                        }
-                                        else
-                                            // Nullify explicit version so 
that innerSet/innerRemove will work as usual.
-                                            explicitVer = null;
+
+                                    // Primary node has left the grid so we 
have to process conflicts on backups.
+                                    if (explicitVer == null)
+                                        explicitVer = writeVersion(); // Force 
write version to be used.
+
+                                    GridDrResolveResult<V> drRes = 
cacheCtx.dr().resolveTx(cached,
+                                        txEntry,
+                                        explicitVer,
+                                        op,
+                                        val,
+                                        valBytes,
+                                        txEntry.ttl(),
+                                        txEntry.drExpireTime());
+
+                                    if (drRes != null) {
+                                        op = drRes.operation();
+                                        val = drRes.value();
+                                        valBytes = drRes.valueBytes();
+
+                                        if (drRes.isMerge())
+                                            explicitVer = writeVersion();
+                                        else if (op == NOOP)
+                                            txEntry.ttl(-1L);
                                     }
+                                    else
+                                        // Nullify explicit version so that 
innerSet/innerRemove will work as usual.
+                                        explicitVer = null;
 
                                     if (op == CREATE || op == UPDATE) {
                                         // Invalidate only for near nodes 
(backups cannot be invalidated).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 74d9163..adba497 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -779,8 +779,8 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
             }
 
             if (tx != null) {
-                tx.addDhtMapping(dhtMap);
-                tx.addNearMapping(nearMap);
+                tx.addDhtNodeEntryMapping(dhtMap);
+                tx.addNearNodeEntryMapping(nearMap);
 
                 tx.needsCompletedVersions(hasRmtNodes);
             }
@@ -851,6 +851,29 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                             // Must unswap entry so that isNewLocked returns 
correct value.
                             e.unswap(true, false);
 
+                            boolean needVal = false;
+
+                            try {
+                                needVal = e.isNewLocked();
+
+                                if (needVal) {
+                                    List<ClusterNode> owners = 
cctx.topology().owners(e.partition(),
+                                        tx != null ? tx.topologyVersion() : 
cctx.affinity().affinityTopologyVersion());
+
+                                    // Do not preload if local node is 
partition owner.
+                                    if (owners.contains(cctx.localNode()))
+                                        needVal = false;
+                                }
+                            }
+                            catch (GridCacheEntryRemovedException ex) {
+                                assert false : "Entry cannot become obsolete 
when DHT local candidate is added " +
+                                    "[e=" + e + ", ex=" + ex + ']';
+                            }
+
+                            // Skip entry if it is not new and is not present 
in updated mapping.
+                            if (tx != null && !needVal)
+                                continue;
+
                             boolean invalidateRdr = e.readerId(n.id()) != null;
 
                             IgniteTxEntry<K, V> entry = tx != null ? 
tx.entry(e.txKey()) : null;
@@ -858,20 +881,12 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                             req.addDhtKey(
                                 e.key(),
                                 e.getOrMarshalKeyBytes(),
-                                tx != null ? tx.writeMap().get(e.txKey()) : 
null,
-                                entry != null ? entry.drVersion() : null,
                                 invalidateRdr,
                                 cctx);
 
-                            try {
-                                if (e.isNewLocked())
-                                    // Mark last added key as needed to be 
preloaded.
-                                    req.markLastKeyForPreload();
-                            }
-                            catch (GridCacheEntryRemovedException ex) {
-                                assert false : "Entry cannot become obsolete 
when DHT local candidate is added " +
-                                    "[e=" + e + ", ex=" + ex + ']';
-                            }
+                            if (needVal)
+                                // Mark last added key as needed to be 
preloaded.
+                                req.markLastKeyForPreload();
 
                             it.set(addOwned(req, e));
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index a4efd13..050748a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -237,8 +237,6 @@ public class GridDhtLockRequest<K, V> extends 
GridDistributedLockRequest<K, V> {
      *
      * @param key Key.
      * @param keyBytes Key bytes.
-     * @param writeEntry Write entry.
-     * @param drVer DR version.
      * @param invalidateEntry Flag indicating whether node should attempt to 
invalidate reader.
      * @param ctx Context.
      * @throws IgniteCheckedException If failed.
@@ -246,14 +244,12 @@ public class GridDhtLockRequest<K, V> extends 
GridDistributedLockRequest<K, V> {
     public void addDhtKey(
         K key,
         byte[] keyBytes,
-        IgniteTxEntry<K, V> writeEntry,
-        @Nullable GridCacheVersion drVer,
         boolean invalidateEntry,
         GridCacheContext<K, V> ctx
     ) throws IgniteCheckedException {
         invalidateEntries.set(idx, invalidateEntry);
 
-        addKeyBytes(key, keyBytes, writeEntry, false, null, drVer, ctx);
+        addKeyBytes(key, keyBytes, false, null, ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index f6e69eb..4e06bc7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -136,8 +136,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
         GridDhtLockResponse<K, V> res)
         throws IgniteCheckedException, GridDistributedLockCancelledException {
         List<K> keys = req.keys();
-        List<IgniteTxEntry<K, V>> writes = req.writeEntries();
-
         GridDhtTxRemote<K, V> tx = null;
 
         int size = F.size(keys);
@@ -150,12 +148,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
 
             IgniteTxKey<K> txKey = ctx.txKey(key);
 
-            IgniteTxEntry<K, V> writeEntry = writes == null ? null : 
writes.get(i);
-
             assert F.isEmpty(req.candidatesByIndex(i));
 
-            GridCacheVersion drVer = req.drVersionByIndex(i);
-
             if (log.isDebugEnabled())
                 log.debug("Unmarshalled key: " + key);
 
@@ -218,13 +212,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
 
                             tx.addWrite(
                                 ctx,
-                                writeEntry == null ? NOOP : writeEntry.op(),
+                                NOOP,
                                 txKey,
                                 req.keyBytes() != null ? req.keyBytes().get(i) 
: null,
-                                writeEntry == null ? null : writeEntry.value(),
-                                writeEntry == null ? null : 
writeEntry.valueBytes(),
-                                writeEntry == null ? null : 
writeEntry.entryProcessors(),
-                                drVer,
+                                null,
+                                null,
+                                null,
                                 req.accessTtl());
 
                             if (req.groupLock())
@@ -828,14 +821,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
                             if (log.isDebugEnabled())
                                 log.debug("Performing DHT lock [tx=" + tx + ", 
entries=" + entries + ']');
 
-                            assert req.writeEntries() == null || 
req.writeEntries().size() == entries.size();
-
                             IgniteFuture<GridCacheReturn<V>> txFut = 
tx.lockAllAsync(
                                 cacheCtx,
                                 entries,
-                                req.writeEntries(),
                                 req.onePhaseCommit(),
-                                req.drVersions(),
                                 req.messageId(),
                                 req.implicitTx(),
                                 req.txRead(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index ea68904..fdf6e87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -92,8 +92,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
     public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, 
GridDhtTxLocalAdapter<K, V> tx, boolean commit) {
         super(cctx.kernalContext(), F.<IgniteTx>identityReducer(tx));
 
-        assert cctx != null;
-
         this.cctx = cctx;
         this.tx = tx;
         this.commit = commit;
@@ -283,6 +281,9 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
      */
     private boolean finish(Map<UUID, GridDistributedTxMapping<K, V>> dhtMap,
         Map<UUID, GridDistributedTxMapping<K, V>> nearMap) {
+        if (tx.onePhaseCommit())
+            return false;
+
         boolean res = false;
 
         boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
@@ -323,10 +324,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                 tx.rolledbackVersions(),
                 tx.pendingVersions(),
                 tx.size(),
-                tx.pessimistic() ? dhtMapping.writes() : null,
-                tx.pessimistic() && nearMapping != null ? nearMapping.writes() 
: null,
-                tx.recoveryWrites(),
-                tx.onePhaseCommit(),
                 tx.groupLockKey(),
                 tx.subjectId(),
                 tx.taskNameHash());
@@ -345,9 +342,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                 }
             }
 
-            if (tx.onePhaseCommit())
-                req.writeVersion(tx.writeVersion());
-
             try {
                 cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : 
SYSTEM_POOL);
 
@@ -395,10 +389,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                     tx.rolledbackVersions(),
                     tx.pendingVersions(),
                     tx.size(),
-                    null,
-                    tx.pessimistic() ? nearMapping.writes() : null,
-                    tx.recoveryWrites(),
-                    tx.onePhaseCommit(),
                     tx.groupLockKey(),
                     tx.subjectId(),
                     tx.taskNameHash());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index da02065..1d311d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -47,15 +47,6 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
     /** Transaction isolation. */
     private IgniteTxIsolation isolation;
 
-    /** Near writes. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<IgniteTxEntry<K, V>> nearWrites;
-
-    /** Serialized near writes. */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> nearWritesBytes;
-
     /** Mini future ID. */
     private IgniteUuid miniId;
 
@@ -70,9 +61,6 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
     @GridDirectCollection(GridCacheVersion.class)
     private Collection<GridCacheVersion> pendingVers;
 
-    /** One phase commit flag for fast-commit path. */
-    private boolean onePhaseCommit;
-
     /** One phase commit write version. */
     private GridCacheVersion writeVer;
 
@@ -117,10 +105,6 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
      * @param rolledbackVers Rolled back versions.
      * @param pendingVers Pending versions.
      * @param txSize Expected transaction size.
-     * @param writes Write entries.
-     * @param nearWrites Near cache writes.
-     * @param recoverWrites Recovery write entries.
-     * @param onePhaseCommit One phase commit flag.
      * @param grpLockKey Group lock key.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
@@ -145,16 +129,12 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
         Collection<GridCacheVersion> rolledbackVers,
         Collection<GridCacheVersion> pendingVers,
         int txSize,
-        Collection<IgniteTxEntry<K, V>> writes,
-        Collection<IgniteTxEntry<K, V>> nearWrites,
-        Collection<IgniteTxEntry<K, V>> recoverWrites,
-        boolean onePhaseCommit,
         @Nullable IgniteTxKey grpLockKey,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
         super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, 
syncCommit, syncRollback, baseVer,
-            committedVers, rolledbackVers, txSize, writes, recoverWrites, 
grpLockKey);
+            committedVers, rolledbackVers, txSize, grpLockKey);
 
         assert miniId != null;
         assert nearNodeId != null;
@@ -164,10 +144,8 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
         this.topVer = topVer;
         this.nearNodeId = nearNodeId;
         this.isolation = isolation;
-        this.nearWrites = nearWrites;
         this.miniId = miniId;
         this.sysInvalidate = sysInvalidate;
-        this.onePhaseCommit = onePhaseCommit;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
     }
@@ -178,13 +156,6 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
     }
 
     /**
-     * @return Near writes.
-     */
-    public Collection<IgniteTxEntry<K, V>> nearWrites() {
-        return nearWrites == null ? Collections.<IgniteTxEntry<K, 
V>>emptyList() : nearWrites;
-    }
-
-    /**
      * @return Mini ID.
      */
     public IgniteUuid miniId() {
@@ -227,13 +198,6 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
     }
 
     /**
-     * @return One phase commit flag.
-     */
-    public boolean onePhaseCommit() {
-        return onePhaseCommit;
-    }
-
-    /**
      * @return Write version for one-phase commit transactions.
      */
     public GridCacheVersion writeVersion() {
@@ -314,35 +278,6 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
         return nearTtls;
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
-    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (nearWrites != null) {
-            marshalTx(nearWrites, ctx);
-
-            nearWritesBytes = new ArrayList<>(nearWrites.size());
-
-            for (IgniteTxEntry<K, V> e : nearWrites)
-                nearWritesBytes.add(ctx.marshaller().marshal(e));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (nearWritesBytes != null) {
-            nearWrites = new ArrayList<>(nearWritesBytes.size());
-
-            for (byte[] arr : nearWritesBytes)
-                nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
-
-            unmarshalTx(nearWrites, true, ctx, ldr);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtTxFinishRequest.class, this, 
super.toString());
@@ -366,13 +301,10 @@ public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest
 
         _clone.nearNodeId = nearNodeId;
         _clone.isolation = isolation;
-        _clone.nearWrites = nearWrites;
-        _clone.nearWritesBytes = nearWritesBytes;
         _clone.miniId = miniId;
         _clone.sysInvalidate = sysInvalidate;
         _clone.topVer = topVer;
         _clone.pendingVers = pendingVers;
-        _clone.onePhaseCommit = onePhaseCommit;
         _clone.writeVer = writeVer;
         _clone.subjId = subjId;
         _clone.taskNameHash = taskNameHash;

Reply via email to